client.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "regexp"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gorilla/websocket"
  9. "github.com/mitchellh/mapstructure"
  10. "github.com/rotisserie/eris"
  11. "go.uber.org/zap"
  12. "sikey.com/websocket/models"
  13. "sikey.com/websocket/repositories"
  14. "x.sikey.com.cn/serverx/gid"
  15. )
  16. type Client struct {
  17. ctx *gin.Context
  18. UserId string
  19. hub *Hub
  20. UnderlyingConn *websocket.Conn
  21. online *models.Online
  22. // logger *zlog.Logger
  23. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  24. localization string // localization 国际码
  25. firebaseToken string // firebaseToken FCM 推送的 token
  26. // Send message channel 发送消息
  27. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  28. Send chan *Message
  29. ReplySend chan *Message // ReplySend 回复消息专用 Channel
  30. readWait time.Duration // readWait 读超时
  31. writeWait time.Duration // writeWait 写超时
  32. pingpongWait time.Duration // pingWait 心跳超时
  33. repos *repositories.Repositories
  34. }
  35. func (c *Client) withRequestIdContext(ctx context.Context, requestId string) context.Context {
  36. return context.WithValue(ctx, "request_id", requestId)
  37. }
  38. func (c *Client) Reader() {
  39. defer func() {
  40. zap.L().Info("[conn] client offline", zap.String("user_id", c.UserId))
  41. c.hub.Disconnect <- c
  42. c.Close()
  43. }()
  44. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  45. for {
  46. _, bytes, err := c.UnderlyingConn.ReadMessage()
  47. if err != nil {
  48. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  49. zap.L().Error("[conn] normal disconnection",
  50. zap.String("user_id", c.UserId),
  51. zap.Error(err))
  52. return
  53. }
  54. // Close connect
  55. _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
  56. zap.L().Error("[conn] read message error",
  57. zap.Error(err),
  58. zap.String("user_id", c.UserId))
  59. return
  60. }
  61. // To processing take over message
  62. c.Processing(bytes)
  63. }
  64. }
  65. // writer is event listening for servers sending messages to clients
  66. func (c *Client) Writer() {
  67. ticker := time.NewTicker(c.pingpongWait)
  68. defer func() {
  69. ticker.Stop()
  70. c.Close()
  71. }()
  72. for {
  73. select {
  74. case message, ok := <-c.ReplySend:
  75. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  76. if !ok {
  77. // The hub closed the channel.
  78. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  79. return
  80. }
  81. c.WriteMessage(message)
  82. case message, ok := <-c.Send:
  83. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  84. if !ok {
  85. // The hub closed the channel.
  86. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  87. return
  88. }
  89. // Received modification message status
  90. switch message.Type {
  91. case MessageTypeUpChating:
  92. // Chat dialogue messages
  93. if chatingContent, ok := message.Content.(ChatingContent); ok {
  94. msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId)
  95. if err == nil {
  96. msg.Received = true
  97. if err := c.repos.SessionRepository.UpdateMessage(c.ctx, msg); err != nil {
  98. zap.L().Error("[client] [writer] unable to update message status received",
  99. zap.Error(err),
  100. zap.String("message_id", message.MessageId),
  101. zap.String("request_id", message.RequestId))
  102. }
  103. } else {
  104. if eris.Is(err, models.ErrRecordNotFound) {
  105. break
  106. }
  107. zap.L().Error("[client] [writer] unable to find message", zap.Error(err),
  108. zap.String("message_id", message.MessageId),
  109. zap.String("request_id", message.RequestId))
  110. }
  111. }
  112. message.Type = MessageTypeDownChating
  113. case MessageTypeError:
  114. }
  115. c.WriteMessage(message)
  116. case <-ticker.C:
  117. // 到时间发送 ping 信号
  118. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  119. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  120. return
  121. }
  122. }
  123. }
  124. }
  125. func (c *Client) WriteMessage(message *Message) {
  126. if c.UnderlyingConn == nil {
  127. return
  128. }
  129. err := c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  130. if err != nil {
  131. zap.L().Error("[client] [writer] unable to write message",
  132. zap.Error(err),
  133. zap.String("message_id", message.MessageId),
  134. zap.String("request_id", message.RequestId))
  135. return
  136. }
  137. }
  138. // Processing message
  139. // the server is ready to send a message to the client,
  140. // or when the server receives a message from the client, it processes the message
  141. func (c *Client) Processing(bytes []byte) {
  142. message := deserializeMessage(bytes)
  143. ctx := c.withRequestIdContext(c.ctx.Copy(), message.RequestId)
  144. message.sender = c.UserId
  145. // Persistent messages, save message
  146. if err := c.persistenceMessage(ctx, message); err != nil {
  147. c.writeError(ctx, err)
  148. return
  149. }
  150. switch message.Type {
  151. case MessageTypePingPong:
  152. c.procHeartbeat(ctx) // Heartbeat message
  153. case MessageTypeUpChating:
  154. // Chat dialogue messages
  155. if chatingContent, ok := message.Content.(ChatingContent); ok {
  156. chatingContent.MessageId = message.MessageId
  157. c.procChating(ctx, message.Type, chatingContent)
  158. }
  159. case MessageTypeNotification:
  160. // Notification message
  161. if notificationContent, ok := message.Content.(NotificationContent); ok {
  162. c.procNotify(ctx, notificationContent)
  163. }
  164. case MessageTypeError:
  165. c.hub.Message <- message
  166. }
  167. // Reply message
  168. if message.IsNeedReply() && !message.IsError() {
  169. c.ReplySend <- newReplyMessage(message)
  170. }
  171. // Reset read deadline, prevent Reader from shutting down
  172. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingpongWait))
  173. }
  174. // persistenceMessage persistent message
  175. func (c *Client) persistenceMessage(ctx context.Context, message *Message) error {
  176. requestId := ctx.Value("request_id").(string)
  177. (*message).MessageId = gid.GetSnowflakeId()
  178. // Create message model
  179. mod := models.SessionMessage{
  180. ID: message.MessageId,
  181. Receiver: message.Receiver,
  182. Sender: message.sender,
  183. Type: message.Type,
  184. ContentType: 1,
  185. SentAt: time.Now().UTC(),
  186. IsRead: false,
  187. Received: false,
  188. }
  189. zap.L().Info("[persistence] save database begin",
  190. zap.Int8("type", message.Type),
  191. zap.Any("content", message.Content),
  192. zap.String("request_id", requestId))
  193. switch message.Type {
  194. case MessageTypePingPong, MessageTypeEmpty, MessageTypeError:
  195. // Heartbeat message, empty message, error message
  196. zap.L().Info("[persistence] not save end",
  197. zap.Int8("type", message.Type),
  198. zap.Any("content", message.Content),
  199. zap.String("request_id", requestId))
  200. return nil
  201. case MessageTypeDownChating, MessageTypeUpChating:
  202. // Chat dialogue messages
  203. chatingContent := message.Content.(ChatingContent)
  204. chatingContent.MessageId = message.MessageId // Important, not having this line may result in the inability to receive messages
  205. mod.ContentType = uint8(chatingContent.PayloadType) // Important, not having this line may result in the inability to receive messages
  206. mod.Receiver = chatingContent.Receiver
  207. // Format the structure. Sometimes,
  208. // the data passed in may contain fields that contaminate the structure.
  209. // This step ensures the consistency of the structure
  210. var err error
  211. switch chatingContent.PayloadType {
  212. case ChatingContentTypeText:
  213. // Text payload
  214. var textContent ContentText
  215. err = mapstructure.Decode(chatingContent.Payload, &textContent)
  216. chatingContent.Payload = textContent
  217. case ChatingContentTypeMetadata:
  218. // Metadata payload
  219. var contentMetadata ContentMetadata
  220. err = mapstructure.Decode(chatingContent.Payload, &contentMetadata)
  221. chatingContent.Payload = contentMetadata
  222. }
  223. if err != nil {
  224. return eris.Wrap(err, "unable to decode message content")
  225. }
  226. payload, _ := json.Marshal(chatingContent.Payload)
  227. mod.Content = payload
  228. mod.SentAt = time.UnixMilli(chatingContent.SendTime)
  229. message.Content = chatingContent
  230. case MessageTypeNotification:
  231. // Notification message
  232. notificationContent := message.Content.(NotificationContent)
  233. mod.ContentType = uint8(notificationContent.ID)
  234. // Notification payload
  235. if len(notificationContent.Payload) > 0 {
  236. payload, _ := json.Marshal(notificationContent.Payload)
  237. mod.Content = payload
  238. } else {
  239. mod.Content = []byte("{}")
  240. }
  241. case MessageTypeLocation:
  242. // Location message
  243. locationMessageContent := message.Content.(LocationMessageContent)
  244. // Marshaler payload
  245. payload, err := json.Marshal(locationMessageContent)
  246. if err != nil {
  247. zap.L().Error("marshal error", zap.Error(err), zap.String("request_id", requestId))
  248. }
  249. mod.Content = payload
  250. case MessageTypeVideoCall:
  251. // Video call message
  252. videoCallMessageContent := message.Content.(VideoCallMessageContent)
  253. payload, _ := json.Marshal(videoCallMessageContent)
  254. mod.Content = payload
  255. default:
  256. return eris.New("unknown message type")
  257. }
  258. err := c.repos.SessionRepository.CreateMessage(c.ctx, &mod)
  259. if err != nil {
  260. zap.L().Error("[persistence] unable to message",
  261. zap.Error(err),
  262. zap.String("user_id", c.UserId),
  263. zap.String("request_id", requestId))
  264. return eris.Wrap(err, "unable to persistence message")
  265. }
  266. zap.L().Info("[persistence] save database end",
  267. zap.String("request_id", requestId),
  268. zap.String("message_id", message.MessageId),
  269. zap.String("user_id", c.UserId),
  270. zap.String("sender", mod.Sender))
  271. return nil
  272. }
  273. // procNotify received message processing
  274. func (c *Client) procNotify(ctx context.Context, notificationContent NotificationContent) {
  275. requestId := ctx.Value("request_id").(string)
  276. zap.L().Info("[client] [reader] received notify message",
  277. zap.Int("notify_id", notificationContent.ID),
  278. zap.String("request_id", requestId),
  279. zap.String("user_id", c.UserId))
  280. // Ask for location
  281. messaging := Message{
  282. sender: c.UserId,
  283. Type: MessageTypeNotification,
  284. RequestId: requestId,
  285. }
  286. switch notificationContent.ID {
  287. case NotificationTypeAskLocation:
  288. messaging.Content = NotificationContent{ID: NotificationTypeAskLocation}
  289. c.hub.Message <- &messaging
  290. }
  291. }
  292. // procChating received message processing
  293. func (c *Client) procChating(ctx context.Context, msgType MessageType, chatingContent ChatingContent) {
  294. requestId := ctx.Value("request_id").(string)
  295. chatingContent.SessionId = chatingContent.Receiver
  296. // Receiver ID format determines whether the receiver is an account or a session
  297. users := c.regexpReceiveUserIds(chatingContent.Receiver)
  298. zap.L().Info("[client] [reader] received chating message",
  299. zap.String("receiver", chatingContent.Receiver),
  300. zap.String("request_id", requestId),
  301. zap.String("user_id", c.UserId),
  302. zap.Strings("users", users))
  303. for _, id := range users {
  304. messaging := Message{
  305. sender: c.UserId,
  306. Type: msgType,
  307. RequestId: requestId,
  308. Receiver: id,
  309. Content: chatingContent,
  310. }
  311. c.hub.Message <- &messaging
  312. }
  313. }
  314. // procHeartbeat message
  315. func (c *Client) procHeartbeat(ctx context.Context) {
  316. zap.L().Info("[client] [reader] receive ping message",
  317. zap.String("user_id", c.UserId),
  318. zap.String("request_id", ctx.Value("request_id").(string)))
  319. _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
  320. }
  321. func (c *Client) writeError(ctx context.Context, err error) {
  322. c.Send <- &Message{
  323. RequestId: ctx.Value("request_id").(string),
  324. Type: MessageTypeError,
  325. Content: ContentError{Err: err.Error()},
  326. }
  327. }
  328. // regexpReceiveUserIds 通过 receiver 获取接收者的用户ID
  329. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  330. // session_id 的话需要查询 session_member 表获取 session 的成员
  331. func (c *Client) regexpReceiveUserIds(receiver string) []string {
  332. if receiver == "" {
  333. return []string{}
  334. }
  335. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  336. if reg.Match([]byte(receiver)) {
  337. return []string{receiver}
  338. }
  339. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  340. c.ctx, receiver, c.UserId)
  341. if err != nil {
  342. zap.L().Error("unable to get session members", zap.Error(err))
  343. return []string{}
  344. }
  345. var ms = make([]string, len(members))
  346. for i, memb := range members {
  347. ms[i] = memb.RefId
  348. }
  349. return ms
  350. }
  351. // Close websocket connection
  352. func (c *Client) Close() {
  353. c.UnderlyingConn.Close()
  354. c.repos.OnlineRepository.Offline(c.ctx, c.online)
  355. }