client.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package server
  2. import (
  3. "encoding/json"
  4. "time"
  5. "github.com/gin-gonic/gin"
  6. "github.com/gorilla/websocket"
  7. "sikey.com/websocket/models"
  8. "sikey.com/websocket/repositories"
  9. "sikey.com/websocket/utils/zlog"
  10. )
  11. type Client struct {
  12. ctx *gin.Context
  13. UserId string
  14. hub *Hub
  15. UnderlyingConn *websocket.Conn
  16. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  17. localization string // localization 国际码
  18. // Send message channel 发送消息
  19. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  20. // 则会通过 SendOffline channel 发送离线消息
  21. Send chan *Message
  22. SendOffline chan *Message // SendOffline 发送离线消息
  23. readWait time.Duration // readWait 读超时
  24. writeWait time.Duration // writeWait 写超时
  25. pingWait time.Duration
  26. pongWait time.Duration
  27. repos *repositories.Repositories
  28. }
  29. func (c *Client) reader() {
  30. defer func() {
  31. c.hub.Disconnect <- c
  32. c.UnderlyingConn.Close()
  33. }()
  34. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  35. for {
  36. _, bytes, err := c.UnderlyingConn.ReadMessage()
  37. if err != nil {
  38. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  39. zlog.Errorf("error: %v", err)
  40. }
  41. break
  42. }
  43. message := deserializeMessage(bytes)
  44. switch message.Type {
  45. case MessageTypePing:
  46. // Reset read dead line, prevent Reader from shutting down
  47. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pongWait))
  48. // Reply pong message
  49. c.Send <- pongMessage()
  50. case MessageTypeChating:
  51. // Chat dialogue messages
  52. chatingContent := message.Content.(ChatingContent)
  53. // Save message to database
  54. payload, _ := json.Marshal(chatingContent.Payload)
  55. err := c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  56. ID: message.MessageId,
  57. SessionId: chatingContent.Receiver,
  58. Receiver: chatingContent.Receiver,
  59. Sender: c.UserId,
  60. Type: uint8(message.Type),
  61. ContentType: uint8(chatingContent.PayloadType),
  62. Content: string(payload),
  63. IsRead: false,
  64. })
  65. if err != nil {
  66. c.writeError(message.MessageId, err)
  67. continue
  68. }
  69. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  70. c.ctx, chatingContent.Receiver, c.UserId)
  71. if err != nil {
  72. c.writeError(message.MessageId, err)
  73. continue
  74. }
  75. for _, member := range members {
  76. var messaging = *message
  77. messaging.receiver = member.AccountId
  78. messaging.Content = chatingContent
  79. c.hub.Message <- &messaging
  80. }
  81. }
  82. // Reply message
  83. if message.IsNeedReply() {
  84. c.Send <- newReplyMessage(message.MessageId)
  85. }
  86. }
  87. }
  88. func (c *Client) writer() {
  89. ticker := time.NewTicker(c.pingWait)
  90. defer func() {
  91. ticker.Stop()
  92. c.UnderlyingConn.Close()
  93. }()
  94. for {
  95. select {
  96. case message, ok := <-c.Send:
  97. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  98. if !ok {
  99. // The hub closed the channel.
  100. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  101. return
  102. }
  103. var err error
  104. if message.Type == MessageTypePong {
  105. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, []byte("pong"))
  106. if err != nil {
  107. return
  108. }
  109. break
  110. }
  111. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  112. if err != nil {
  113. return
  114. }
  115. case <-ticker.C:
  116. // 到时间发送 ping 信号
  117. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  118. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  119. return
  120. }
  121. }
  122. }
  123. }
  124. func (c *Client) writeError(messageId string, err error) {
  125. c.Send <- &Message{
  126. MessageId: messageId,
  127. Type: MessageTypeError,
  128. Content: ContentError{Err: err.Error()},
  129. }
  130. }