فهرست منبع

消息发送代码优化

luoyangwei 1 سال پیش
والد
کامیت
82e7e15ac7
3فایلهای تغییر یافته به همراه67 افزوده شده و 59 حذف شده
  1. 45 25
      server/client.go
  2. 22 28
      server/hub.go
  3. 0 6
      websocket.go

+ 45 - 25
server/client.go

@@ -2,6 +2,7 @@ package server
 
 import (
 	"encoding/json"
+	"regexp"
 	"time"
 
 	"github.com/gin-gonic/gin"
@@ -48,7 +49,6 @@ func (c *Client) reader() {
 				zlog.Errorf("error: %v", err)
 			}
 			// Close connect
-
 			break
 		}
 
@@ -57,43 +57,27 @@ func (c *Client) reader() {
 		case MessageTypePingPong:
 			zlog.Debugf("receive ping message from %s", c.UserId)
 		case MessageTypeUpChating, MessageTypeDownChating:
-
 			// Chat dialogue messages
 			chatingContent := message.Content.(ChatingContent)
 
 			// Save message to database
-			payload, _ := json.Marshal(chatingContent.Payload)
 			messageId := uuid.NewString()
-			err = c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
-				ID:          messageId,
-				SessionId:   chatingContent.Receiver,
-				Receiver:    chatingContent.Receiver,
-				Sender:      c.UserId,
-				Type:        message.Type,
-				ContentType: chatingContent.PayloadType,
-				Content:     string(payload),
-				IsRead:      false,
-			})
-
-			if err != nil {
-				c.writeError(message.RequestId, err)
-				continue
-			}
-
-			members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
-				c.ctx, chatingContent.Receiver, c.UserId)
+			chatingContent.MessageId = messageId
+			err = c.saveMessage(messageId, message.Type, &chatingContent)
 			if err != nil {
 				c.writeError(message.RequestId, err)
 				continue
 			}
 
-			chatingContent.MessageId = messageId
-			for _, member := range members {
+			// Receiver ID format determines whether the receiver is an account or a session
+			users := c.getReceiverUserIds(chatingContent.Receiver)
+			for _, id := range users {
 				var messaging = *message
-				messaging.Receiver = member.AccountId
+				messaging.Receiver = id
 				messaging.Content = chatingContent
+				zlog.Info("Send message to ", id)
+
 				c.hub.Message <- &messaging
-				zlog.Info("Send message to ", member.AccountId)
 			}
 
 			// Reply message id
@@ -149,3 +133,39 @@ func (c *Client) writeError(requestId string, err error) {
 		Content:   ContentError{Err: err.Error()},
 	}
 }
+
+func (c *Client) saveMessage(messageId string, messageType int8, content *ChatingContent) error {
+	payload, _ := json.Marshal(content.Payload)
+	return c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
+		ID:          messageId,
+		SessionId:   content.Receiver,
+		Receiver:    content.Receiver,
+		Sender:      c.UserId,
+		Type:        messageType,
+		ContentType: content.PayloadType,
+		Content:     string(payload),
+		IsRead:      false,
+	})
+}
+
+// getReceiverUserIds 通过 receiver 获取接收者的用户ID
+// 使用正则表达式验证ID 是否是 account_id 或 session_id
+// session_id 的话需要查询 session_member 表获取 session 的成员
+func (c *Client) getReceiverUserIds(receiver string) []string {
+	reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
+	if reg.Match([]byte(receiver)) {
+		return []string{receiver}
+	}
+
+	members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
+		c.ctx, receiver, c.UserId)
+	if err != nil {
+		return []string{}
+	}
+
+	var ms = make([]string, len(members))
+	for i, memb := range members {
+		ms[i] = memb.AccountId
+	}
+	return ms
+}

+ 22 - 28
server/hub.go

@@ -52,7 +52,6 @@ func (h *Hub) run() {
 
 			h.mutex.Lock()
 			h.clients[client.UserId] = client
-			// h.exchange.OnPublishConnect(client)
 			h.mutex.Unlock()
 
 		case client := <-h.Disconnect:
@@ -60,7 +59,6 @@ func (h *Hub) run() {
 			h.mutex.Lock()
 			close(client.Send)
 			delete(h.clients, client.UserId)
-			// h.exchange.OnPublishDisconnect(client)
 			h.mutex.Unlock()
 
 		case message := <-h.Message:
@@ -68,7 +66,7 @@ func (h *Hub) run() {
 			h.mutex.RLock()
 			if client, ok := h.clients[message.Receiver]; ok {
 				if client.isRemotely {
-					h.exchange.OnPublishMessage(client, message)
+					// h.exchange.OnPublishMessage(client, message)
 				} else {
 					zlog.Info("message: ", message)
 					client.Send <- message
@@ -76,36 +74,32 @@ func (h *Hub) run() {
 			}
 			h.mutex.RUnlock()
 
-		case conn := <-h.exchange.Connect:
+			// case conn := <-h.exchange.Connect:
 
-			h.mutex.Lock()
-			h.clients[conn.UserId] = &Client{
-				isRemotely: true,
-				hub:        h,
-				Send:       make(chan *Message),
-			}
-			h.mutex.Unlock()
+			// 	h.mutex.Lock()
+			// 	h.clients[conn.UserId] = &Client{
+			// 		isRemotely: true,
+			// 		hub:        h,
+			// 		Send:       make(chan *Message),
+			// 	}
+			// 	h.mutex.Unlock()
 
-		case conn := <-h.exchange.Disconnect:
+			// case conn := <-h.exchange.Disconnect:
 
-			h.mutex.Lock()
-			if client, ok := h.clients[conn.UserId]; ok {
-				close(client.Send)
-				delete(h.clients, client.UserId)
-			}
-			h.mutex.Unlock()
+			// 	h.mutex.Lock()
+			// 	if client, ok := h.clients[conn.UserId]; ok {
+			// 		close(client.Send)
+			// 		delete(h.clients, client.UserId)
+			// 	}
+			// 	h.mutex.Unlock()
 
-		case message := <-h.exchange.Message:
+			// case message := <-h.exchange.Message:
 
-			h.mutex.RLock()
-			if client, ok := h.clients[message.Receiver]; ok {
-				client.Send <- message
-			}
-			h.mutex.RUnlock()
+			// 	h.mutex.RLock()
+			// 	if client, ok := h.clients[message.Receiver]; ok {
+			// 		client.Send <- message
+			// 	}
+			// 	h.mutex.RUnlock()
 		}
 	}
 }
-
-func (h *Hub) GetClients() map[string]*Client {
-	return h.clients
-}

+ 0 - 6
websocket.go

@@ -55,11 +55,5 @@ func newApp() *gin.Engine {
 	}
 
 	app.GET("/websocket/endpoint", func(ctx *gin.Context) { server.WebsocketHandler(ctx, srv) })
-	app.GET("/index", func(ctx *gin.Context) {
-		clients := hub.GetClients()
-		ctx.JSON(http.StatusOK, gin.H{
-			"clients": len(clients),
-		})
-	})
 	return app
 }