Quellcode durchsuchen

通知增加是否发送成功

luoyangwei vor 1 Jahr
Ursprung
Commit
60c884b14e
5 geänderte Dateien mit 43 neuen und 5 gelöschten Zeilen
  1. 1 0
      models/notify.go
  2. 5 0
      models/vars.go
  3. 15 0
      repositories/notify_repository.go
  4. 19 4
      server/client.go
  5. 3 1
      server/message.go

+ 1 - 0
models/notify.go

@@ -10,6 +10,7 @@ type Notify struct {
 	Sender    string
 	Receiver  string
 	Payload   json.RawMessage
+	IsSent    int
 	CreatedAt time.Time
 	Id        int64
 }

+ 5 - 0
models/vars.go

@@ -3,3 +3,8 @@ package models
 import "gorm.io/gorm"
 
 var ErrRecordNotFound = gorm.ErrRecordNotFound
+
+const (
+	NotifyNotReceived = -1
+	NotifyReceived    = 1
+)

+ 15 - 0
repositories/notify_repository.go

@@ -9,6 +9,8 @@ import (
 
 type NotifyRepository interface {
 	Create(ctx context.Context, n *models.Notify) error
+	Find(ctx context.Context, id int64) (*models.Notify, error)
+	Save(ctx context.Context, notify *models.Notify) error
 }
 
 func NewNotifyRepository(source *gorm.DB) NotifyRepository {
@@ -19,6 +21,19 @@ type notifyRepository struct {
 	source *gorm.DB
 }
 
+// Save implements NotifyRepository.
+func (repo *notifyRepository) Save(ctx context.Context, notify *models.Notify) error {
+	return repo.source.WithContext(ctx).Save(notify).Error
+}
+
+// Find implements NotifyRepository.
+func (repo *notifyRepository) Find(ctx context.Context, id int64) (*models.Notify, error) {
+	var err error
+	var notify models.Notify
+	err = repo.source.WithContext(ctx).Where(&models.Notify{Id: id}).First(&notify).Error
+	return result(notify, err)
+}
+
 // Create implements NotifyRepository.
 func (repo *notifyRepository) Create(ctx context.Context, n *models.Notify) error {
 	return repo.source.WithContext(ctx).Model(&models.Notify{}).Create(n).Error

+ 19 - 4
server/client.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"database/sql"
 	"encoding/json"
+	"fmt"
 	"regexp"
 	"time"
 
@@ -74,9 +75,10 @@ func (c *Client) withRequestIdContext(ctx context.Context, requestId string) con
 // reader 读取到客户端发送的消息, 将消息发送到 nats 里
 func (c *Client) reader() {
 	defer func() {
-		c.srv.Disconnect <- c
-		c.nats.Unsubscribe <- &subscriber{client: c}
-		_ = c.UnderlyingConn.Close()
+		// c.srv.Disconnect <- c
+		// c.nats.Unsubscribe <- &subscriber{client: c}
+		// _ = c.UnderlyingConn.Close()
+		c.close()
 	}()
 
 	// 首次消息超时设置
@@ -104,7 +106,6 @@ func (c *Client) reader() {
 			zap.L().Error("[conn] read message error",
 				zap.String("user_id", c.UserId),
 				zap.Error(err))
-			// c.close()
 			return
 		}
 
@@ -233,6 +234,7 @@ func (c *Client) reader() {
 					NotifyId: notification.Content.ID,
 					Sender:   notification.Content.Sender,
 					Receiver: notification.Content.Receiver,
+					IsSent:   models.NotifyNotReceived,
 					Payload:  serializePayload(0, notification.Content.Payload),
 				}); err != nil {
 					c.ReplySend <- newErrorMessage(message.RequestId(), err)
@@ -242,6 +244,19 @@ func (c *Client) reader() {
 				c.Received <- message
 			}
 
+		case MessageTypeAck:
+
+			// 收到消息的回执, 处理回执消息
+			if notification, ok := message.(*Notification); ok {
+				notify, err := c.repos.NotifyRepository.Find(c.ctx, int64(notification.Content.AckId))
+				if err != nil || notify == nil {
+					c.ReplySend <- newErrorMessage(message.RequestId(), eris.New(fmt.Sprintf("unable to find notify ackId:%d", notification.Content.AckId)))
+					continue
+				}
+
+				notify.IsSent = models.NotifyReceived
+				_ = c.repos.NotifyRepository.Save(c.ctx, notify)
+			}
 		}
 	}
 }

+ 3 - 1
server/message.go

@@ -14,6 +14,7 @@ const (
 	MessageTypeError        MessageType = -1 // MessageTypeError 错误消息,当服务器出现错误时,会发送此消息
 	MessageTypePingPong     MessageType = 1  // MessageTypePingPong ping pong 消息
 	MessageTypeEmpty        MessageType = 2  //  MessageTypeEmpty 空消息
+	MessageTypeAck          MessageType = 3  // MessageTypeAck ack 消息, 用于确认客户端收到消息后, 向服务器发送的回执
 	MessageTypeUpChating    MessageType = 30 // MessageTypeUpChating 语聊消息 收到客户端语聊消息
 	MessageTypeDownChating  MessageType = 31 // MessageTypeDownChating 语聊消息 发送语聊消息到客户端
 	MessageTypeNotification MessageType = 40 // MessageTypeNotification 通知消息
@@ -155,7 +156,8 @@ type Notification struct {
 }
 
 type NotificationContent struct {
-	ID       int                    `json:"id"` // ID 通知消息的ID, 不同的通知会有不同的ID, 可以 NotificationType
+	ID       int                    `json:"id"`    // ID 通知消息的ID, 不同的通知会有不同的ID, 可以 NotificationType
+	AckId    int                    `json:"ackId"` // AckID 回执ID, 需要在 ACK 消息里返回给服务器
 	Receiver string                 `json:"receiver"`
 	Sender   string                 `json:"sender"`
 	Payload  map[string]interface{} `json:"payload,omitempty"`