nats.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "github.com/nats-io/nats.go"
  7. "go.uber.org/zap"
  8. "sikey.com/websocket/pkg/natx"
  9. "sikey.com/websocket/repositories"
  10. )
  11. const (
  12. subject = "clients.message"
  13. headerUserId = "user_id"
  14. )
  15. type RespondStructural struct {
  16. RequestId string `json:"request_id"`
  17. Ok bool `json:"ok"`
  18. ErrMsg string `json:"errMsg"`
  19. }
  20. func (rs *RespondStructural) String() string {
  21. return rs.Marshaler()
  22. }
  23. func (rs *RespondStructural) Marshaler() string {
  24. buf, _ := json.Marshal(rs)
  25. return string(buf)
  26. }
  27. type Nats struct {
  28. nc *nats.Conn
  29. repos *repositories.Repositories
  30. firebaseMessageServer *FirebaseMessageServer
  31. mutex sync.RWMutex
  32. Subscribers map[string]*subscriber
  33. ch chan *nats.Msg
  34. Subscribe chan *subscriber
  35. Unsubscribe chan *subscriber
  36. }
  37. type subscriber struct {
  38. client *Client
  39. }
  40. type natsMessage struct {
  41. userId string
  42. message *Message
  43. }
  44. func NewNats(addr string, repos *repositories.Repositories) *Nats {
  45. nc := natx.GetConnect()
  46. firebaseMessageServer := NewFirebaseMessageServer(repos)
  47. n := &Nats{
  48. nc: nc,
  49. repos: repos,
  50. firebaseMessageServer: firebaseMessageServer,
  51. mutex: sync.RWMutex{},
  52. ch: make(chan *nats.Msg, 256),
  53. Subscribers: make(map[string]*subscriber),
  54. Subscribe: make(chan *subscriber),
  55. Unsubscribe: make(chan *subscriber),
  56. }
  57. _, err := nc.ChanSubscribe(subject, n.ch)
  58. if err != nil {
  59. zap.L().Error("[nats] unable to start", zap.Error(err))
  60. }
  61. go n.run()
  62. return n
  63. }
  64. func (n *Nats) run() {
  65. for {
  66. select {
  67. case natsMsg, ok := <-n.ch:
  68. if !ok {
  69. continue
  70. }
  71. // 序列化消息内容
  72. message := deserializeMessage(natsMsg.Data)
  73. zap.L().Info("[nats] deserialized", zap.Any("message", message))
  74. // 消息收到了, 先回复发送者
  75. resp := RespondStructural{RequestId: message.RequestId(), Ok: true}
  76. _ = natsMsg.Respond([]byte(resp.Marshaler()))
  77. // 从消息里获取得到接收人信息
  78. var receiver string
  79. switch msg := message.(type) {
  80. case *Chating:
  81. receiver = msg.Content.Receiver
  82. case *Notification:
  83. receiver = msg.Content.Receiver
  84. case *VideoCall:
  85. receiver = msg.Content.Receiver
  86. case *Location:
  87. receiver = msg.Content.Receiver
  88. default:
  89. // 未找到接收人信息
  90. continue
  91. }
  92. ctx := context.Background()
  93. // 查询在线信息
  94. onlineInfo, err := n.repos.OnlineRepository.GetOnline(ctx, receiver)
  95. if err != nil {
  96. // ERROR
  97. zap.L().Error("[nats] unable to get online info", zap.Error(err), zap.String("user_id", receiver))
  98. continue
  99. }
  100. // 如果没有在线信息,说明用户不在线,这时我们需要推送 firebase messaging 通知
  101. if onlineInfo == nil {
  102. // 如果有 firebase messaging token,我们需要推送消息
  103. ctx := context.Background()
  104. if err := n.firebaseMessageServer.Send(ctx, receiver, message); err != nil {
  105. zap.L().Error("[firebase] unable to send message", zap.Error(err), zap.String("user_id", receiver))
  106. }
  107. continue
  108. }
  109. n.mutex.RLock()
  110. if receiver != "" {
  111. for uid, sub := range n.Subscribers {
  112. if uid == receiver {
  113. // 写入消息到不同的客户端
  114. sub.client.Received <- message
  115. }
  116. }
  117. }
  118. n.mutex.RUnlock()
  119. case s := <-n.Subscribe:
  120. n.mutex.Lock()
  121. n.Subscribers[s.client.UserId] = s
  122. n.mutex.Unlock()
  123. case s := <-n.Unsubscribe:
  124. n.mutex.Lock()
  125. if _, ok := n.Subscribers[s.client.UserId]; ok {
  126. delete(n.Subscribers, s.client.UserId)
  127. }
  128. n.mutex.Unlock()
  129. }
  130. }
  131. }
  132. func responseErrMsg(natsMsg *nats.Msg, err string) {
  133. resp := RespondStructural{Ok: false, ErrMsg: err}
  134. _ = natsMsg.Respond([]byte(resp.Marshaler()))
  135. }