nats.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package server
  2. import (
  3. "encoding/json"
  4. "sync"
  5. "time"
  6. "github.com/nats-io/nats.go"
  7. "go.uber.org/zap"
  8. "sikey.com/websocket/repositories"
  9. "x.sikey.com.cn/serverx/dbx"
  10. "x.sikey.com.cn/serverx/natx"
  11. "x.sikey.com.cn/serverx/rdbx"
  12. )
  13. const (
  14. subject = "clients.message"
  15. headerUserId = "user_id"
  16. )
  17. type RespondStructural struct {
  18. RequestId string `json:"request_id"`
  19. Ok bool `json:"ok"`
  20. ErrMsg string `json:"errMsg"`
  21. }
  22. func (rs *RespondStructural) String() string {
  23. return rs.Marshaler()
  24. }
  25. func (rs *RespondStructural) Marshaler() string {
  26. buf, _ := json.Marshal(rs)
  27. return string(buf)
  28. }
  29. type Nats struct {
  30. nc *nats.Conn
  31. mutex sync.RWMutex
  32. Subscribers map[string]*subscriber
  33. ch chan *nats.Msg
  34. Send chan *natsMessage
  35. Subscribe chan *subscriber
  36. Unsubscribe chan *subscriber
  37. Repositories *repositories.Repositories
  38. }
  39. type subscriber struct {
  40. client *Client
  41. }
  42. type natsMessage struct {
  43. userId string
  44. message *Message
  45. }
  46. func NewNats(addr string) *Nats {
  47. nc := natx.GetConnect()
  48. n := &Nats{
  49. nc: nc,
  50. mutex: sync.RWMutex{},
  51. ch: make(chan *nats.Msg, 256),
  52. Send: make(chan *natsMessage),
  53. Subscribers: make(map[string]*subscriber),
  54. Subscribe: make(chan *subscriber),
  55. Unsubscribe: make(chan *subscriber),
  56. // db
  57. Repositories: repositories.NewRepositories(dbx.GetConnect(), rdbx.GetConnect()),
  58. }
  59. _, err := nc.ChanSubscribe(subject, n.ch)
  60. if err != nil {
  61. zap.L().Error("[nats] unable to start", zap.Error(err))
  62. }
  63. go n.run()
  64. return n
  65. }
  66. func (n *Nats) run() {
  67. for {
  68. select {
  69. case nMsg := <-n.Send:
  70. bytes := serializationMessage(nMsg.message)
  71. timeout := 5 * time.Second
  72. if _, err := n.nc.RequestMsg(&nats.Msg{
  73. Subject: subject,
  74. Data: bytes,
  75. Header: nats.Header{
  76. headerUserId: []string{nMsg.userId},
  77. "sender": []string{nMsg.message.sender},
  78. "request_id": []string{nMsg.message.RequestId},
  79. },
  80. }, timeout); err != nil {
  81. zap.L().Error("[nats] unable to message send",
  82. zap.Error(err),
  83. zap.String("user_id", nMsg.userId))
  84. }
  85. case msg := <-n.ch:
  86. requestId := msg.Header.Get("request_id")
  87. sender := msg.Header.Get("sender")
  88. ids := msg.Header.Values("user_id")
  89. if len(ids) == 0 {
  90. zap.L().Info("[nats] received empty userIds",
  91. zap.String("request_id", requestId))
  92. continue
  93. }
  94. message := deserializeMessage(msg.Data)
  95. message.sender = sender
  96. // Error handler
  97. if message.Type == MessageTypeError {
  98. zap.L().Error("[nats] received error message",
  99. zap.String("request_id", requestId),
  100. zap.ByteString("message", serializationMessage(message)))
  101. resp := RespondStructural{
  102. RequestId: message.RequestId,
  103. Ok: false,
  104. ErrMsg: message.Content.(string),
  105. }
  106. _ = msg.Respond([]byte(resp.Marshaler()))
  107. continue
  108. }
  109. zap.L().Info("[nats] received nats message",
  110. zap.Strings("ids", ids), zap.String("request_id", requestId))
  111. n.mutex.RLock()
  112. for _, uid := range ids {
  113. if s, ok := n.Subscribers[uid]; ok {
  114. clt := s.client
  115. ctx := clt.withRequestIdContext(clt.ctx.Copy(), message.RequestId)
  116. // Sender
  117. message.sender = sender
  118. if message.Receiver == "" {
  119. message.Receiver = uid
  120. }
  121. // Save message
  122. if err := clt.persistenceMessage(ctx, message); err != nil {
  123. zap.L().Error("[nats] unable to message",
  124. zap.Error(err),
  125. zap.String("request_id", message.RequestId))
  126. resp := RespondStructural{
  127. RequestId: message.RequestId,
  128. Ok: false,
  129. ErrMsg: err.Error(),
  130. }
  131. _ = msg.Respond([]byte(resp.Marshaler()))
  132. continue
  133. }
  134. zap.L().Info("[nats] relay received message",
  135. zap.String("user_id", uid),
  136. zap.String("message_id", message.MessageId),
  137. zap.ByteString("message", serializationMessage(message)),
  138. zap.String("request_id", message.RequestId))
  139. // Message to client channel
  140. clt.Send <- message
  141. }
  142. }
  143. // response
  144. resp := RespondStructural{RequestId: message.MessageId, Ok: true}
  145. _ = msg.Respond([]byte(resp.Marshaler()))
  146. n.mutex.RUnlock()
  147. case s := <-n.Subscribe:
  148. n.mutex.Lock()
  149. n.Subscribers[s.client.UserId] = s
  150. n.mutex.Unlock()
  151. case s := <-n.Unsubscribe:
  152. n.mutex.Lock()
  153. delete(n.Subscribers, s.client.UserId)
  154. n.mutex.Unlock()
  155. }
  156. }
  157. }