client.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package server
  import (
  	"encoding/json"
  	"errors"
  	"regexp"
  	"time"

  	"github.com/gin-gonic/gin"
  	"github.com/google/uuid"
  	"github.com/gorilla/websocket"

  	"sikey.com/websocket/models"
  	"sikey.com/websocket/repositories"
  	"sikey.com/websocket/utils/zlog"
  )
  13. type Client struct {
  14. ctx *gin.Context
  15. UserId string
  16. hub *Hub
  17. UnderlyingConn *websocket.Conn
  18. isRemotely bool // isRemotely 是否是远程连接
  19. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  20. localization string // localization 国际码
  21. // Send message channel 发送消息
  22. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  23. Send chan *Message
  24. readWait time.Duration // readWait 读超时
  25. writeWait time.Duration // writeWait 写超时
  26. pingWait time.Duration // pingWait 心跳超时
  27. repos *repositories.Repositories
  28. }
  29. func (c *Client) reader() {
  30. defer func() {
  31. c.hub.Disconnect <- c
  32. c.Close()
  33. }()
  34. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  35. for {
  36. _, bytes, err := c.UnderlyingConn.ReadMessage()
  37. if err != nil {
  38. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  39. zlog.Errorf("error: %v", err)
  40. }
  41. // Close connect
  42. break
  43. }
  44. message := deserializeMessage(bytes)
  45. switch message.Type {
  46. case MessageTypePingPong:
  47. zlog.Debugf("receive ping message from %s", c.UserId)
  48. case MessageTypeUpChating, MessageTypeDownChating:
  49. // Chat dialogue messages
  50. chatingContent := message.Content.(ChatingContent)
  51. // Save message to database
  52. messageId := uuid.NewString()
  53. chatingContent.MessageId = messageId
  54. err = c.saveMessage(messageId, message.Type, &chatingContent)
  55. if err != nil {
  56. c.writeError(message.RequestId, err)
  57. continue
  58. }
  59. // Receiver ID format determines whether the receiver is an account or a session
  60. users := c.getReceiverUserIds(chatingContent.Receiver)
  61. for _, id := range users {
  62. var messaging = *message
  63. messaging.Receiver = id
  64. messaging.Content = chatingContent
  65. zlog.Info("Send message to ", id)
  66. c.hub.Message <- &messaging
  67. }
  68. // Reply message id
  69. message.Content = &ContentReply{MessageId: messageId}
  70. }
  71. // Reply message
  72. if message.IsNeedReply() {
  73. c.Send <- newReplyMessage(message)
  74. // Reset read deadline, prevent Reader from shutting down
  75. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
  76. }
  77. }
  78. }
  79. func (c *Client) writer() {
  80. ticker := time.NewTicker(c.pingWait)
  81. defer func() {
  82. ticker.Stop()
  83. c.Close()
  84. }()
  85. for {
  86. select {
  87. case message, ok := <-c.Send:
  88. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  89. if !ok {
  90. // The hub closed the channel.
  91. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  92. return
  93. }
  94. var err error
  95. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  96. if err != nil {
  97. return
  98. }
  99. case <-ticker.C:
  100. // 到时间发送 ping 信号
  101. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  102. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  103. return
  104. }
  105. }
  106. }
  107. }
  108. func (c *Client) writeError(requestId string, err error) {
  109. c.Send <- &Message{
  110. RequestId: requestId,
  111. Type: MessageTypeError,
  112. Content: ContentError{Err: err.Error()},
  113. }
  114. }
  115. func (c *Client) saveMessage(messageId string, messageType int8, content *ChatingContent) error {
  116. payload, _ := json.Marshal(content.Payload)
  117. return c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  118. ID: messageId,
  119. SessionId: content.Receiver,
  120. Receiver: content.Receiver,
  121. Sender: c.UserId,
  122. Type: messageType,
  123. ContentType: content.PayloadType,
  124. Content: string(payload),
  125. IsRead: false,
  126. })
  127. }
  128. // getReceiverUserIds 通过 receiver 获取接收者的用户ID
  129. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  130. // session_id 的话需要查询 session_member 表获取 session 的成员
  131. func (c *Client) getReceiverUserIds(receiver string) []string {
  132. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  133. if reg.Match([]byte(receiver)) {
  134. return []string{receiver}
  135. }
  136. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  137. c.ctx, receiver, c.UserId)
  138. if err != nil {
  139. return []string{}
  140. }
  141. var ms = make([]string, len(members))
  142. for i, memb := range members {
  143. ms[i] = memb.AccountId
  144. }
  145. return ms
  146. }
  147. // Close websocket connection
  148. func (c *Client) Close() {
  149. c.UnderlyingConn.Close()
  150. online := &models.Online{UserId: c.UserId, ServerId: c.hub.serverId}
  151. c.repos.OnlineRepository.Offline(c.ctx, online)
  152. }