client.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package server
  2. import (
  3. "encoding/json"
  4. "regexp"
  5. "time"
  6. "github.com/gin-gonic/gin"
  7. "github.com/gorilla/websocket"
  8. "github.com/rotisserie/eris"
  9. "gorm.io/gorm"
  10. "sikey.com/websocket/models"
  11. "sikey.com/websocket/repositories"
  12. "sikey.com/websocket/utils/gid"
  13. "sikey.com/websocket/utils/zlog"
  14. )
  15. type Client struct {
  16. ctx *gin.Context
  17. UserId string
  18. hub *Hub
  19. UnderlyingConn *websocket.Conn
  20. online *models.Online
  21. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  22. localization string // localization 国际码
  23. firebaseToken string // firebaseToken FCM 推送的 token
  24. // Send message channel 发送消息
  25. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  26. Send chan *Message
  27. readWait time.Duration // readWait 读超时
  28. writeWait time.Duration // writeWait 写超时
  29. pingWait time.Duration // pingWait 心跳超时
  30. repos *repositories.Repositories
  31. }
  32. func (c *Client) reader() {
  33. defer func() {
  34. c.hub.Disconnect <- c
  35. c.Close()
  36. }()
  37. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  38. for {
  39. _, bytes, err := c.UnderlyingConn.ReadMessage()
  40. if err != nil {
  41. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  42. zlog.Errorf("error: %v", err)
  43. }
  44. // Close connect
  45. _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
  46. break
  47. }
  48. message := deserializeMessage(bytes)
  49. switch message.Type {
  50. case MessageTypePingPong:
  51. zlog.Debugf("receive ping message from %s", c.UserId)
  52. _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
  53. case MessageTypeUpChating, MessageTypeDownChating:
  54. // Chat dialogue messages
  55. chatingContent := message.Content.(ChatingContent)
  56. // Save message to database
  57. messageId := gid.GetMessageId()
  58. chatingContent.MessageId = messageId
  59. err = c.saveMessage(messageId, message.Type, &chatingContent)
  60. if err != nil {
  61. c.writeError(message.RequestId, err)
  62. continue
  63. }
  64. // Receiver ID format determines whether the receiver is an account or a session
  65. users := c.getReceiverUserIds(chatingContent.Receiver)
  66. for _, id := range users {
  67. var messaging = *message
  68. messaging.Receiver = id
  69. messaging.Content = chatingContent
  70. zlog.Info("Send message to ", id)
  71. // Check if the user is online
  72. if c.firebaseToken != "" {
  73. var online bool
  74. if online, err = c.repos.OnlineRepository.Is(c.ctx, id); err != nil {
  75. zlog.Error(eris.Wrap(err, "unable to find online user"))
  76. continue
  77. }
  78. if online {
  79. c.hub.Message <- &messaging
  80. }
  81. }
  82. }
  83. // Reply message id
  84. message.Content = &ContentReply{MessageId: messageId}
  85. }
  86. // Reply message
  87. if message.IsNeedReply() {
  88. c.Send <- newReplyMessage(message)
  89. // Reset read deadline, prevent Reader from shutting down
  90. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
  91. }
  92. }
  93. }
  94. func (c *Client) writer() {
  95. ticker := time.NewTicker(c.pingWait)
  96. defer func() {
  97. ticker.Stop()
  98. c.Close()
  99. }()
  100. for {
  101. select {
  102. case message, ok := <-c.Send:
  103. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  104. if !ok {
  105. // The hub closed the channel.
  106. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  107. return
  108. }
  109. var err error
  110. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  111. if err != nil {
  112. return
  113. }
  114. // Received modification message status
  115. switch message.Type {
  116. case MessageTypeUpChating, MessageTypeDownChating:
  117. // Chat dialogue messages
  118. if chatingContent, ok := message.Content.(ChatingContent); ok {
  119. if msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId); err == nil {
  120. msg.Received = true
  121. c.repos.SessionRepository.UpdateMessage(c.ctx, msg)
  122. } else {
  123. if eris.Is(err, gorm.ErrRecordNotFound) {
  124. break
  125. }
  126. zlog.Error(err)
  127. }
  128. }
  129. }
  130. case <-ticker.C:
  131. // 到时间发送 ping 信号
  132. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  133. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  134. return
  135. }
  136. }
  137. }
  138. }
  139. func (c *Client) writeError(requestId string, err error) {
  140. c.Send <- &Message{
  141. RequestId: requestId,
  142. Type: MessageTypeError,
  143. Content: ContentError{Err: err.Error()},
  144. }
  145. }
  146. func (c *Client) saveMessage(messageId string, messageType int8, content *ChatingContent) error {
  147. payload, _ := json.Marshal(content.Payload)
  148. return c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  149. ID: messageId,
  150. SessionId: content.Receiver,
  151. Receiver: content.Receiver,
  152. Sender: c.UserId,
  153. Type: messageType,
  154. ContentType: content.PayloadType,
  155. Content: payload,
  156. IsRead: false,
  157. })
  158. }
  159. // getReceiverUserIds 通过 receiver 获取接收者的用户ID
  160. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  161. // session_id 的话需要查询 session_member 表获取 session 的成员
  162. func (c *Client) getReceiverUserIds(receiver string) []string {
  163. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  164. if reg.Match([]byte(receiver)) {
  165. return []string{receiver}
  166. }
  167. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  168. c.ctx, receiver, c.UserId)
  169. if err != nil {
  170. return []string{}
  171. }
  172. var ms = make([]string, len(members))
  173. for i, memb := range members {
  174. ms[i] = memb.AccountId
  175. }
  176. return ms
  177. }
  178. // Close websocket connection
  179. func (c *Client) Close() {
  180. c.UnderlyingConn.Close()
  181. online := &models.Online{UserId: c.UserId, ServerId: c.hub.serverId}
  182. c.repos.OnlineRepository.Offline(c.ctx, online)
  183. }