123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- package server
- import (
- "context"
- "encoding/json"
- "regexp"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/gorilla/websocket"
- "github.com/mitchellh/mapstructure"
- "github.com/rotisserie/eris"
- "go.uber.org/zap"
- "sikey.com/websocket/models"
- "sikey.com/websocket/pkg/gid"
- "sikey.com/websocket/repositories"
- )
- type Client struct {
- ctx *gin.Context
- UserId string
- hub *Hub
- UnderlyingConn *websocket.Conn
- online *models.Online
- // logger *zlog.Logger
- isSimpleMsg bool // isSimpleMsg 是否是简单消息
- localization string // localization 国际码
- firebaseToken string // firebaseToken FCM 推送的 token
- // Send message channel 发送消息
- // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
- Send chan *Message
- ReplySend chan *Message // ReplySend 回复消息专用 Channel
- readWait time.Duration // readWait 读超时
- writeWait time.Duration // writeWait 写超时
- pingpongWait time.Duration // pingWait 心跳超时
- repos *repositories.Repositories
- }
- func (c *Client) withRequestIdContext(ctx context.Context, requestId string) context.Context {
- return context.WithValue(ctx, "request_id", requestId)
- }
- func (c *Client) Reader() {
- defer func() {
- zap.L().Info("[conn] client offline", zap.String("user_id", c.UserId))
- c.hub.Disconnect <- c
- c.Close()
- }()
- c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
- for {
- _, bytes, err := c.UnderlyingConn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- zap.L().Error("[conn] normal disconnection",
- zap.String("user_id", c.UserId),
- zap.Error(err))
- return
- }
- // Close connect
- _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
- zap.L().Error("[conn] read message error",
- zap.Error(err),
- zap.String("user_id", c.UserId))
- return
- }
- // To processing take over message
- c.Processing(bytes)
- }
- }
- // Writer is event listening for servers sending messages to clients
- func (c *Client) Writer() {
- ticker := time.NewTicker(c.pingpongWait)
- defer func() {
- ticker.Stop()
- c.Close()
- }()
- for {
- select {
- case message, ok := <-c.ReplySend:
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if !ok {
- // The hub closed the channel.
- c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
- return
- }
- c.WriteMessage(message)
- case message, ok := <-c.Send:
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if !ok {
- // The hub closed the channel.
- c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
- return
- }
- // Received modification message status
- switch message.Type {
- case MessageTypeUpChating:
- // Chat dialogue messages
- // if chatingContent, ok := message.Content.(ChatingContent); ok {
- // msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId)
- // if err == nil {
- // // msg.Received = true
- // // if err := c.repos.SessionRepository.UpdateMessage(c.ctx, msg); err != nil {
- // // zap.L().Error("[client] [writer] unable to update message status received",
- // // zap.Error(err),
- // // zap.String("message_id", message.MessageId),
- // // zap.String("request_id", message.RequestId))
- // // }
- // } else {
- // if eris.Is(err, models.ErrRecordNotFound) {
- // break
- // }
- // zap.L().Error("[client] [writer] unable to find message", zap.Error(err),
- // zap.String("message_id", message.MessageId),
- // zap.String("request_id", message.RequestId))
- // }
- // // Add Unread message count
- // _, err = c.repos.UnreadCounterRepository.AddUnreadCount(c.ctx, message.Receiver, chatingContent.SessionId, 1)
- // if err != nil {
- // zap.L().Error("[client] [writer] unable to add unread count",
- // zap.Error(err),
- // zap.String("message_id", message.MessageId),
- // zap.String("request_id", message.RequestId))
- // return
- // }
- // }
- message.Type = MessageTypeDownChating
- case MessageTypeError:
- }
- c.WriteMessage(message)
- case <-ticker.C:
- // 到时间发送 ping 信号
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
- return
- }
- }
- }
- }
- func (c *Client) WriteMessage(message *Message) {
- if c.UnderlyingConn == nil {
- return
- }
- err := c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
- if err != nil {
- zap.L().Error("[client] [writer] unable to write message",
- zap.Error(err),
- zap.String("message_id", message.MessageId),
- zap.String("request_id", message.RequestId))
- return
- }
- }
- // Processing message
- // the server is ready to send a message to the client,
- // or when the server receives a message from the client, it processes the message
- func (c *Client) Processing(bytes []byte) {
- message := deserializeMessage(bytes)
- ctx := c.withRequestIdContext(c.ctx.Copy(), message.RequestId)
- message.sender = c.UserId
- // Persistent messages, save message
- if message.Type == MessageTypeUpChating {
- if err := c.persistenceMessage(ctx, message); err != nil {
- c.writeError(ctx, err)
- return
- }
- }
- switch message.Type {
- case MessageTypePingPong:
- c.procHeartbeat(ctx) // Heartbeat message
- case MessageTypeUpChating:
- // Chat dialogue messages
- if chatingContent, ok := message.Content.(ChatingContent); ok {
- chatingContent.MessageId = message.MessageId
- c.procChating(ctx, message.Type, chatingContent)
- }
- case MessageTypeNotification:
- // Notification message
- if notificationContent, ok := message.Content.(NotificationContent); ok {
- c.procNotify(ctx, notificationContent)
- }
- case MessageTypeError:
- c.hub.Message <- message
- }
- // Reply message
- if message.IsNeedReply() && !message.IsError() {
- c.ReplySend <- newReplyMessage(message)
- }
- // Reset read deadline, prevent Reader from shutting down
- c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingpongWait))
- }
- // persistenceMessage persistent message
- func (c *Client) persistenceMessage(ctx context.Context, message *Message) error {
- requestId := ctx.Value("request_id").(string)
- (*message).MessageId = gid.GetSnowflakeId()
- // Create message model
- mod := models.SessionMessage{
- ID: message.MessageId,
- Receiver: message.Receiver,
- Sender: message.sender,
- PayloadType: 1,
- SentAt: time.Now().UTC(),
- }
- zap.L().Info("[persistence] save database begin",
- zap.Int8("type", message.Type),
- zap.Any("content", message.Content),
- zap.String("request_id", requestId))
- switch message.Type {
- case MessageTypePingPong, MessageTypeEmpty, MessageTypeError:
- // Heartbeat message, empty message, error message
- zap.L().Info("[persistence] not save end",
- zap.Int8("type", message.Type),
- zap.Any("content", message.Content),
- zap.String("request_id", requestId))
- return nil
- case MessageTypeUpChating:
- // Chat dialogue messages
- chatingContent := message.Content.(ChatingContent)
- chatingContent.MessageId = message.MessageId // Important, not having this line may result in the inability to receive messages
- mod.PayloadType = chatingContent.PayloadType // Important, not having this line may result in the inability to receive messages
- mod.Receiver = chatingContent.Receiver
- mod.SessionId = chatingContent.SessionId
- // Format the structure. Sometimes,
- // the data passed in may contain fields that contaminate the structure.
- // This step ensures the consistency of the structure
- var err error
- switch chatingContent.PayloadType {
- case ChatingContentTypeText:
- // Text payload
- var textContent ContentText
- err = mapstructure.Decode(chatingContent.Payload, &textContent)
- chatingContent.Payload = textContent
- case ChatingContentTypeMetadata:
- // Metadata payload
- var contentMetadata ContentMetadata
- err = mapstructure.Decode(chatingContent.Payload, &contentMetadata)
- chatingContent.Payload = contentMetadata
- }
- if err != nil {
- return eris.Wrap(err, "unable to decode message content")
- }
- payload, _ := json.Marshal(chatingContent.Payload)
- mod.Payload = payload
- mod.SentAt = time.UnixMilli(chatingContent.SendTime)
- message.Content = chatingContent
- // case MessageTypeNotification:
- // // Notification message
- // notificationContent := message.Content.(NotificationContent)
- // mod.ContentType = uint8(notificationContent.ID)
- // // Notification payload
- // if len(notificationContent.Payload) > 0 {
- // payload, _ := json.Marshal(notificationContent.Payload)
- // mod.Content = payload
- // } else {
- // mod.Content = []byte("{}")
- // }
- // case MessageTypeLocation:
- // // Location message
- // locationMessageContent := message.Content.(LocationMessageContent)
- // // Marshaler payload
- // payload, err := json.Marshal(locationMessageContent)
- // if err != nil {
- // zap.L().Error("marshal error", zap.Error(err), zap.String("request_id", requestId))
- // }
- // mod.Content = payload
- // case MessageTypeVideoCall:
- // Video call message
- // videoCallMessageContent := message.Content.(VideoCallMessageContent)
- // payload, _ := json.Marshal(videoCallMessageContent)
- // mod.Content = payload
- default:
- return eris.New("unknown message type")
- }
- err := c.repos.Transaction(ctx, func(ctx context.Context, repos *repositories.Repositories) error {
- err := c.repos.SessionRepository.CreateMessage(c.ctx, &mod)
- if err != nil {
- zap.L().Error("[persistence] unable to message",
- zap.Error(err),
- zap.String("user_id", c.UserId),
- zap.String("request_id", requestId))
- return eris.Wrap(err, "unable to persistence message")
- }
- // 将自己设置为已读
- err = c.repos.MessageReadLogRepository.CreateMessageReadLog(ctx, &models.MessageReadLog{
- MessageId: mod.ID,
- Reader: mod.Sender,
- })
- return err
- })
- if err != nil {
- return err
- }
- zap.L().Info("[persistence] save database end",
- zap.String("request_id", requestId),
- zap.String("message_id", message.MessageId),
- zap.String("user_id", c.UserId),
- zap.String("sender", mod.Sender))
- return nil
- }
- // procNotify received message processing
- func (c *Client) procNotify(ctx context.Context, notificationContent NotificationContent) {
- requestId := ctx.Value("request_id").(string)
- zap.L().Info("[client] [reader] received notify message",
- zap.Int("notify_id", notificationContent.ID),
- zap.String("request_id", requestId),
- zap.String("user_id", c.UserId))
- // Ask for location
- messaging := Message{
- sender: c.UserId,
- Type: MessageTypeNotification,
- RequestId: requestId,
- }
- switch notificationContent.ID {
- case NotificationTypeAskLocation:
- messaging.Content = NotificationContent{ID: NotificationTypeAskLocation}
- c.hub.Message <- &messaging
- }
- }
- // procChating received message processing
- func (c *Client) procChating(ctx context.Context, msgType MessageType, chatingContent ChatingContent) {
- requestId := ctx.Value("request_id").(string)
- chatingContent.SessionId = chatingContent.Receiver
- // Receiver ID format determines whether the receiver is an account or a session
- users := c.regexpReceiveUserIds(chatingContent.Receiver)
- zap.L().Info("[client] [reader] received chating message",
- zap.String("receiver", chatingContent.Receiver),
- zap.String("request_id", requestId),
- zap.String("user_id", c.UserId),
- zap.Strings("users", users))
- for _, id := range users {
- messaging := Message{
- sender: c.UserId,
- Type: msgType,
- RequestId: requestId,
- Receiver: id,
- Content: chatingContent,
- }
- c.hub.Message <- &messaging
- }
- }
- // procHeartbeat message
- func (c *Client) procHeartbeat(ctx context.Context) {
- zap.L().Info("[client] [reader] receive ping message",
- zap.String("user_id", c.UserId),
- zap.String("request_id", ctx.Value("request_id").(string)))
- _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
- }
- func (c *Client) writeError(ctx context.Context, err error) {
- c.Send <- &Message{
- RequestId: ctx.Value("request_id").(string),
- Type: MessageTypeError,
- Content: ContentError{Err: err.Error()},
- }
- }
- // regexpReceiveUserIds 通过 receiver 获取接收者的用户ID
- // 使用正则表达式验证ID 是否是 account_id 或 session_id
- // session_id 的话需要查询 session_member 表获取 session 的成员
- func (c *Client) regexpReceiveUserIds(receiver string) []string {
- if receiver == "" {
- return []string{}
- }
- reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
- if reg.Match([]byte(receiver)) {
- return []string{receiver}
- }
- members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
- c.ctx, receiver, c.UserId)
- if err != nil {
- zap.L().Error("unable to get session members", zap.Error(err))
- return []string{}
- }
- var ms = make([]string, len(members))
- for i, memb := range members {
- ms[i] = memb.RefId
- }
- return ms
- }
- // Close websocket connection
- func (c *Client) Close() {
- c.UnderlyingConn.Close()
- c.repos.OnlineRepository.Offline(c.ctx, c.online)
- }
|