client.go 7.3 KB


  1. package server
  2. import (
  3. "encoding/json"
  4. "regexp"
  5. "time"
  6. "github.com/gin-gonic/gin"
  7. "github.com/google/martian/v3/log"
  8. "github.com/gorilla/websocket"
  9. "github.com/mitchellh/mapstructure"
  10. "github.com/rotisserie/eris"
  11. "sikey.com/websocket/models"
  12. "sikey.com/websocket/repositories"
  13. "sikey.com/websocket/utils/gid"
  14. "x.sikey.com.cn/serverx/zlog"
  15. )
  16. type Client struct {
  17. ctx *gin.Context
  18. UserId string
  19. hub *Hub
  20. UnderlyingConn *websocket.Conn
  21. online *models.Online
  22. // logger *zlog.Logger
  23. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  24. localization string // localization 国际码
  25. firebaseToken string // firebaseToken FCM 推送的 token
  26. // Send message channel 发送消息
  27. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  28. Send chan *Message
  29. readWait time.Duration // readWait 读超时
  30. writeWait time.Duration // writeWait 写超时
  31. pingWait time.Duration // pingWait 心跳超时
  32. repos *repositories.Repositories
  33. }
  34. func (c *Client) reader() {
  35. defer func() {
  36. c.hub.Disconnect <- c
  37. c.Close()
  38. // c.logger.Info("client Offline")
  39. }()
  40. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  41. for {
  42. _, bytes, err := c.UnderlyingConn.ReadMessage()
  43. if err != nil {
  44. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  45. // c.logger.Errorf("error: %v", eris.Wrap(err, c.UserId))
  46. } else {
  47. // c.logger.Errorf("error: %v", eris.Wrap(err, c.UserId))
  48. }
  49. // Close connect
  50. // _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
  51. return
  52. }
  53. var message = deserializeMessage(bytes)
  54. // var log = c.logger.WithField(keys.HeaderRequestId, message.RequestId)
  55. switch message.Type {
  56. case MessageTypePingPong:
  57. // zlog.Debugf("receive ping message from %s", c.UserId)
  58. _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
  59. case MessageTypeUpChating, MessageTypeDownChating:
  60. // Chat dialogue messages
  61. chatingContent := message.Content.(ChatingContent)
  62. // Save message to database
  63. messageId := gid.GetMessageId()
  64. chatingContent.MessageId = messageId
  65. chatingContent.SessionId = chatingContent.Receiver
  66. err = c.saveMessage(messageId, message.Type, &chatingContent)
  67. if err != nil {
  68. c.writeError(message.RequestId, err)
  69. continue
  70. }
  71. // Receiver ID format determines whether the receiver is an account or a session
  72. // log.Infof("message receiver: %s", chatingContent.Receiver)
  73. users := c.getReceiverUserIds(chatingContent.Receiver)
  74. // log.Infof("message to users: %v", users)
  75. for _, id := range users {
  76. var messaging = *message
  77. messaging.Receiver = id
  78. messaging.Content = chatingContent
  79. // zlog.Infof("Send message %s to %s", c.UserId, id)
  80. // Check if the user is online
  81. if c.firebaseToken != "" {
  82. online, err := c.repos.OnlineRepository.Is(c.ctx, id)
  83. if err != nil {
  84. // log.Error(eris.Wrap(err, "unable to find online user"))
  85. continue
  86. }
  87. if !online {
  88. // Send FCM message
  89. // token, err := c.repos.FirebaseMessageRepository.GetFirebaseToken(c.ctx, id)
  90. // if err != nil {
  91. // if eris.Is(err, models.ErrRecordNotFound) {
  92. // // log.Debugf("user %s not found firebase token", id)
  93. // }
  94. // } else {
  95. // c.hub.FirebaseMessage <- &FirebaseMessage{token: token.Token, message: &messaging, clt: c}
  96. // }
  97. }
  98. }
  99. c.hub.Message <- &messaging
  100. }
  101. // Reply message id
  102. message.Content = &ContentReply{MessageId: messageId}
  103. }
  104. // Reply message
  105. // zlog.Debugf("reply message %s to %s", message.RequestId, c.UserId)
  106. if message.IsNeedReply() {
  107. c.Send <- newReplyMessage(message)
  108. // Reset read deadline, prevent Reader from shutting down
  109. // zlog.Debugf("reset read deadline for %s", c.UserId)
  110. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
  111. }
  112. }
  113. }
  114. func (c *Client) writer() {
  115. ticker := time.NewTicker(c.pingWait)
  116. defer func() {
  117. ticker.Stop()
  118. c.Close()
  119. }()
  120. for {
  121. select {
  122. case message, ok := <-c.Send:
  123. // var log = c.logger
  124. if message != nil {
  125. // log = log.WithField(keys.HeaderRequestId, message.RequestId)
  126. }
  127. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  128. if !ok {
  129. // The hub closed the channel.
  130. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  131. return
  132. }
  133. var err error
  134. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  135. if err != nil {
  136. return
  137. }
  138. // Received modification message status
  139. switch message.Type {
  140. case MessageTypeUpChating, MessageTypeDownChating:
  141. // Chat dialogue messages
  142. if chatingContent, ok := message.Content.(ChatingContent); ok {
  143. if msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId); err == nil {
  144. msg.Received = true
  145. c.repos.SessionRepository.UpdateMessage(c.ctx, msg)
  146. } else {
  147. if eris.Is(err, models.ErrRecordNotFound) {
  148. break
  149. }
  150. // log.Error(err)
  151. }
  152. }
  153. }
  154. log.Infof("write message %s", message)
  155. case <-ticker.C:
  156. // 到时间发送 ping 信号
  157. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  158. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  159. return
  160. }
  161. }
  162. }
  163. }
  164. func (c *Client) writeError(requestId string, err error) {
  165. c.Send <- &Message{
  166. RequestId: requestId,
  167. Type: MessageTypeError,
  168. Content: ContentError{Err: err.Error()},
  169. }
  170. }
  171. func (c *Client) saveMessage(messageId string, messageType int8, content *ChatingContent) error {
  172. // Standardized structure, This is not an unnecessary step!!!
  173. // Filter out excess fields.
  174. var err error
  175. switch content.PayloadType {
  176. case ChatingContentTypeText:
  177. var textContent ContentText
  178. err = mapstructure.Decode(content.Payload, &textContent)
  179. content.Payload = textContent
  180. case ChatingContentTypeMetadata:
  181. var contentMetadata ContentMetadata
  182. err = mapstructure.Decode(content.Payload, &contentMetadata)
  183. content.Payload = contentMetadata
  184. }
  185. if err != nil {
  186. return eris.Wrap(err, "unable to decode message content")
  187. }
  188. payload, _ := json.Marshal(content.Payload)
  189. return c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  190. ID: messageId,
  191. SessionId: content.Receiver,
  192. Receiver: content.Receiver,
  193. Sender: c.UserId,
  194. Type: messageType,
  195. ContentType: content.PayloadType,
  196. Content: payload,
  197. IsRead: false,
  198. })
  199. }
  200. // getReceiverUserIds 通过 receiver 获取接收者的用户ID
  201. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  202. // session_id 的话需要查询 session_member 表获取 session 的成员
  203. func (c *Client) getReceiverUserIds(receiver string) []string {
  204. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  205. if reg.Match([]byte(receiver)) {
  206. return []string{receiver}
  207. }
  208. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  209. c.ctx, receiver, c.UserId)
  210. if err != nil {
  211. zlog.Error(eris.Wrap(err, "unable to get session members"))
  212. return []string{}
  213. }
  214. var ms = make([]string, len(members))
  215. for i, memb := range members {
  216. ms[i] = memb.RefId
  217. }
  218. return ms
  219. }
  220. // Close websocket connection
  221. func (c *Client) Close() {
  222. c.UnderlyingConn.Close()
  223. online := &models.Online{UserId: c.UserId, ServerId: c.hub.serverId}
  224. c.repos.OnlineRepository.Offline(c.ctx, online)
  225. }