client.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package server
  2. import (
  3. "encoding/json"
  4. "regexp"
  5. "time"
  6. "github.com/gin-gonic/gin"
  7. "github.com/gorilla/websocket"
  8. "github.com/mitchellh/mapstructure"
  9. "github.com/rotisserie/eris"
  10. "go.uber.org/zap"
  11. "sikey.com/websocket/models"
  12. "sikey.com/websocket/repositories"
  13. "x.sikey.com.cn/serverx/gid"
  14. )
  15. type Client struct {
  16. ctx *gin.Context
  17. UserId string
  18. hub *Hub
  19. UnderlyingConn *websocket.Conn
  20. online *models.Online
  21. // logger *zlog.Logger
  22. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  23. localization string // localization 国际码
  24. firebaseToken string // firebaseToken FCM 推送的 token
  25. // Send message channel 发送消息
  26. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  27. Send chan *Message
  28. readWait time.Duration // readWait 读超时
  29. writeWait time.Duration // writeWait 写超时
  30. pingWait time.Duration // pingWait 心跳超时
  31. repos *repositories.Repositories
  32. }
  33. func (c *Client) reader() {
  34. defer func() {
  35. zap.L().Info("client Offline", zap.String("user_id", c.UserId))
  36. c.hub.Disconnect <- c
  37. c.Close()
  38. }()
  39. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  40. for {
  41. _, bytes, err := c.UnderlyingConn.ReadMessage()
  42. if err != nil {
  43. // if websocket.IsUnexpectedCloseError(err,
  44. // websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  45. // log.Errorf("error: %v", eris.Wrap(err, c.UserId))
  46. // }
  47. zap.L().Error("read message error", zap.Error(err))
  48. // Close connect
  49. _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
  50. return
  51. }
  52. var message = deserializeMessage(bytes)
  53. switch message.Type {
  54. case MessageTypePingPong:
  55. zap.L().Debug("receive ping message",
  56. zap.String("user_id", c.UserId),
  57. zap.String("request_id", message.RequestId))
  58. _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
  59. case MessageTypeUpChating, MessageTypeDownChating:
  60. // Chat dialogue messages
  61. chatingContent := message.Content.(ChatingContent)
  62. // Save message to database
  63. messageId := gid.GetSnowflakeId()
  64. chatingContent.MessageId = messageId
  65. chatingContent.SessionId = chatingContent.Receiver
  66. err = c.saveMessage(messageId, message.Type, &chatingContent)
  67. if err != nil {
  68. c.writeError(message.RequestId, err)
  69. continue
  70. }
  71. // Receiver ID format determines whether the receiver is an account or a session
  72. users := c.getReceiverUserIds(chatingContent.Receiver)
  73. zap.L().Info("received message",
  74. zap.String("user_id", c.UserId),
  75. zap.String("receiver", chatingContent.Receiver),
  76. zap.String("request_id", message.RequestId),
  77. zap.Strings("users", users))
  78. for _, id := range users {
  79. var messaging = *message
  80. messaging.Receiver = id
  81. messaging.Content = chatingContent
  82. // Check if the user is online
  83. if c.firebaseToken != "" {
  84. online, err := c.repos.OnlineRepository.Is(c.ctx, id)
  85. if err != nil {
  86. zap.L().Error("unable to find online user", zap.Error(err))
  87. continue
  88. }
  89. if !online {
  90. // Send FCM message
  91. // token, err := c.repos.FirebaseMessageRepository.GetFirebaseToken(c.ctx, id)
  92. // if err != nil {
  93. // if eris.Is(err, models.ErrRecordNotFound) {
  94. // // log.Debugf("user %s not found firebase token", id)
  95. // }
  96. // } else {
  97. // c.hub.FirebaseMessage <- &FirebaseMessage{token: token.Token, message: &messaging, clt: c}
  98. // }
  99. }
  100. }
  101. c.hub.Message <- &messaging
  102. }
  103. // Reply message id
  104. message.Content = &ContentReply{MessageId: messageId}
  105. }
  106. // Reply message
  107. // log.Debugf("reply message %s to %s", message.RequestId, c.UserId)
  108. if message.IsNeedReply() {
  109. c.Send <- newReplyMessage(message)
  110. // Reset read deadline, prevent Reader from shutting down
  111. // zlog.Debugf("reset read deadline for %s", c.UserId)
  112. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
  113. }
  114. }
  115. }
  116. func (c *Client) writer() {
  117. ticker := time.NewTicker(c.pingWait)
  118. defer func() {
  119. ticker.Stop()
  120. c.Close()
  121. }()
  122. for {
  123. select {
  124. case message, ok := <-c.Send:
  125. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  126. if !ok {
  127. // The hub closed the channel.
  128. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  129. return
  130. }
  131. var err error
  132. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  133. if err != nil {
  134. return
  135. }
  136. // Received modification message status
  137. switch message.Type {
  138. case MessageTypeUpChating, MessageTypeDownChating:
  139. // Chat dialogue messages
  140. if chatingContent, ok := message.Content.(ChatingContent); ok {
  141. if msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId); err == nil {
  142. msg.Received = true
  143. c.repos.SessionRepository.UpdateMessage(c.ctx, msg)
  144. } else {
  145. if eris.Is(err, models.ErrRecordNotFound) {
  146. break
  147. }
  148. zap.L().Error("unable to find message", zap.Error(err))
  149. }
  150. }
  151. }
  152. case <-ticker.C:
  153. // 到时间发送 ping 信号
  154. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  155. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  156. return
  157. }
  158. }
  159. }
  160. }
  161. func (c *Client) writeError(requestId string, err error) {
  162. c.Send <- &Message{
  163. RequestId: requestId,
  164. Type: MessageTypeError,
  165. Content: ContentError{Err: err.Error()},
  166. }
  167. }
  168. func (c *Client) saveMessage(messageId string, messageType int8, content *ChatingContent) error {
  169. // Standardized structure, This is not an unnecessary step!!!
  170. // Filter out excess fields.
  171. var err error
  172. switch content.PayloadType {
  173. case ChatingContentTypeText:
  174. var textContent ContentText
  175. err = mapstructure.Decode(content.Payload, &textContent)
  176. content.Payload = textContent
  177. case ChatingContentTypeMetadata:
  178. var contentMetadata ContentMetadata
  179. err = mapstructure.Decode(content.Payload, &contentMetadata)
  180. content.Payload = contentMetadata
  181. }
  182. if err != nil {
  183. return eris.Wrap(err, "unable to decode message content")
  184. }
  185. payload, _ := json.Marshal(content.Payload)
  186. return c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  187. ID: messageId,
  188. SessionId: content.Receiver,
  189. Receiver: content.Receiver,
  190. Sender: c.UserId,
  191. Type: messageType,
  192. ContentType: content.PayloadType,
  193. Content: payload,
  194. IsRead: false,
  195. SentAt: time.UnixMilli(content.SendTime),
  196. })
  197. }
  198. // getReceiverUserIds 通过 receiver 获取接收者的用户ID
  199. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  200. // session_id 的话需要查询 session_member 表获取 session 的成员
  201. func (c *Client) getReceiverUserIds(receiver string) []string {
  202. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  203. if reg.Match([]byte(receiver)) {
  204. return []string{receiver}
  205. }
  206. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  207. c.ctx, receiver, c.UserId)
  208. if err != nil {
  209. zap.L().Error("unable to get session members", zap.Error(err))
  210. return []string{}
  211. }
  212. var ms = make([]string, len(members))
  213. for i, memb := range members {
  214. ms[i] = memb.RefId
  215. }
  216. return ms
  217. }
  218. // Close websocket connection
  219. func (c *Client) Close() {
  220. c.UnderlyingConn.Close()
  221. online := &models.Online{UserId: c.UserId, ServerId: c.hub.serverId}
  222. c.repos.OnlineRepository.Offline(c.ctx, online)
  223. }