123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- package server
import (
	"context"
	"encoding/json"
	"log"
	"os"
	"time"

	firebase "firebase.google.com/go"
	"firebase.google.com/go/messaging"
	"github.com/rotisserie/eris"
	"go.uber.org/zap"
	"google.golang.org/api/option"
	"sikey.com/websocket/models"
	"sikey.com/websocket/repositories"
)
const (
	// firebaseServiceAccountID is the service-account e-mail passed to the
	// Firebase SDK as Config.ServiceAccountID.
	firebaseServiceAccountID = "firebase-adminsdk-uycdi@sikey-watch.iam.gserviceaccount.com"

	// firebaseCredentialsEnv names the environment variable that must hold the
	// complete service-account JSON document.
	firebaseCredentialsEnv = "FIREBASE_CREDENTIALS_JSON"
)

// firebaseCredentials is the service-account JSON handed to
// option.WithCredentialsJSON when the Firebase app is created.
//
// SECURITY: this value used to be a string constant with the service
// account's private key committed to the repository. Secrets must not live in
// source control, so the document is now read from the environment once, at
// package initialisation. The key that was previously committed has to be
// treated as compromised: revoke it in the Google Cloud console and issue a
// new one for the deployment environment.
//
// When the variable is unset this is the empty string.
// NOTE(review): the Firebase SDK is expected to reject empty credentials at
// startup rather than run unauthenticated - confirm in a staging deploy.
var firebaseCredentials = os.Getenv(firebaseCredentialsEnv)
const (
	// DefaultQueueStatus is the value -1.
	// NOTE(review): nothing in this file reads it; presumably it is the
	// initial status of a queue entry - confirm against the models package.
	DefaultQueueStatus = -1

	// DefaultRemainingRetries is the RemainingRetries value Send assigns to
	// every queue entry it creates.
	DefaultRemainingRetries = 3
)
- type FirebaseMessageServer struct {
- app *firebase.App
- repos *repositories.Repositories
- }
- func NewFirebaseMessageServer(repos *repositories.Repositories) *FirebaseMessageServer {
- ctx := context.Background()
- conf := &firebase.Config{ServiceAccountID: firebaseServiceAccountID}
- app, err := firebase.NewApp(ctx, conf,
- // option.WithHTTPClient(&http.Client{Timeout: 5 * time.Second}),
- option.WithCredentialsJSON([]byte(firebaseCredentials)))
- if err != nil {
- log.Fatalln(err)
- }
- firebaseMessageServer := &FirebaseMessageServer{app: app, repos: repos}
- go firebaseMessageServer.queueRun(ctx)
- return firebaseMessageServer
- }
- func (f *FirebaseMessageServer) Send(ctx context.Context, uid string, message Message) error {
- // "newMsg": { "title": "News", "body": "click to view" },
- // "chatting": { "title": "Chat Message", "body": "click to view" },
- // "location": { "title": "Location Changed", "body": "click to view" },
- // "fence": { "title": "Electronic Fence", "body": "click to view" },
- // "notification": { "title": "New Notification","body": "click to view" },
- // "videoCall": { "title": "Call Notification", "body": "click to view" }
- var title string
- var body = "click to view"
- messageType := message.MessageType()
- switch messageType {
- case MessageTypeDownChating, MessageTypeUpChating:
- title = "Chat Message"
- case MessageTypeLocation:
- title = "Location Changed"
- case MessageTypeNotification:
- // var notification Notification
- // _ = json.Unmarshal(message.Data(), ¬ification)
- // switch notification.Content.ID {
- // }
- title = "New Notification"
- case MessageTypeVideoCall:
- title = "Call Notification"
- default:
- title = "News"
- }
- // 加入消息到 fcm 队列
- queue := &models.FirebaseMessagingQueue{
- Title: title,
- Body: body,
- Receiver: uid,
- Data: string(serializeMessage(message)),
- RemainingRetries: DefaultRemainingRetries,
- }
- if err := f.repos.FirebaseMessageQueueRepository.Create(ctx, queue); err != nil {
- zap.L().Info("[firebase] 加入队列错误", zap.Error(err), zap.String("user_id", uid))
- return eris.Wrap(err, "uanble to add fcm queue")
- }
- zap.L().Info("[firebase] 已加入推送队列", zap.String("user_id", uid))
- return nil
- }
- func (f *FirebaseMessageServer) queueRun(ctx context.Context) {
- ticker := time.NewTicker(3 * time.Second)
- defer ticker.Stop()
- // firebase messaging client
- client, err := f.app.Messaging(ctx)
- if err != nil {
- panic(eris.Wrap(err, "unable to create fcm client connect"))
- }
- zap.L().Info("[firebase] firebase queue running")
- for {
- select {
- case <-ticker.C:
- // 每秒运行fcm队列
- queues, err := f.repos.FirebaseMessageQueueRepository.FindRetrieableQueue(ctx)
- if err != nil {
- zap.L().Error("[firebase] ticker error", zap.Error(err))
- continue
- }
- for _, queue := range queues {
- var remainingRetries int = queue.RemainingRetries
- remainingRetries = remainingRetries - 1
- if remainingRetries > 0 {
- queue.LastRetryTime = Now(time.Now().UTC())
- queue.RemainingRetries = remainingRetries
- // 使用一次, 如果次数用完删除
- if err := f.repos.FirebaseMessageQueueRepository.Save(ctx, &queue); err != nil {
- zap.L().Error("[firebase] 更新队列信息错误", zap.Any("queue", queue), zap.Error(err))
- continue
- }
- } else {
- if err := f.repos.FirebaseMessageQueueRepository.Delete(ctx, queue.ID); err != nil {
- zap.L().Error("[firebase] 删除队列错误", zap.Any("queue", queue), zap.Error(err))
- continue
- }
- }
- timeoutCtx, cannel := context.WithCancel(ctx)
- go func(ctx context.Context, queue models.FirebaseMessagingQueue) {
- defer cannel()
- receiver := queue.Receiver
- firebaseToken, err := f.repos.FirebaseMessageRepository.GetFirebaseToken(ctx, receiver)
- if err != nil {
- if eris.Is(err, models.ErrRecordNotFound) {
- return
- }
- zap.L().Error("unable to find firebase message database",
- zap.Error(err),
- zap.String("user_id", receiver))
- return
- }
- aps := map[string]interface{}{
- "alert": map[string]interface{}{
- "title": queue.Title,
- "body": queue.Body,
- },
- }
- apsBuf, _ := json.Marshal(aps)
- msg := &messaging.Message{
- Token: firebaseToken.Token,
- Data: map[string]string{
- "aps": string(apsBuf),
- "message": queue.Data,
- },
- // Notification: &messaging.Notification{
- // Title: queue.Title,
- // Body: queue.Body,
- // },
- }
- zap.L().Info("[firebase] Send", zap.Any("msg", msg), zap.String("user_id", receiver))
- var resultName string
- if resultName, err = client.Send(ctx, msg); err != nil {
- zap.L().Error("[firebase] 发送错误", zap.Error(err), zap.String("user_id", receiver))
- return
- }
- zap.L().Info("[firebase] Ack", zap.String("user_id", receiver))
- client.Send(ctx, &messaging.Message{})
- zap.L().Info("[firebase] Successfully",
- zap.Any("msg", msg),
- zap.String("user_id", receiver),
- zap.String("result", resultName))
- }(timeoutCtx, queue)
- select {
- case <-timeoutCtx.Done():
- _ = f.repos.FirebaseMessageQueueRepository.Delete(ctx, queue.ID)
- case <-time.After(time.Duration(4 * time.Second)):
- zap.L().Error("[firebase] timeout", zap.Any("queue", queue))
- }
- }
- }
- }
- }
// Now returns a pointer to a private copy of n, which is convenient for
// filling optional *time.Time fields from a value.
func Now(n time.Time) *time.Time {
	stamp := new(time.Time)
	*stamp = n
	return stamp
}
|