client.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package server
  2. import (
  3. "encoding/json"
  4. "time"
  5. "github.com/gin-gonic/gin"
  6. "github.com/google/uuid"
  7. "github.com/gorilla/websocket"
  8. "sikey.com/websocket/models"
  9. "sikey.com/websocket/repositories"
  10. "sikey.com/websocket/utils/zlog"
  11. )
  // Client is the per-connection state shared by the reader and writer methods
  // in this file.
  type Client struct {
  	// ctx is the gin request context; reader passes it to the repository calls.
  	ctx *gin.Context
  	// UserId identifies the connected user. reader records it as the Sender of
  	// stored messages and passes it to GetSessionMembersRemoveOneself.
  	UserId string
  	// hub receives this client on hub.Disconnect when reader exits and receives
  	// per-member messages on hub.Message.
  	hub *Hub
  	// UnderlyingConn is the websocket connection that reader reads from and
  	// writer writes to.
  	UnderlyingConn *websocket.Conn
  	isRemotely     bool   // isRemotely reports whether this is a remote connection.
  	isSimpleMsg    bool   // isSimpleMsg reports whether this is a simple message.
  	localization   string // localization is the international (locale) code.
  	// Send is the outgoing message channel; writer drains it onto the
  	// connection. When the user is online, online messages are sent through
  	// Send. (The original comment goes on "but if the user is not online," and
  	// stops there; the offline behaviour is not described in this file.)
  	Send      chan *Message
  	readWait  time.Duration // readWait is the read timeout; reader uses it for the initial read deadline.
  	writeWait time.Duration // writeWait is the write timeout; writer applies it as the deadline before every write.
  	pingWait  time.Duration // pingWait is the heartbeat timeout; it is the ping ticker period in writer and the read-deadline extension in reader.
  	// repos gives access to the repositories; reader uses repos.SessionRepository.
  	repos *repositories.Repositories
  }
  28. func (c *Client) reader() {
  29. defer func() {
  30. c.hub.Disconnect <- c
  31. c.UnderlyingConn.Close()
  32. }()
  33. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
  34. for {
  35. _, bytes, err := c.UnderlyingConn.ReadMessage()
  36. if err != nil {
  37. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  38. zlog.Errorf("error: %v", err)
  39. }
  40. // Close connect
  41. break
  42. }
  43. message := deserializeMessage(bytes)
  44. switch message.Type {
  45. case MessageTypeUpChating, MessageTypeDownChating:
  46. // Chat dialogue messages
  47. chatingContent := message.Content.(ChatingContent)
  48. // Save message to database
  49. payload, _ := json.Marshal(chatingContent.Payload)
  50. messageId := uuid.NewString()
  51. err = c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
  52. ID: messageId,
  53. SessionId: chatingContent.Receiver,
  54. Receiver: chatingContent.Receiver,
  55. Sender: c.UserId,
  56. Type: message.Type,
  57. ContentType: chatingContent.PayloadType,
  58. Content: string(payload),
  59. IsRead: false,
  60. })
  61. if err != nil {
  62. c.writeError(message.RequestId, err)
  63. continue
  64. }
  65. members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
  66. c.ctx, chatingContent.Receiver, c.UserId)
  67. if err != nil {
  68. c.writeError(message.RequestId, err)
  69. continue
  70. }
  71. chatingContent.MessageId = messageId
  72. for _, member := range members {
  73. var messaging = *message
  74. messaging.Receiver = member.AccountId
  75. messaging.Content = chatingContent
  76. c.hub.Message <- &messaging
  77. zlog.Info("Send message to ", member.AccountId)
  78. }
  79. // Reply message id
  80. message.Content = &ContentReply{MessageId: messageId}
  81. }
  82. // Reply message
  83. if message.IsNeedReply() {
  84. c.Send <- newReplyMessage(message)
  85. // Reset read deadline, prevent Reader from shutting down
  86. c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
  87. }
  88. }
  89. }
  90. func (c *Client) writer() {
  91. ticker := time.NewTicker(c.pingWait)
  92. defer func() {
  93. ticker.Stop()
  94. c.UnderlyingConn.Close()
  95. }()
  96. for {
  97. select {
  98. case message, ok := <-c.Send:
  99. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  100. if !ok {
  101. // The hub closed the channel.
  102. c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
  103. return
  104. }
  105. var err error
  106. err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
  107. if err != nil {
  108. return
  109. }
  110. case <-ticker.C:
  111. // 到时间发送 ping 信号
  112. c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
  113. if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
  114. return
  115. }
  116. }
  117. }
  118. }
  119. func (c *Client) writeError(requestId string, err error) {
  120. c.Send <- &Message{
  121. RequestId: requestId,
  122. Type: MessageTypeError,
  123. Content: ContentError{Err: err.Error()},
  124. }
  125. }