client.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package server
  2. import (
  3. "encoding/json"
  4. "time"
  5. "github.com/gin-gonic/gin"
  6. "github.com/google/uuid"
  7. "github.com/gorilla/websocket"
  8. "sikey.com/websocket/models"
  9. "sikey.com/websocket/repositories"
  10. "sikey.com/websocket/utils/zlog"
  11. )
  12. type Client struct {
  13. ctx *gin.Context
  14. UserId string
  15. hub *Hub
  16. UnderlyingConn *websocket.Conn
  17. isRemotely bool // isRemotely 是否是远程连接
  18. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  19. localization string // localization 国际码
  20. // Send message channel 发送消息
  21. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  22. // 则会通过 SendOffline channel 发送离线消息
  23. Send chan *Message
  24. RemotelyMessage chan *Message // RemotelyMessage 远程消息
  25. // SendOffline chan *Message // SendOffline 发送离线消息
  26. readWait time.Duration // readWait 读超时
  27. writeWait time.Duration // writeWait 写超时
  28. pingWait time.Duration // pingWait 心跳超时
  29. repos *repositories.Repositories
  30. }
  31. func (c *Client) reader() {
  32. defer func() {
  33. c.hub.Disconnect <- c
  34. c.UnderlyingConn.Close()
  35. }()
  36. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  37. for {
  38. _, bytes, err := c.UnderlyingConn.ReadMessage()
  39. if err != nil {
  40. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  41. zlog.Errorf("error: %v", err)
  42. // if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
  43. // zlog.Error(err)
  44. // }
  45. }
  46. break
  47. }
  48. message := deserializeMessage(bytes)
  49. switch message.Type {
  50. case MessageTypeUpChating, MessageTypeDownChating:
  51. // Chat dialogue messages
  52. chatingContent := message.Content.(ChatingContent)
  53. // Save message to database
  54. payload, _ := json.Marshal(chatingContent.Payload)
  55. messageId := uuid.NewString()
  56. err := c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  57. ID: messageId,
  58. SessionId: chatingContent.Receiver,
  59. Receiver: chatingContent.Receiver,
  60. Sender: c.UserId,
  61. Type: uint8(message.Type),
  62. ContentType: uint8(chatingContent.PayloadType),
  63. Content: string(payload),
  64. IsRead: false,
  65. })
  66. if err != nil {
  67. c.writeError(message.RequestId, err)
  68. continue
  69. }
  70. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  71. c.ctx, chatingContent.Receiver, c.UserId)
  72. if err != nil {
  73. c.writeError(message.RequestId, err)
  74. continue
  75. }
  76. chatingContent.MessageId = messageId
  77. for _, member := range members {
  78. var messaging = *message
  79. messaging.receiver = member.AccountId
  80. messaging.Content = chatingContent
  81. c.hub.Message <- &messaging
  82. zlog.Info("Send message to ", member.AccountId)
  83. }
  84. // Reply message id
  85. message.Content = &ContentReply{MessageId: messageId}
  86. }
  87. // Reply message
  88. if message.IsNeedReply() {
  89. c.Send <- newReplyMessage(message)
  90. // Reset read dead line, prevent Reader from shutting down
  91. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
  92. }
  93. }
  94. }
  95. func (c *Client) writer() {
  96. ticker := time.NewTicker(c.pingWait)
  97. defer func() {
  98. ticker.Stop()
  99. c.UnderlyingConn.Close()
  100. // if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
  101. // zlog.Error(err)
  102. // }
  103. }()
  104. for {
  105. select {
  106. case message, ok := <-c.Send:
  107. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  108. if !ok {
  109. // The hub closed the channel.
  110. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  111. return
  112. }
  113. var err error
  114. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  115. if err != nil {
  116. return
  117. }
  118. // case message, ok := <-c.RemotelyMessage:
  119. // // Sending remotely message to client
  120. // if !ok {
  121. // }
  122. // err := c.hub.exchange.SendMessage(c.ctx, kafka.Message{
  123. // Key: []byte(stackexchange.StackExchangeMessaging),
  124. // Value: serializationMessage(message),
  125. // })
  126. // if err != nil {
  127. // return
  128. // }
  129. case <-ticker.C:
  130. // 到时间发送 ping 信号
  131. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  132. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  133. return
  134. }
  135. }
  136. }
  137. }
  138. func (c *Client) writeError(requestId string, err error) {
  139. c.Send <- &Message{
  140. RequestId: requestId,
  141. Type: MessageTypeError,
  142. Content: ContentError{Err: err.Error()},
  143. }
  144. }