123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package server
- import (
- "context"
- "encoding/json"
- "sync"
- "github.com/nats-io/nats.go"
- "go.uber.org/zap"
- "sikey.com/websocket/pkg/natx"
- "sikey.com/websocket/repositories"
- )
// NATS wiring constants used by this file.
const (
	// subject is the NATS subject that NewNats subscribes to.
	subject = "clients.message"

	// headerUserId is the "user_id" key. It is not referenced in the visible
	// part of this file; presumably a NATS header name — TODO confirm.
	headerUserId = "user_id"
)
// RespondStructural is the JSON reply body sent back on a NATS message via
// Msg.Respond.
type RespondStructural struct {
	// RequestId is encoded as "request_id".
	RequestId string `json:"request_id"`

	// Ok is encoded as "ok"; true for an acknowledgement, false for an error reply.
	Ok bool `json:"ok"`

	// ErrMsg is encoded as "errMsg" and carries the error text of a failed reply.
	ErrMsg string `json:"errMsg"`
}
- func (rs *RespondStructural) String() string {
- return rs.Marshaler()
- }
- func (rs *RespondStructural) Marshaler() string {
- buf, _ := json.Marshal(rs)
- return string(buf)
- }
- type Nats struct {
- nc *nats.Conn
- repos *repositories.Repositories
- firebaseMessageServer *FirebaseMessageServer
- mutex sync.RWMutex
- Subscribers map[string]*subscriber
- ch chan *nats.Msg
- Subscribe chan *subscriber
- Unsubscribe chan *subscriber
- }
- type subscriber struct {
- client *Client
- }
- type natsMessage struct {
- userId string
- message *Message
- }
- func NewNats(addr string, repos *repositories.Repositories) *Nats {
- nc := natx.GetConnect()
- firebaseMessageServer := NewFirebaseMessageServer(repos)
- n := &Nats{
- nc: nc,
- repos: repos,
- firebaseMessageServer: firebaseMessageServer,
- mutex: sync.RWMutex{},
- ch: make(chan *nats.Msg, 256),
- Subscribers: make(map[string]*subscriber),
- Subscribe: make(chan *subscriber),
- Unsubscribe: make(chan *subscriber),
- }
- _, err := nc.ChanSubscribe(subject, n.ch)
- if err != nil {
- zap.L().Error("[nats] unable to start", zap.Error(err))
- }
- go n.run()
- return n
- }
- func (n *Nats) run() {
- for {
- select {
- case natsMsg, ok := <-n.ch:
- if !ok {
- continue
- }
- // 序列化消息内容
- message := deserializeMessage(natsMsg.Data)
- zap.L().Info("[nats] deserialized", zap.Any("message", message))
- // 消息收到了, 先回复发送者
- resp := RespondStructural{RequestId: message.RequestId(), Ok: true}
- _ = natsMsg.Respond([]byte(resp.Marshaler()))
- // 从消息里获取得到接收人信息
- var receiver string
- switch msg := message.(type) {
- case *Chating:
- receiver = msg.Content.Receiver
- case *Notification:
- receiver = msg.Content.Receiver
- case *VideoCall:
- receiver = msg.Content.Receiver
- case *Location:
- receiver = msg.Content.Receiver
- default:
- // 未找到接收人信息
- continue
- }
- ctx := context.Background()
- // 查询在线信息
- onlineInfo, err := n.repos.OnlineRepository.GetOnline(ctx, receiver)
- if err != nil {
- // ERROR
- zap.L().Error("[nats] unable to get online info", zap.Error(err), zap.String("user_id", receiver))
- continue
- }
- // 如果没有在线信息,说明用户不在线,这时我们需要推送 firebase messaging 通知
- if onlineInfo == nil {
- // 如果有 firebase messaging token,我们需要推送消息
- ctx := context.Background()
- if err := n.firebaseMessageServer.Send(ctx, receiver, message); err != nil {
- zap.L().Error("[firebase] unable to send message", zap.Error(err), zap.String("user_id", receiver))
- }
- continue
- }
- n.mutex.RLock()
- if receiver != "" {
- for uid, sub := range n.Subscribers {
- if uid == receiver {
- // 写入消息到不同的客户端
- sub.client.Received <- message
- }
- }
- }
- n.mutex.RUnlock()
- case s := <-n.Subscribe:
- n.mutex.Lock()
- n.Subscribers[s.client.UserId] = s
- n.mutex.Unlock()
- case s := <-n.Unsubscribe:
- n.mutex.Lock()
- if _, ok := n.Subscribers[s.client.UserId]; ok {
- delete(n.Subscribers, s.client.UserId)
- }
- n.mutex.Unlock()
- }
- }
- }
- func responseErrMsg(natsMsg *nats.Msg, err string) {
- resp := RespondStructural{Ok: false, ErrMsg: err}
- _ = natsMsg.Respond([]byte(resp.Marshaler()))
- }
|