client.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "regexp"
  6. "time"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gorilla/websocket"
  9. "github.com/mitchellh/mapstructure"
  10. "github.com/rotisserie/eris"
  11. "go.uber.org/zap"
  12. "sikey.com/websocket/models"
  13. "sikey.com/websocket/pkg/gid"
  14. "sikey.com/websocket/repositories"
  15. )
  16. type Client struct {
  17. ctx *gin.Context
  18. UserId string
  19. hub *Hub
  20. UnderlyingConn *websocket.Conn
  21. online *models.Online
  22. // logger *zlog.Logger
  23. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  24. localization string // localization 国际码
  25. firebaseToken string // firebaseToken FCM 推送的 token
  26. // Send message channel 发送消息
  27. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  28. Send chan *Message
  29. ReplySend chan *Message // ReplySend 回复消息专用 Channel
  30. readWait time.Duration // readWait 读超时
  31. writeWait time.Duration // writeWait 写超时
  32. pingpongWait time.Duration // pingWait 心跳超时
  33. repos *repositories.Repositories
  34. }
  35. func (c *Client) withRequestIdContext(ctx context.Context, requestId string) context.Context {
  36. return context.WithValue(ctx, "request_id", requestId)
  37. }
  38. func (c *Client) Reader() {
  39. defer func() {
  40. zap.L().Info("[conn] client offline", zap.String("user_id", c.UserId))
  41. c.hub.Disconnect <- c
  42. c.Close()
  43. }()
  44. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  45. for {
  46. _, bytes, err := c.UnderlyingConn.ReadMessage()
  47. if err != nil {
  48. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  49. zap.L().Error("[conn] normal disconnection",
  50. zap.String("user_id", c.UserId),
  51. zap.Error(err))
  52. return
  53. }
  54. // Close connect
  55. _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
  56. zap.L().Error("[conn] read message error",
  57. zap.Error(err),
  58. zap.String("user_id", c.UserId))
  59. return
  60. }
  61. // To processing take over message
  62. c.Processing(bytes)
  63. }
  64. }
  65. // Writer is event listening for servers sending messages to clients
  66. func (c *Client) Writer() {
  67. ticker := time.NewTicker(c.pingpongWait)
  68. defer func() {
  69. ticker.Stop()
  70. c.Close()
  71. }()
  72. for {
  73. select {
  74. case message, ok := <-c.ReplySend:
  75. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  76. if !ok {
  77. // The hub closed the channel.
  78. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  79. return
  80. }
  81. c.WriteMessage(message)
  82. case message, ok := <-c.Send:
  83. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  84. if !ok {
  85. // The hub closed the channel.
  86. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  87. return
  88. }
  89. // Received modification message status
  90. switch message.Type {
  91. case MessageTypeUpChating:
  92. // Chat dialogue messages
  93. // if chatingContent, ok := message.Content.(ChatingContent); ok {
  94. // msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId)
  95. // if err == nil {
  96. // // msg.Received = true
  97. // // if err := c.repos.SessionRepository.UpdateMessage(c.ctx, msg); err != nil {
  98. // // zap.L().Error("[client] [writer] unable to update message status received",
  99. // // zap.Error(err),
  100. // // zap.String("message_id", message.MessageId),
  101. // // zap.String("request_id", message.RequestId))
  102. // // }
  103. // } else {
  104. // if eris.Is(err, models.ErrRecordNotFound) {
  105. // break
  106. // }
  107. // zap.L().Error("[client] [writer] unable to find message", zap.Error(err),
  108. // zap.String("message_id", message.MessageId),
  109. // zap.String("request_id", message.RequestId))
  110. // }
  111. // // Add Unread message count
  112. // _, err = c.repos.UnreadCounterRepository.AddUnreadCount(c.ctx, message.Receiver, chatingContent.SessionId, 1)
  113. // if err != nil {
  114. // zap.L().Error("[client] [writer] unable to add unread count",
  115. // zap.Error(err),
  116. // zap.String("message_id", message.MessageId),
  117. // zap.String("request_id", message.RequestId))
  118. // return
  119. // }
  120. // }
  121. message.Type = MessageTypeDownChating
  122. case MessageTypeError:
  123. }
  124. c.WriteMessage(message)
  125. case <-ticker.C:
  126. // 到时间发送 ping 信号
  127. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  128. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  129. return
  130. }
  131. }
  132. }
  133. }
  134. func (c *Client) WriteMessage(message *Message) {
  135. if c.UnderlyingConn == nil {
  136. return
  137. }
  138. err := c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  139. if err != nil {
  140. zap.L().Error("[client] [writer] unable to write message",
  141. zap.Error(err),
  142. zap.String("message_id", message.MessageId),
  143. zap.String("request_id", message.RequestId))
  144. return
  145. }
  146. }
  147. // Processing message
  148. // the server is ready to send a message to the client,
  149. // or when the server receives a message from the client, it processes the message
  150. func (c *Client) Processing(bytes []byte) {
  151. message := deserializeMessage(bytes)
  152. ctx := c.withRequestIdContext(c.ctx.Copy(), message.RequestId)
  153. message.sender = c.UserId
  154. // Persistent messages, save message
  155. if message.Type == MessageTypeUpChating {
  156. if err := c.persistenceMessage(ctx, message); err != nil {
  157. c.writeError(ctx, err)
  158. return
  159. }
  160. }
  161. switch message.Type {
  162. case MessageTypePingPong:
  163. c.procHeartbeat(ctx) // Heartbeat message
  164. case MessageTypeUpChating:
  165. // Chat dialogue messages
  166. if chatingContent, ok := message.Content.(ChatingContent); ok {
  167. chatingContent.MessageId = message.MessageId
  168. c.procChating(ctx, message.Type, chatingContent)
  169. }
  170. case MessageTypeNotification:
  171. // Notification message
  172. if notificationContent, ok := message.Content.(NotificationContent); ok {
  173. c.procNotify(ctx, notificationContent)
  174. }
  175. case MessageTypeError:
  176. c.hub.Message <- message
  177. }
  178. // Reply message
  179. if message.IsNeedReply() && !message.IsError() {
  180. c.ReplySend <- newReplyMessage(message)
  181. }
  182. // Reset read deadline, prevent Reader from shutting down
  183. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingpongWait))
  184. }
  185. // persistenceMessage persistent message
  186. func (c *Client) persistenceMessage(ctx context.Context, message *Message) error {
  187. requestId := ctx.Value("request_id").(string)
  188. (*message).MessageId = gid.GetSnowflakeId()
  189. // Create message model
  190. mod := models.SessionMessage{
  191. ID: message.MessageId,
  192. Receiver: message.Receiver,
  193. Sender: message.sender,
  194. PayloadType: 1,
  195. SentAt: time.Now().UTC(),
  196. }
  197. zap.L().Info("[persistence] save database begin",
  198. zap.Int8("type", message.Type),
  199. zap.Any("content", message.Content),
  200. zap.String("request_id", requestId))
  201. switch message.Type {
  202. case MessageTypePingPong, MessageTypeEmpty, MessageTypeError:
  203. // Heartbeat message, empty message, error message
  204. zap.L().Info("[persistence] not save end",
  205. zap.Int8("type", message.Type),
  206. zap.Any("content", message.Content),
  207. zap.String("request_id", requestId))
  208. return nil
  209. case MessageTypeUpChating:
  210. // Chat dialogue messages
  211. chatingContent := message.Content.(ChatingContent)
  212. chatingContent.MessageId = message.MessageId // Important, not having this line may result in the inability to receive messages
  213. mod.PayloadType = chatingContent.PayloadType // Important, not having this line may result in the inability to receive messages
  214. mod.Receiver = chatingContent.Receiver
  215. mod.SessionId = chatingContent.SessionId
  216. // Format the structure. Sometimes,
  217. // the data passed in may contain fields that contaminate the structure.
  218. // This step ensures the consistency of the structure
  219. var err error
  220. switch chatingContent.PayloadType {
  221. case ChatingContentTypeText:
  222. // Text payload
  223. var textContent ContentText
  224. err = mapstructure.Decode(chatingContent.Payload, &textContent)
  225. chatingContent.Payload = textContent
  226. case ChatingContentTypeMetadata:
  227. // Metadata payload
  228. var contentMetadata ContentMetadata
  229. err = mapstructure.Decode(chatingContent.Payload, &contentMetadata)
  230. chatingContent.Payload = contentMetadata
  231. }
  232. if err != nil {
  233. return eris.Wrap(err, "unable to decode message content")
  234. }
  235. payload, _ := json.Marshal(chatingContent.Payload)
  236. mod.Payload = payload
  237. mod.SentAt = time.UnixMilli(chatingContent.SendTime)
  238. message.Content = chatingContent
  239. // case MessageTypeNotification:
  240. // // Notification message
  241. // notificationContent := message.Content.(NotificationContent)
  242. // mod.ContentType = uint8(notificationContent.ID)
  243. // // Notification payload
  244. // if len(notificationContent.Payload) > 0 {
  245. // payload, _ := json.Marshal(notificationContent.Payload)
  246. // mod.Content = payload
  247. // } else {
  248. // mod.Content = []byte("{}")
  249. // }
  250. // case MessageTypeLocation:
  251. // // Location message
  252. // locationMessageContent := message.Content.(LocationMessageContent)
  253. // // Marshaler payload
  254. // payload, err := json.Marshal(locationMessageContent)
  255. // if err != nil {
  256. // zap.L().Error("marshal error", zap.Error(err), zap.String("request_id", requestId))
  257. // }
  258. // mod.Content = payload
  259. // case MessageTypeVideoCall:
  260. // Video call message
  261. // videoCallMessageContent := message.Content.(VideoCallMessageContent)
  262. // payload, _ := json.Marshal(videoCallMessageContent)
  263. // mod.Content = payload
  264. default:
  265. return eris.New("unknown message type")
  266. }
  267. err := c.repos.Transaction(ctx, func(ctx context.Context, repos *repositories.Repositories) error {
  268. err := c.repos.SessionRepository.CreateMessage(c.ctx, &mod)
  269. if err != nil {
  270. zap.L().Error("[persistence] unable to message",
  271. zap.Error(err),
  272. zap.String("user_id", c.UserId),
  273. zap.String("request_id", requestId))
  274. return eris.Wrap(err, "unable to persistence message")
  275. }
  276. // 将自己设置为已读
  277. err = c.repos.MessageReadLogRepository.CreateMessageReadLog(ctx, &models.MessageReadLog{
  278. MessageId: mod.ID,
  279. Reader: mod.Sender,
  280. })
  281. return err
  282. })
  283. if err != nil {
  284. return err
  285. }
  286. zap.L().Info("[persistence] save database end",
  287. zap.String("request_id", requestId),
  288. zap.String("message_id", message.MessageId),
  289. zap.String("user_id", c.UserId),
  290. zap.String("sender", mod.Sender))
  291. return nil
  292. }
  293. // procNotify received message processing
  294. func (c *Client) procNotify(ctx context.Context, notificationContent NotificationContent) {
  295. requestId := ctx.Value("request_id").(string)
  296. zap.L().Info("[client] [reader] received notify message",
  297. zap.Int("notify_id", notificationContent.ID),
  298. zap.String("request_id", requestId),
  299. zap.String("user_id", c.UserId))
  300. // Ask for location
  301. messaging := Message{
  302. sender: c.UserId,
  303. Type: MessageTypeNotification,
  304. RequestId: requestId,
  305. }
  306. switch notificationContent.ID {
  307. case NotificationTypeAskLocation:
  308. messaging.Content = NotificationContent{ID: NotificationTypeAskLocation}
  309. c.hub.Message <- &messaging
  310. }
  311. }
  312. // procChating received message processing
  313. func (c *Client) procChating(ctx context.Context, msgType MessageType, chatingContent ChatingContent) {
  314. requestId := ctx.Value("request_id").(string)
  315. chatingContent.SessionId = chatingContent.Receiver
  316. // Receiver ID format determines whether the receiver is an account or a session
  317. users := c.regexpReceiveUserIds(chatingContent.Receiver)
  318. zap.L().Info("[client] [reader] received chating message",
  319. zap.String("receiver", chatingContent.Receiver),
  320. zap.String("request_id", requestId),
  321. zap.String("user_id", c.UserId),
  322. zap.Strings("users", users))
  323. for _, id := range users {
  324. messaging := Message{
  325. sender: c.UserId,
  326. Type: msgType,
  327. RequestId: requestId,
  328. Receiver: id,
  329. Content: chatingContent,
  330. }
  331. c.hub.Message <- &messaging
  332. }
  333. }
  334. // procHeartbeat message
  335. func (c *Client) procHeartbeat(ctx context.Context) {
  336. zap.L().Info("[client] [reader] receive ping message",
  337. zap.String("user_id", c.UserId),
  338. zap.String("request_id", ctx.Value("request_id").(string)))
  339. _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
  340. }
  341. func (c *Client) writeError(ctx context.Context, err error) {
  342. c.Send <- &Message{
  343. RequestId: ctx.Value("request_id").(string),
  344. Type: MessageTypeError,
  345. Content: ContentError{Err: err.Error()},
  346. }
  347. }
  348. // regexpReceiveUserIds 通过 receiver 获取接收者的用户ID
  349. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  350. // session_id 的话需要查询 session_member 表获取 session 的成员
  351. func (c *Client) regexpReceiveUserIds(receiver string) []string {
  352. if receiver == "" {
  353. return []string{}
  354. }
  355. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  356. if reg.Match([]byte(receiver)) {
  357. return []string{receiver}
  358. }
  359. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  360. c.ctx, receiver, c.UserId)
  361. if err != nil {
  362. zap.L().Error("unable to get session members", zap.Error(err))
  363. return []string{}
  364. }
  365. var ms = make([]string, len(members))
  366. for i, memb := range members {
  367. ms[i] = memb.RefId
  368. }
  369. return ms
  370. }
  371. // Close websocket connection
  372. func (c *Client) Close() {
  373. c.UnderlyingConn.Close()
  374. c.repos.OnlineRepository.Offline(c.ctx, c.online)
  375. }