fcm.go 8.1 KB


  1. package server
  import (
  	"context"
  	"encoding/json"
  	"log"
  	"os"
  	"time"

  	firebase "firebase.google.com/go"
  	"firebase.google.com/go/messaging"
  	"github.com/rotisserie/eris"
  	"go.uber.org/zap"
  	"google.golang.org/api/option"
  	"sikey.com/websocket/models"
  	"sikey.com/websocket/repositories"
  )
  // firebaseServiceAccountID is the service-account e-mail handed to
  // firebase.Config when the Firebase app is created.
  const firebaseServiceAccountID = "firebase-adminsdk-uycdi@sikey-watch.iam.gserviceaccount.com"

  // firebaseCredentialsEnv names the environment variable that must carry the
  // complete service-account key document (JSON) for the Firebase Admin SDK.
  const firebaseCredentialsEnv = "FIREBASE_CREDENTIALS_JSON"

  // firebaseCredentials is the service-account JSON passed to
  // option.WithCredentialsJSON. It is read once, at package initialisation,
  // from the environment variable named by firebaseCredentialsEnv.
  //
  // SECURITY: the key must never be embedded in source code. A private key that
  // has been committed to a repository has to be treated as compromised: revoke
  // it in the Google Cloud console, issue a new key and inject it through the
  // environment (or a secret manager that populates the environment).
  //
  // NOTE(review): when the variable is unset this is the empty string. How the
  // Google client library treats empty credentials JSON is not visible from
  // this file - confirm, and make sure every deployment sets the variable.
  var firebaseCredentials = os.Getenv(firebaseCredentialsEnv)
  // Defaults applied to FCM queue entries.
  const (
  	// DefaultQueueStatus is the default status value of a queue entry.
  	// NOTE(review): not referenced in this file; the meaning of -1 is
  	// presumably defined where queue entries are stored - confirm.
  	DefaultQueueStatus = -1

  	// DefaultRemainingRetries is the number of delivery attempts granted to a
  	// freshly queued push message.
  	DefaultRemainingRetries = 3
  )
  33. type FirebaseMessageServer struct {
  34. app *firebase.App
  35. repos *repositories.Repositories
  36. }
  37. func NewFirebaseMessageServer(repos *repositories.Repositories) *FirebaseMessageServer {
  38. ctx := context.Background()
  39. conf := &firebase.Config{ServiceAccountID: firebaseServiceAccountID}
  40. app, err := firebase.NewApp(ctx, conf,
  41. // option.WithHTTPClient(&http.Client{Timeout: 5 * time.Second}),
  42. option.WithCredentialsJSON([]byte(firebaseCredentials)))
  43. if err != nil {
  44. log.Fatalln(err)
  45. }
  46. firebaseMessageServer := &FirebaseMessageServer{app: app, repos: repos}
  47. go firebaseMessageServer.queueRun(ctx)
  48. return firebaseMessageServer
  49. }
  50. func (f *FirebaseMessageServer) Send(ctx context.Context, uid string, message Message) error {
  51. // "newMsg": { "title": "News", "body": "click to view" },
  52. // "chatting": { "title": "Chat Message", "body": "click to view" },
  53. // "location": { "title": "Location Changed", "body": "click to view" },
  54. // "fence": { "title": "Electronic Fence", "body": "click to view" },
  55. // "notification": { "title": "New Notification","body": "click to view" },
  56. // "videoCall": { "title": "Call Notification", "body": "click to view" }
  57. var title string
  58. var body = "click to view"
  59. messageType := message.MessageType()
  60. switch messageType {
  61. case MessageTypeDownChating, MessageTypeUpChating:
  62. title = "Chat Message"
  63. case MessageTypeLocation:
  64. title = "Location Changed"
  65. case MessageTypeNotification:
  66. // var notification Notification
  67. // _ = json.Unmarshal(message.Data(), &notification)
  68. // switch notification.Content.ID {
  69. // }
  70. title = "New Notification"
  71. case MessageTypeVideoCall:
  72. title = "Call Notification"
  73. default:
  74. title = "News"
  75. }
  76. // 加入消息到 fcm 队列
  77. queue := &models.FirebaseMessagingQueue{
  78. Title: title,
  79. Body: body,
  80. Receiver: uid,
  81. Data: string(serializeMessage(message)),
  82. RemainingRetries: DefaultRemainingRetries,
  83. }
  84. if err := f.repos.FirebaseMessageQueueRepository.Create(ctx, queue); err != nil {
  85. zap.L().Info("[firebase] 加入队列错误", zap.Error(err), zap.String("user_id", uid))
  86. return eris.Wrap(err, "uanble to add fcm queue")
  87. }
  88. zap.L().Info("[firebase] 已加入推送队列", zap.String("user_id", uid))
  89. return nil
  90. }
  91. func (f *FirebaseMessageServer) queueRun(ctx context.Context) {
  92. ticker := time.NewTicker(3 * time.Second)
  93. defer ticker.Stop()
  94. // firebase messaging client
  95. client, err := f.app.Messaging(ctx)
  96. if err != nil {
  97. panic(eris.Wrap(err, "unable to create fcm client connect"))
  98. }
  99. zap.L().Info("[firebase] firebase queue running")
  100. for {
  101. select {
  102. case <-ticker.C:
  103. // 每秒运行fcm队列
  104. queues, err := f.repos.FirebaseMessageQueueRepository.FindRetrieableQueue(ctx)
  105. if err != nil {
  106. zap.L().Error("[firebase] ticker error", zap.Error(err))
  107. continue
  108. }
  109. for _, queue := range queues {
  110. var remainingRetries int = queue.RemainingRetries
  111. remainingRetries = remainingRetries - 1
  112. if remainingRetries > 0 {
  113. queue.LastRetryTime = Now(time.Now().UTC())
  114. queue.RemainingRetries = remainingRetries
  115. // 使用一次, 如果次数用完删除
  116. if err := f.repos.FirebaseMessageQueueRepository.Save(ctx, &queue); err != nil {
  117. zap.L().Error("[firebase] 更新队列信息错误", zap.Any("queue", queue), zap.Error(err))
  118. continue
  119. }
  120. } else {
  121. if err := f.repos.FirebaseMessageQueueRepository.Delete(ctx, queue.ID); err != nil {
  122. zap.L().Error("[firebase] 删除队列错误", zap.Any("queue", queue), zap.Error(err))
  123. continue
  124. }
  125. }
  126. timeoutCtx, cannel := context.WithCancel(ctx)
  127. go func(ctx context.Context, queue models.FirebaseMessagingQueue) {
  128. defer cannel()
  129. receiver := queue.Receiver
  130. firebaseToken, err := f.repos.FirebaseMessageRepository.GetFirebaseToken(ctx, receiver)
  131. if err != nil {
  132. if eris.Is(err, models.ErrRecordNotFound) {
  133. return
  134. }
  135. zap.L().Error("unable to find firebase message database",
  136. zap.Error(err),
  137. zap.String("user_id", receiver))
  138. return
  139. }
  140. aps := map[string]interface{}{
  141. "alert": map[string]interface{}{
  142. "title": queue.Title,
  143. "body": queue.Body,
  144. },
  145. }
  146. apsBuf, _ := json.Marshal(aps)
  147. msg := &messaging.Message{
  148. Token: firebaseToken.Token,
  149. Data: map[string]string{
  150. "aps": string(apsBuf),
  151. "message": queue.Data,
  152. },
  153. // Notification: &messaging.Notification{
  154. // Title: queue.Title,
  155. // Body: queue.Body,
  156. // },
  157. }
  158. zap.L().Info("[firebase] Send", zap.Any("msg", msg), zap.String("user_id", receiver))
  159. var resultName string
  160. if resultName, err = client.Send(ctx, msg); err != nil {
  161. zap.L().Error("[firebase] 发送错误", zap.Error(err), zap.String("user_id", receiver))
  162. return
  163. }
  164. zap.L().Info("[firebase] Ack", zap.String("user_id", receiver))
  165. client.Send(ctx, &messaging.Message{})
  166. zap.L().Info("[firebase] Successfully",
  167. zap.Any("msg", msg),
  168. zap.String("user_id", receiver),
  169. zap.String("result", resultName))
  170. }(timeoutCtx, queue)
  171. select {
  172. case <-timeoutCtx.Done():
  173. _ = f.repos.FirebaseMessageQueueRepository.Delete(ctx, queue.ID)
  174. case <-time.After(time.Duration(4 * time.Second)):
  175. zap.L().Error("[firebase] timeout", zap.Any("queue", queue))
  176. }
  177. }
  178. }
  179. }
  180. }
  // Now returns a pointer to a fresh copy of n. It exists so a time value can
  // be assigned to *time.Time fields in one expression; despite the name it
  // does not read the clock itself.
  func Now(n time.Time) *time.Time {
  	ts := new(time.Time)
  	*ts = n
  	return ts
  }