浏览代码

语聊消息可回复

luoyangwei 1 年之前
父节点
当前提交
e352293860
共有 5 个文件被更改,包括 74 次插入41 次删除
  1. 5 1
      server/client.go
  2. 6 6
      server/hub.go
  3. 6 9
      server/message.go
  4. 56 24
      server/nats.go
  5. 1 1
      websocket.go

+ 5 - 1
server/client.go

@@ -127,6 +127,9 @@ func (c *Client) Writer() {
 				}
 				}
 
 
 				message.Type = MessageTypeDownChating
 				message.Type = MessageTypeDownChating
+
+			case MessageTypeError:
+
 			}
 			}
 
 
 			c.WriteMessage(message)
 			c.WriteMessage(message)
@@ -158,8 +161,9 @@ func (c *Client) WriteMessage(message *Message) {
 // the server is ready to send a message to the client,
 // the server is ready to send a message to the client,
 // or when the server receives a message from the client, it processes the message
 // or when the server receives a message from the client, it processes the message
 func (c *Client) Processing(bytes []byte) {
 func (c *Client) Processing(bytes []byte) {
-	message := deserializeMessage(c.UserId, bytes)
+	message := deserializeMessage(bytes)
 	ctx := c.withRequestIdContext(c.ctx.Copy(), message.RequestId)
 	ctx := c.withRequestIdContext(c.ctx.Copy(), message.RequestId)
+	message.sender = c.UserId
 
 
 	// Persistent messages, save message
 	// Persistent messages, save message
 	if err := c.persistenceMessage(ctx, message); err != nil {
 	if err := c.persistenceMessage(ctx, message); err != nil {

+ 6 - 6
server/hub.go

@@ -1,7 +1,6 @@
 package server
 package server
 
 
 import (
 import (
-	"context"
 	"sync"
 	"sync"
 
 
 	"go.uber.org/zap"
 	"go.uber.org/zap"
@@ -69,12 +68,13 @@ func (h *Hub) run() {
 			} else {
 			} else {
 				h.Nats.Send <- &natsMessage{userId: message.Receiver, message: message}
 				h.Nats.Send <- &natsMessage{userId: message.Receiver, message: message}
 			}
 			}
-			// TODO 检查用户如果不在线,并且有 FCM token,则发送 FCM 消息
+
 			h.mutex.RUnlock()
 			h.mutex.RUnlock()
-		case firebaseMessage := <-h.FirebaseMessage:
-			if err := h.FCM.Send(context.Background(), firebaseMessage.token, firebaseMessage.message); err != nil {
-				zap.L().Error("unable to send fcm message", zap.Error(err))
-			}
+			//  检查用户如果不在线,并且有 FCM token,则发送 FCM 消息
+			// case firebaseMessage := <-h.FirebaseMessage:
+			// 	if err := h.FCM.Send(context.Background(), firebaseMessage.token, firebaseMessage.message); err != nil {
+			// 		zap.L().Error("unable to send fcm message", zap.Error(err))
+			// 	}
 		}
 		}
 
 
 	}
 	}

+ 6 - 9
server/message.go

@@ -135,7 +135,7 @@ func (m *Message) String() string {
 	return string(buf)
 	return string(buf)
 }
 }
 
 
-func deserializeMessage(userId string, bytes []byte) *Message {
+func deserializeMessage(bytes []byte) *Message {
 	if len(bytes) == 0 {
 	if len(bytes) == 0 {
 		return emptyMessage()
 		return emptyMessage()
 	}
 	}
@@ -153,7 +153,7 @@ func deserializeMessage(userId string, bytes []byte) *Message {
 	case MessageTypeUpChating, MessageTypeDownChating:
 	case MessageTypeUpChating, MessageTypeDownChating:
 		var chatingContent ChatingContent
 		var chatingContent ChatingContent
 		if err := mapstructure.Decode(message.Content, &chatingContent); err != nil {
 		if err := mapstructure.Decode(message.Content, &chatingContent); err != nil {
-			return errorMessage(userId, message.RequestId, eris.Wrap(err, "failed to decode chating content"))
+			return errorMessage(message.RequestId, eris.Wrap(err, "failed to decode chating content"))
 		}
 		}
 		message.Content = chatingContent
 		message.Content = chatingContent
 
 
@@ -161,7 +161,7 @@ func deserializeMessage(userId string, bytes []byte) *Message {
 	case MessageTypeNotification:
 	case MessageTypeNotification:
 		var notificationContent NotificationContent
 		var notificationContent NotificationContent
 		if err := mapstructure.Decode(message.Content, &notificationContent); err != nil {
 		if err := mapstructure.Decode(message.Content, &notificationContent); err != nil {
-			return errorMessage(userId, message.RequestId, eris.Wrap(err, "failed to decode notify content"))
+			return errorMessage(message.RequestId, eris.Wrap(err, "failed to decode notify content"))
 		}
 		}
 		message.Content = notificationContent
 		message.Content = notificationContent
 
 
@@ -169,7 +169,7 @@ func deserializeMessage(userId string, bytes []byte) *Message {
 	case MessageTypeLocation:
 	case MessageTypeLocation:
 		var locationMessageContent LocationMessageContent
 		var locationMessageContent LocationMessageContent
 		if err := mapstructure.Decode(message.Content, &locationMessageContent); err != nil {
 		if err := mapstructure.Decode(message.Content, &locationMessageContent); err != nil {
-			return errorMessage(userId, message.RequestId, eris.Wrap(err, "failed to decode location content"))
+			return errorMessage(message.RequestId, eris.Wrap(err, "failed to decode location content"))
 		}
 		}
 		message.Content = locationMessageContent
 		message.Content = locationMessageContent
 
 
@@ -177,12 +177,10 @@ func deserializeMessage(userId string, bytes []byte) *Message {
 	case MessageTypeVideoCall:
 	case MessageTypeVideoCall:
 		var videoCallMessageContent VideoCallMessageContent
 		var videoCallMessageContent VideoCallMessageContent
 		if err := mapstructure.Decode(message.Content, &videoCallMessageContent); err != nil {
 		if err := mapstructure.Decode(message.Content, &videoCallMessageContent); err != nil {
-			return errorMessage(userId, message.RequestId, eris.Wrap(err, "failed to decode video call content"))
+			return errorMessage(message.RequestId, eris.Wrap(err, "failed to decode video call content"))
 		}
 		}
 		message.Content = videoCallMessageContent
 		message.Content = videoCallMessageContent
 	}
 	}
-
-	message.sender = userId
 	return &message
 	return &message
 }
 }
 
 
@@ -210,10 +208,9 @@ func emptyMessage() *Message {
 	return &Message{Type: MessageTypeEmpty}
 	return &Message{Type: MessageTypeEmpty}
 }
 }
 
 
-func errorMessage(sender, requestId string, err error) *Message {
+func errorMessage(requestId string, err error) *Message {
 	return &Message{
 	return &Message{
 		Type:      MessageTypeError,
 		Type:      MessageTypeError,
-		Receiver:  sender,
 		RequestId: requestId,
 		RequestId: requestId,
 		Content:   err.Error(),
 		Content:   err.Error(),
 	}
 	}

+ 56 - 24
server/nats.go

@@ -3,10 +3,14 @@ package server
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"sync"
 	"sync"
+	"time"
 
 
 	"github.com/nats-io/nats.go"
 	"github.com/nats-io/nats.go"
 	"go.uber.org/zap"
 	"go.uber.org/zap"
+	"sikey.com/websocket/repositories"
+	"x.sikey.com.cn/serverx/dbx"
 	"x.sikey.com.cn/serverx/natx"
 	"x.sikey.com.cn/serverx/natx"
+	"x.sikey.com.cn/serverx/rdbx"
 )
 )
 
 
 const (
 const (
@@ -39,6 +43,8 @@ type Nats struct {
 	Send        chan *natsMessage
 	Send        chan *natsMessage
 	Subscribe   chan *subscriber
 	Subscribe   chan *subscriber
 	Unsubscribe chan *subscriber
 	Unsubscribe chan *subscriber
+
+	Repositories *repositories.Repositories
 }
 }
 
 
 type subscriber struct {
 type subscriber struct {
@@ -60,6 +66,9 @@ func NewNats(addr string) *Nats {
 		Subscribers: make(map[string]*subscriber),
 		Subscribers: make(map[string]*subscriber),
 		Subscribe:   make(chan *subscriber),
 		Subscribe:   make(chan *subscriber),
 		Unsubscribe: make(chan *subscriber),
 		Unsubscribe: make(chan *subscriber),
+
+		// db
+		Repositories: repositories.NewRepositories(dbx.GetConnect(), rdbx.GetConnect()),
 	}
 	}
 
 
 	_, err := nc.ChanSubscribe(subject, n.ch)
 	_, err := nc.ChanSubscribe(subject, n.ch)
@@ -76,7 +85,8 @@ func (n *Nats) run() {
 		select {
 		select {
 		case nMsg := <-n.Send:
 		case nMsg := <-n.Send:
 			bytes := serializationMessage(nMsg.message)
 			bytes := serializationMessage(nMsg.message)
-			if err := n.nc.PublishMsg(&nats.Msg{
+			timeout := 5 * time.Second
+			if _, err := n.nc.RequestMsg(&nats.Msg{
 				Subject: subject,
 				Subject: subject,
 				Data:    bytes,
 				Data:    bytes,
 				Header: nats.Header{
 				Header: nats.Header{
@@ -84,45 +94,65 @@ func (n *Nats) run() {
 					"sender":     []string{nMsg.message.sender},
 					"sender":     []string{nMsg.message.sender},
 					"request_id": []string{nMsg.message.RequestId},
 					"request_id": []string{nMsg.message.RequestId},
 				},
 				},
-			}); err != nil {
+			}, timeout); err != nil {
 				zap.L().Error("[nats] unable to message send",
 				zap.L().Error("[nats] unable to message send",
 					zap.Error(err),
 					zap.Error(err),
 					zap.String("user_id", nMsg.userId))
 					zap.String("user_id", nMsg.userId))
 			}
 			}
 		case msg := <-n.ch:
 		case msg := <-n.ch:
-			userIds := msg.Header.Values(headerUserId)
-			if len(userIds) == 0 {
-				zap.L().Info("[nats] received empty userIds")
+			requestId := msg.Header.Get("request_id")
+			sender := msg.Header.Get("sender")
+			ids := msg.Header.Values("user_id")
+			if len(ids) == 0 {
+				zap.L().Info("[nats] received empty userIds",
+					zap.String("request_id", requestId))
 				continue
 				continue
 			}
 			}
+
+			message := deserializeMessage(msg.Data)
+			message.sender = sender
+
+			// Error handler
+			if message.Type == MessageTypeError {
+				zap.L().Error("[nats] received error message",
+					zap.String("request_id", requestId),
+					zap.ByteString("message", serializationMessage(message)))
+				resp := RespondStructural{
+					RequestId: message.RequestId,
+					Ok:        false,
+					ErrMsg:    message.Content.(string),
+				}
+				_ = msg.Respond([]byte(resp.Marshaler()))
+				continue
+			}
+
 			zap.L().Info("[nats] received nats message",
 			zap.L().Info("[nats] received nats message",
-				zap.Strings("ids", userIds),
-				zap.String("request_id", msg.Header.Get("request_id")))
+				zap.Strings("ids", ids), zap.String("request_id", requestId))
 
 
 			n.mutex.RLock()
 			n.mutex.RLock()
-			for _, uid := range userIds {
+			for _, uid := range ids {
 				if s, ok := n.Subscribers[uid]; ok {
 				if s, ok := n.Subscribers[uid]; ok {
-					c := s.client
-					message := deserializeMessage(uid, msg.Data)
-					ctx := c.withRequestIdContext(c.ctx.Copy(), message.RequestId)
-					sender := msg.Header.Get("sender")
+					clt := s.client
+					ctx := clt.withRequestIdContext(clt.ctx.Copy(), message.RequestId)
 
 
+					// Sender
 					message.sender = sender
 					message.sender = sender
 					if message.Receiver == "" {
 					if message.Receiver == "" {
 						message.Receiver = uid
 						message.Receiver = uid
 					}
 					}
-					if err := c.persistenceMessage(ctx, message); err != nil {
+
+					// Save message
+					if err := clt.persistenceMessage(ctx, message); err != nil {
 						zap.L().Error("[nats] unable to message",
 						zap.L().Error("[nats] unable to message",
 							zap.Error(err),
 							zap.Error(err),
 							zap.String("request_id", message.RequestId))
 							zap.String("request_id", message.RequestId))
+						resp := RespondStructural{
+							RequestId: message.RequestId,
+							Ok:        false,
+							ErrMsg:    err.Error(),
+						}
+						_ = msg.Respond([]byte(resp.Marshaler()))
 						continue
 						continue
-						// resp := RespondStructural{
-						// 	RequestId: message.MessageId,
-						// 	Ok:        false,
-						// 	ErrMsg:    err.Error(),
-						// }
-						// _ = msg.Respond([]byte(resp.Marshaler()))
-						// continue
 					}
 					}
 
 
 					zap.L().Info("[nats] relay received message",
 					zap.L().Info("[nats] relay received message",
@@ -130,13 +160,15 @@ func (n *Nats) run() {
 						zap.String("message_id", message.MessageId),
 						zap.String("message_id", message.MessageId),
 						zap.ByteString("message", serializationMessage(message)),
 						zap.ByteString("message", serializationMessage(message)),
 						zap.String("request_id", message.RequestId))
 						zap.String("request_id", message.RequestId))
-					c.Send <- message
 
 
-					// response
-					// resp := RespondStructural{RequestId: message.MessageId, Ok: true}
-					// _ = msg.Respond([]byte(resp.Marshaler()))
+					// Message to client channel
+					clt.Send <- message
 				}
 				}
 			}
 			}
+			// response
+			resp := RespondStructural{RequestId: message.MessageId, Ok: true}
+			_ = msg.Respond([]byte(resp.Marshaler()))
+
 			n.mutex.RUnlock()
 			n.mutex.RUnlock()
 		case s := <-n.Subscribe:
 		case s := <-n.Subscribe:
 			n.mutex.Lock()
 			n.mutex.Lock()

+ 1 - 1
websocket.go

@@ -22,7 +22,7 @@ import (
 )
 )
 
 
 var name = flag.String("n", "w303", "the name of the server")
 var name = flag.String("n", "w303", "the name of the server")
-var port = flag.Int64("p", 8080, "the port of the server")
+var port = flag.Int64("p", 8081, "the port of the server")
 var nodeId = flag.Int64("i", 1, "the node id of the server")
 var nodeId = flag.Int64("i", 1, "the node id of the server")
 var configFile = flag.String("f", "./etc/websocket.debug.yaml", "the config file")
 var configFile = flag.String("f", "./etc/websocket.debug.yaml", "the config file")