|
@@ -3,7 +3,6 @@ package server
|
|
|
import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
- "fmt"
|
|
|
"log"
|
|
|
"time"
|
|
|
|
|
@@ -117,6 +116,7 @@ func (f *FirebaseMessageServer) Send(ctx context.Context, uid string, message Me
|
|
|
// "videoCall": { "title": "Call Notification", "body": "click to view" }
|
|
|
var title string
|
|
|
var body = "click to view"
|
|
|
+ remainingRetries := 1
|
|
|
messageType := message.MessageType()
|
|
|
switch messageType {
|
|
|
case MessageTypeDownChating, MessageTypeUpChating:
|
|
@@ -129,6 +129,7 @@ func (f *FirebaseMessageServer) Send(ctx context.Context, uid string, message Me
|
|
|
case MessageTypeNotification:
|
|
|
title = "New Notification"
|
|
|
case MessageTypeVideoCall:
|
|
|
+ remainingRetries = DefaultRemainingRetries
|
|
|
title = "Call Notification"
|
|
|
// 视频通话里的 payload 会有接通和挂断
|
|
|
// 这里需要过滤掉挂断的通知消息
|
|
@@ -148,7 +149,7 @@ func (f *FirebaseMessageServer) Send(ctx context.Context, uid string, message Me
|
|
|
Body: body,
|
|
|
Receiver: uid,
|
|
|
Data: string(serializeMessage(message)),
|
|
|
- RemainingRetries: DefaultRemainingRetries,
|
|
|
+ RemainingRetries: remainingRetries,
|
|
|
}
|
|
|
if err := f.repos.FirebaseMessageQueueRepository.Create(ctx, queue); err != nil {
|
|
|
zap.L().Info("[firebase] 加入队列错误", zap.Error(err), zap.String("user_id", uid))
|
|
@@ -160,25 +161,24 @@ func (f *FirebaseMessageServer) Send(ctx context.Context, uid string, message Me
|
|
|
}
|
|
|
|
|
|
func (f *FirebaseMessageServer) queueRun(ctx context.Context) {
|
|
|
- zap.L().Info("[firebase] firebase queue running")
|
|
|
-
|
|
|
+ //zap.L().Info("[firebase] queue running")
|
|
|
// 每秒运行fcm队列
|
|
|
queues, err := f.repos.FirebaseMessageQueueRepository.FindRetrieableQueue(ctx)
|
|
|
if err != nil {
|
|
|
- zap.L().Error("[firebase] ticker error", zap.Error(err))
|
|
|
+ zap.L().Error("[firebase] FindRetrieableQueue error", zap.Error(err))
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
+ zap.L().Info("[firebase] queue running len ", zap.Int("queues", len(queues)))
|
|
|
for _, queue := range queues {
|
|
|
receiver := queue.Receiver
|
|
|
firebaseToken, err := f.repos.FirebaseMessageRepository.GetFirebaseToken(ctx, receiver)
|
|
|
if err != nil {
|
|
|
+ _ = f.repos.FirebaseMessageQueueRepository.Delete(ctx, queue.ID)
|
|
|
if eris.Is(err, models.ErrRecordNotFound) {
|
|
|
+ zap.L().Info("[firebase] GetFirebaseToken NotFound 删除离线消息", zap.String("user_id", receiver), zap.Int("queue.ID", queue.ID))
|
|
|
continue
|
|
|
}
|
|
|
- zap.L().Error("unable to find firebase message database",
|
|
|
- zap.Error(err),
|
|
|
- zap.String("user_id", receiver))
|
|
|
+ zap.L().Error("[firebase] unable to find firebase message database", zap.Error(err), zap.String("user_id", receiver), zap.Int("queue.ID", queue.ID))
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -214,7 +214,7 @@ func (f *FirebaseMessageServer) queueRun(ctx context.Context) {
|
|
|
var message NatsPubMessage[NatsPubVideoCallMessage]
|
|
|
err := json.Unmarshal([]byte(queue.Data), &message)
|
|
|
if err != nil {
|
|
|
- fmt.Println("Error unmarshaling JSON:", err)
|
|
|
+ zap.L().Error("[firebase] Error unmarshaling JSON: ", zap.Error(err), zap.String("user_id", receiver), zap.Int("queue.ID", queue.ID))
|
|
|
} else {
|
|
|
if message.Type == MessageTypeVideoCall {
|
|
|
msg.Android.Notification.ChannelID = "video_call"
|
|
@@ -227,12 +227,15 @@ func (f *FirebaseMessageServer) queueRun(ctx context.Context) {
|
|
|
zap.L().Info("[firebase] Send", zap.Any("msg", msg), zap.String("user_id", receiver), zap.String("system", firebaseToken.System))
|
|
|
var resultName string
|
|
|
if resultName, err = client.Send(ctx, msg); err != nil {
|
|
|
-
|
|
|
+ zap.L().Error("[firebase] 发送错误", zap.Error(err), zap.String("user_id", receiver), zap.Int("queue.ID", queue.ID))
|
|
|
+ remainingRetries := queue.RemainingRetries - 1
|
|
|
+ if remainingRetries == 0 {
|
|
|
+ _ = f.repos.FirebaseMessageQueueRepository.Delete(ctx, queue.ID)
|
|
|
+ return
|
|
|
+ }
|
|
|
queue.LastRetryTime = Now(time.Now().UTC())
|
|
|
- queue.RemainingRetries = queue.RemainingRetries - 1
|
|
|
+ queue.RemainingRetries = remainingRetries
|
|
|
_ = f.repos.FirebaseMessageQueueRepository.UpdateRetries(ctx, &queue)
|
|
|
-
|
|
|
- zap.L().Error("[firebase] 发送错误", zap.Error(err), zap.String("user_id", receiver))
|
|
|
return
|
|
|
}
|
|
|
|