123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package server
import (
	"log"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/rotisserie/eris"
	"x.sikey.com.cn/serverx/zlog"
)
const (
	// subject is the NATS subject this package both publishes client
	// messages to and subscribes on.
	subject = "clients.message"

	// headerUserId is the NATS message header key under which the id of the
	// addressed user is stored.
	headerUserId = "userId"
)
- type Nats struct {
- nc *nats.Conn
- mutex sync.RWMutex
- Subscribers map[string]*subscriber
- ch chan *nats.Msg
- Send chan *natsMessage
- Subscribe chan *subscriber
- Unsubscribe chan *subscriber
- }
- type subscriber struct {
- client *Client
- }
- type natsMessage struct {
- userId string
- message *Message
- }
- func NewNats(addr string) *Nats {
- nc, err := nats.Connect(addr)
- if err != nil {
- log.Fatalln(err)
- }
- // nc.Drain()
- n := &Nats{
- nc: nc,
- mutex: sync.RWMutex{},
- ch: make(chan *nats.Msg, 256),
- Send: make(chan *natsMessage),
- Subscribers: make(map[string]*subscriber),
- Subscribe: make(chan *subscriber),
- Unsubscribe: make(chan *subscriber),
- }
- _, err = nc.ChanSubscribe(subject, n.ch)
- if err != nil {
- zlog.Error(eris.Wrap(err, "unable to start: "))
- }
- go n.run()
- return n
- }
- func (n *Nats) run() {
- for {
- select {
- case nMsg := <-n.Send:
- bytes := serializationMessage(nMsg.message)
- if err := n.nc.PublishMsg(&nats.Msg{
- Subject: subject,
- Data: bytes,
- Header: nats.Header{headerUserId: []string{nMsg.userId}},
- }); err != nil {
- zlog.Error(eris.Wrapf(err, "unable to message send: %s", nMsg.userId))
- }
- case msg := <-n.ch:
- userIds := msg.Header.Values(headerUserId)
- zlog.Debugf("received nats ids: %v", userIds)
- if len(userIds) < 1 {
- continue
- }
- n.mutex.RLock()
- for _, uid := range userIds {
- if s, ok := n.Subscribers[uid]; ok {
- message := deserializeMessage(msg.Data)
- zlog.Debugf("[%s] message: %v", uid, string(msg.Data))
- s.client.Send <- message
- }
- }
- n.mutex.RUnlock()
- case s := <-n.Subscribe:
- n.mutex.Lock()
- n.Subscribers[s.client.UserId] = s
- n.mutex.Unlock()
- case s := <-n.Unsubscribe:
- n.mutex.Lock()
- delete(n.Subscribers, s.client.UserId)
- n.mutex.Unlock()
- }
- }
- }
|