nats.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package server
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "time"
  6. "github.com/nats-io/nats.go"
  7. "go.uber.org/zap"
  8. "sikey.com/websocket/pkg/dbx"
  9. "sikey.com/websocket/pkg/natx"
  10. "sikey.com/websocket/pkg/rdbx"
  11. "sikey.com/websocket/repositories"
  12. )
  13. const (
  14. subject = "clients.message"
  15. headerUserId = "user_id"
  16. )
  17. type RespondStructural struct {
  18. RequestId string `json:"request_id"`
  19. Ok bool `json:"ok"`
  20. ErrMsg string `json:"errMsg"`
  21. }
  22. func (rs *RespondStructural) String() string {
  23. return rs.Marshaler()
  24. }
  25. func (rs *RespondStructural) Marshaler() string {
  26. buf, _ := json.Marshal(rs)
  27. return string(buf)
  28. }
  29. type Nats struct {
  30. nc *nats.Conn
  31. mutex sync.RWMutex
  32. Subscribers map[string]*subscriber
  33. ch chan *nats.Msg
  34. Send chan *natsMessage
  35. Subscribe chan *subscriber
  36. Unsubscribe chan *subscriber
  37. Repositories *repositories.Repositories
  38. }
  39. type subscriber struct {
  40. client *Client
  41. }
  42. type natsMessage struct {
  43. userId string
  44. message *Message
  45. }
  46. func NewNats(addr string) *Nats {
  47. nc := natx.GetConnect()
  48. n := &Nats{
  49. nc: nc,
  50. mutex: sync.RWMutex{},
  51. ch: make(chan *nats.Msg, 256),
  52. Send: make(chan *natsMessage),
  53. Subscribers: make(map[string]*subscriber),
  54. Subscribe: make(chan *subscriber),
  55. Unsubscribe: make(chan *subscriber),
  56. // db
  57. Repositories: repositories.NewRepositories(dbx.GetConnect(), rdbx.GetConnect()),
  58. }
  59. _, err := nc.ChanSubscribe(subject, n.ch)
  60. if err != nil {
  61. zap.L().Error("[nats] unable to start", zap.Error(err))
  62. }
  63. go n.run()
  64. return n
  65. }
  66. func (n *Nats) run() {
  67. for {
  68. select {
  69. case nMsg := <-n.Send:
  70. bytes := serializationMessage(nMsg.message)
  71. timeout := 5 * time.Second
  72. if _, err := n.nc.RequestMsg(&nats.Msg{
  73. Subject: subject,
  74. Data: bytes,
  75. Header: nats.Header{
  76. headerUserId: []string{nMsg.userId},
  77. "sender": []string{nMsg.message.sender},
  78. "request_id": []string{nMsg.message.RequestId},
  79. },
  80. }, timeout); err != nil {
  81. zap.L().Error("[nats] unable to message send",
  82. zap.Error(err),
  83. zap.String("request_id", nMsg.message.RequestId),
  84. zap.Any("message", nMsg.message),
  85. zap.String("user_id", nMsg.userId))
  86. }
  87. case msg := <-n.ch:
  88. requestId := msg.Header.Get("request_id")
  89. sender := msg.Header.Get("sender")
  90. ids := msg.Header.Values("user_id")
  91. if len(ids) == 0 {
  92. zap.L().Info("[nats] received empty userIds",
  93. zap.String("request_id", requestId))
  94. continue
  95. }
  96. message := deserializeMessage(msg.Data)
  97. message.sender = sender
  98. // Error handler
  99. if message.Type == MessageTypeError {
  100. zap.L().Error("[nats] received error message",
  101. zap.String("request_id", requestId),
  102. zap.ByteString("message", serializationMessage(message)))
  103. resp := RespondStructural{
  104. RequestId: message.RequestId,
  105. Ok: false,
  106. ErrMsg: message.Content.(string),
  107. }
  108. _ = msg.Respond([]byte(resp.Marshaler()))
  109. continue
  110. }
  111. zap.L().Info("[nats] received nats message",
  112. zap.Strings("ids", ids), zap.String("request_id", requestId))
  113. n.mutex.RLock()
  114. for _, uid := range ids {
  115. if s, ok := n.Subscribers[uid]; ok {
  116. clt := s.client
  117. ctx := clt.withRequestIdContext(clt.ctx.Copy(), message.RequestId)
  118. // Sender
  119. message.sender = sender
  120. if message.Receiver == "" {
  121. message.Receiver = uid
  122. }
  123. // Save message
  124. if message.Type == MessageTypeUpChating {
  125. if err := clt.persistenceMessage(ctx, message); err != nil {
  126. zap.L().Error("[nats] unable to message",
  127. zap.Error(err),
  128. zap.String("request_id", message.RequestId))
  129. resp := RespondStructural{
  130. RequestId: message.RequestId,
  131. Ok: false,
  132. ErrMsg: err.Error(),
  133. }
  134. _ = msg.Respond([]byte(resp.Marshaler()))
  135. continue
  136. }
  137. }
  138. zap.L().Info("[nats] relay received message",
  139. zap.String("user_id", uid),
  140. zap.String("message_id", message.MessageId),
  141. zap.ByteString("message", serializationMessage(message)),
  142. zap.String("request_id", message.RequestId))
  143. // Message to client channel
  144. clt.Send <- message
  145. }
  146. }
  147. // response
  148. resp := RespondStructural{RequestId: message.MessageId, Ok: true}
  149. _ = msg.Respond([]byte(resp.Marshaler()))
  150. n.mutex.RUnlock()
  151. case s := <-n.Subscribe:
  152. n.mutex.Lock()
  153. n.Subscribers[s.client.UserId] = s
  154. n.mutex.Unlock()
  155. case s := <-n.Unsubscribe:
  156. n.mutex.Lock()
  157. delete(n.Subscribers, s.client.UserId)
  158. n.mutex.Unlock()
  159. }
  160. }
  161. }