nats.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package server
  2. import (
  3. "log"
  4. "sync"
  5. "github.com/nats-io/nats.go"
  6. "github.com/rotisserie/eris"
  7. "x.sikey.com.cn/serverx/zlog"
  8. )
  9. const (
  10. subject = "clients.message"
  11. headerUserId = "userId"
  12. )
  13. type Nats struct {
  14. nc *nats.Conn
  15. mutex sync.RWMutex
  16. Subscribers map[string]*subscriber
  17. ch chan *nats.Msg
  18. Send chan *natsMessage
  19. Subscribe chan *subscriber
  20. Unsubscribe chan *subscriber
  21. }
  22. type subscriber struct {
  23. client *Client
  24. }
  25. type natsMessage struct {
  26. userId string
  27. message *Message
  28. }
  29. func NewNats(addr string) *Nats {
  30. nc, err := nats.Connect(addr)
  31. if err != nil {
  32. log.Fatalln(err)
  33. }
  34. // nc.Drain()
  35. n := &Nats{
  36. nc: nc,
  37. mutex: sync.RWMutex{},
  38. ch: make(chan *nats.Msg, 256),
  39. Send: make(chan *natsMessage),
  40. Subscribers: make(map[string]*subscriber),
  41. Subscribe: make(chan *subscriber),
  42. Unsubscribe: make(chan *subscriber),
  43. }
  44. _, err = nc.ChanSubscribe(subject, n.ch)
  45. if err != nil {
  46. zlog.Error(eris.Wrap(err, "unable to start: "))
  47. }
  48. go n.run()
  49. return n
  50. }
  51. func (n *Nats) run() {
  52. for {
  53. select {
  54. case nMsg := <-n.Send:
  55. bytes := serializationMessage(nMsg.message)
  56. if err := n.nc.PublishMsg(&nats.Msg{
  57. Subject: subject,
  58. Data: bytes,
  59. Header: nats.Header{headerUserId: []string{nMsg.userId}},
  60. }); err != nil {
  61. zlog.Error(eris.Wrapf(err, "unable to message send: %s", nMsg.userId))
  62. }
  63. case msg := <-n.ch:
  64. userIds := msg.Header.Values(headerUserId)
  65. zlog.Debugf("received nats ids: %v", userIds)
  66. if len(userIds) < 1 {
  67. continue
  68. }
  69. n.mutex.RLock()
  70. for _, uid := range userIds {
  71. if s, ok := n.Subscribers[uid]; ok {
  72. message := deserializeMessage(msg.Data)
  73. zlog.Debugf("[%s] message: %v", uid, string(msg.Data))
  74. s.client.Send <- message
  75. }
  76. }
  77. n.mutex.RUnlock()
  78. case s := <-n.Subscribe:
  79. n.mutex.Lock()
  80. n.Subscribers[s.client.UserId] = s
  81. n.mutex.Unlock()
  82. case s := <-n.Unsubscribe:
  83. n.mutex.Lock()
  84. delete(n.Subscribers, s.client.UserId)
  85. n.mutex.Unlock()
  86. }
  87. }
  88. }