package server import ( "context" "encoding/json" "sync" "github.com/nats-io/nats.go" "go.uber.org/zap" "sikey.com/websocket/pkg/natx" "sikey.com/websocket/repositories" ) const ( subject = "clients.message" headerUserId = "user_id" ) type RespondStructural struct { RequestId string `json:"request_id"` Ok bool `json:"ok"` ErrMsg string `json:"errMsg"` } func (rs *RespondStructural) String() string { return rs.Marshaler() } func (rs *RespondStructural) Marshaler() string { buf, _ := json.Marshal(rs) return string(buf) } type Nats struct { nc *nats.Conn repos *repositories.Repositories firebaseMessageServer *FirebaseMessageServer mutex sync.RWMutex Subscribers map[string]*subscriber ch chan *nats.Msg Subscribe chan *subscriber Unsubscribe chan *subscriber } type subscriber struct { client *Client } type natsMessage struct { userId string message *Message } func NewNats(addr string, repos *repositories.Repositories) *Nats { nc := natx.GetConnect() firebaseMessageServer := NewFirebaseMessageServer(repos) n := &Nats{ nc: nc, repos: repos, firebaseMessageServer: firebaseMessageServer, mutex: sync.RWMutex{}, ch: make(chan *nats.Msg, 256), Subscribers: make(map[string]*subscriber), Subscribe: make(chan *subscriber), Unsubscribe: make(chan *subscriber), } _, err := nc.ChanSubscribe(subject, n.ch) if err != nil { zap.L().Error("[nats] unable to start", zap.Error(err)) } go n.run() return n } func (n *Nats) run() { for { select { case natsMsg, ok := <-n.ch: if !ok { continue } // 序列化消息内容 message := deserializeMessage(natsMsg.Data) zap.L().Info("[nats] deserialized", zap.Any("message", message)) // 消息收到了, 先回复发送者 resp := RespondStructural{RequestId: message.RequestId(), Ok: true} _ = natsMsg.Respond([]byte(resp.Marshaler())) // 从消息里获取得到接收人信息 var receiver string switch msg := message.(type) { case *Chating: receiver = msg.Content.Receiver case *Notification: receiver = msg.Content.Receiver case *VideoCall: receiver = msg.Content.Receiver case *Location: receiver = msg.Content.Receiver default: // 未找到接收人信息 continue } ctx := context.Background() // 查询在线信息 onlineInfo, err := n.repos.OnlineRepository.GetOnline(ctx, receiver) if err != nil { // ERROR zap.L().Error("[nats] unable to get online info", zap.Error(err), zap.String("user_id", receiver)) continue } // 如果没有在线信息,说明用户不在线,这时我们需要推送 firebase messaging 通知 if onlineInfo == nil { // 如果有 firebase messaging token,我们需要推送消息 ctx := context.Background() if err := n.firebaseMessageServer.Send(ctx, receiver, message); err != nil { zap.L().Error("[firebase] unable to send message", zap.Error(err), zap.String("user_id", receiver)) } continue } n.mutex.RLock() if receiver != "" { for uid, sub := range n.Subscribers { if uid == receiver { // 写入消息到不同的客户端 sub.client.Received <- message } } } n.mutex.RUnlock() case s := <-n.Subscribe: n.mutex.Lock() n.Subscribers[s.client.UserId] = s n.mutex.Unlock() case s := <-n.Unsubscribe: n.mutex.Lock() if _, ok := n.Subscribers[s.client.UserId]; ok { delete(n.Subscribers, s.client.UserId) } n.mutex.Unlock() } } } func responseErrMsg(natsMsg *nats.Msg, err string) { resp := RespondStructural{Ok: false, ErrMsg: err} _ = natsMsg.Respond([]byte(resp.Marshaler())) }