123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package server
- import (
- "encoding/json"
- "sync"
- "time"
- "github.com/nats-io/nats.go"
- "go.uber.org/zap"
- "sikey.com/websocket/repositories"
- "x.sikey.com.cn/serverx/dbx"
- "x.sikey.com.cn/serverx/natx"
- "x.sikey.com.cn/serverx/rdbx"
- )
// NATS addressing used by the relay.
const (
	// subject is the NATS subject the relay both subscribes to and sends
	// requests on.
	subject = "clients.message"

	// headerUserId is the message header key that carries the id of the user
	// a message is addressed to.
	headerUserId = "user_id"
)
// RespondStructural is the JSON acknowledgement sent in reply to a message
// received over NATS.
type RespondStructural struct {
	// RequestId identifies the request being answered.
	RequestId string `json:"request_id"`

	// Ok reports whether the message was handled without error.
	Ok bool `json:"ok"`

	// ErrMsg describes the failure when Ok is false.
	ErrMsg string `json:"errMsg"`
}

// String returns the JSON encoding of rs, so the value prints as JSON wherever
// a fmt.Stringer is honoured.
func (rs *RespondStructural) String() string {
	return rs.Marshaler()
}

// Marshaler returns rs encoded as a JSON string.
//
// An encoding error yields the empty string. The struct holds only strings and
// a bool, so encoding/json has nothing to reject here.
func (rs *RespondStructural) Marshaler() string {
	encoded, err := json.Marshal(rs)
	if err != nil {
		return ""
	}
	return string(encoded)
}
- type Nats struct {
- nc *nats.Conn
- mutex sync.RWMutex
- Subscribers map[string]*subscriber
- ch chan *nats.Msg
- Send chan *natsMessage
- Subscribe chan *subscriber
- Unsubscribe chan *subscriber
- Repositories *repositories.Repositories
- }
- type subscriber struct {
- client *Client
- }
- type natsMessage struct {
- userId string
- message *Message
- }
- func NewNats(addr string) *Nats {
- nc := natx.GetConnect()
- n := &Nats{
- nc: nc,
- mutex: sync.RWMutex{},
- ch: make(chan *nats.Msg, 256),
- Send: make(chan *natsMessage),
- Subscribers: make(map[string]*subscriber),
- Subscribe: make(chan *subscriber),
- Unsubscribe: make(chan *subscriber),
- // db
- Repositories: repositories.NewRepositories(dbx.GetConnect(), rdbx.GetConnect()),
- }
- _, err := nc.ChanSubscribe(subject, n.ch)
- if err != nil {
- zap.L().Error("[nats] unable to start", zap.Error(err))
- }
- go n.run()
- return n
- }
- func (n *Nats) run() {
- for {
- select {
- case nMsg := <-n.Send:
- bytes := serializationMessage(nMsg.message)
- timeout := 5 * time.Second
- if _, err := n.nc.RequestMsg(&nats.Msg{
- Subject: subject,
- Data: bytes,
- Header: nats.Header{
- headerUserId: []string{nMsg.userId},
- "sender": []string{nMsg.message.sender},
- "request_id": []string{nMsg.message.RequestId},
- },
- }, timeout); err != nil {
- zap.L().Error("[nats] unable to message send",
- zap.Error(err),
- zap.String("user_id", nMsg.userId))
- }
- case msg := <-n.ch:
- requestId := msg.Header.Get("request_id")
- sender := msg.Header.Get("sender")
- ids := msg.Header.Values("user_id")
- if len(ids) == 0 {
- zap.L().Info("[nats] received empty userIds",
- zap.String("request_id", requestId))
- continue
- }
- message := deserializeMessage(msg.Data)
- message.sender = sender
- // Error handler
- if message.Type == MessageTypeError {
- zap.L().Error("[nats] received error message",
- zap.String("request_id", requestId),
- zap.ByteString("message", serializationMessage(message)))
- resp := RespondStructural{
- RequestId: message.RequestId,
- Ok: false,
- ErrMsg: message.Content.(string),
- }
- _ = msg.Respond([]byte(resp.Marshaler()))
- continue
- }
- zap.L().Info("[nats] received nats message",
- zap.Strings("ids", ids), zap.String("request_id", requestId))
- n.mutex.RLock()
- for _, uid := range ids {
- if s, ok := n.Subscribers[uid]; ok {
- clt := s.client
- ctx := clt.withRequestIdContext(clt.ctx.Copy(), message.RequestId)
- // Sender
- message.sender = sender
- if message.Receiver == "" {
- message.Receiver = uid
- }
- // Save message
- if err := clt.persistenceMessage(ctx, message); err != nil {
- zap.L().Error("[nats] unable to message",
- zap.Error(err),
- zap.String("request_id", message.RequestId))
- resp := RespondStructural{
- RequestId: message.RequestId,
- Ok: false,
- ErrMsg: err.Error(),
- }
- _ = msg.Respond([]byte(resp.Marshaler()))
- continue
- }
- zap.L().Info("[nats] relay received message",
- zap.String("user_id", uid),
- zap.String("message_id", message.MessageId),
- zap.ByteString("message", serializationMessage(message)),
- zap.String("request_id", message.RequestId))
- // Message to client channel
- clt.Send <- message
- }
- }
- // response
- resp := RespondStructural{RequestId: message.MessageId, Ok: true}
- _ = msg.Respond([]byte(resp.Marshaler()))
- n.mutex.RUnlock()
- case s := <-n.Subscribe:
- n.mutex.Lock()
- n.Subscribers[s.client.UserId] = s
- n.mutex.Unlock()
- case s := <-n.Unsubscribe:
- n.mutex.Lock()
- delete(n.Subscribers, s.client.UserId)
- n.mutex.Unlock()
- }
- }
- }
|