Forráskód Böngészése

多user一次发送

luoyangwei 1 éve
szülő
commit
cdd22dc688
3 módosított fájl, 109 hozzáadás és 0 törlés
  1. 3 0
      go.mod
  2. 6 0
      go.sum
  3. 100 0
      server/nats.go

+ 3 - 0
go.mod

@@ -7,6 +7,7 @@ require (
 	github.com/golang-jwt/jwt/v5 v5.2.0
 	github.com/gorilla/websocket v1.5.1
 	github.com/mitchellh/mapstructure v1.5.0
+	github.com/nats-io/nats.go v1.31.0
 	github.com/redis/go-redis/v9 v9.4.0
 	github.com/rotisserie/eris v0.5.4
 	github.com/segmentio/kafka-go v0.4.47
@@ -41,6 +42,8 @@ require (
 	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/nats-io/nkeys v0.4.6 // indirect
+	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/pelletier/go-toml/v2 v2.1.1 // indirect
 	github.com/pierrec/lz4/v4 v4.1.15 // indirect
 	github.com/sagikazarmark/locafero v0.4.0 // indirect

+ 6 - 0
go.sum

@@ -82,6 +82,12 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
+github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
+github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
+github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
+github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
 github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=

+ 100 - 0
server/nats.go

@@ -0,0 +1,100 @@
+package server
+
+import (
+	"log"
+	"sync"
+
+	"github.com/nats-io/nats.go"
+	"github.com/rotisserie/eris"
+	"sikey.com/websocket/utils/zlog"
+)
+
+const (
+	subject      = "clients.message"
+	headerUserId = "userId"
+)
+
+type Nats struct {
+	nc *nats.Conn
+
+	mutex       sync.RWMutex
+	Subscribers map[string]*subscriber
+
+	ch          chan *nats.Msg
+	Send        chan *natsMessage
+	Subscribe   chan *subscriber
+	Unsubscribe chan *subscriber
+}
+
+type subscriber struct {
+	client *Client
+}
+
+type natsMessage struct {
+	userId  string
+	message *Message
+}
+
+func NewNats(addr string) *Nats {
+	nc, err := nats.Connect(addr)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	// nc.Drain()
+
+	n := &Nats{
+		nc:          nc,
+		mutex:       sync.RWMutex{},
+		ch:          make(chan *nats.Msg, 256),
+		Send:        make(chan *natsMessage),
+		Subscribers: make(map[string]*subscriber),
+		Subscribe:   make(chan *subscriber),
+		Unsubscribe: make(chan *subscriber),
+	}
+
+	_, err = nc.ChanSubscribe(subject, n.ch)
+	if err != nil {
+		zlog.Error(eris.Wrap(err, "unable to start: "))
+	}
+
+	go n.run()
+	return n
+}
+
+func (n *Nats) run() {
+	for {
+		select {
+		case nMsg := <-n.Send:
+			bytes := serializationMessage(nMsg.message)
+			if err := n.nc.PublishMsg(&nats.Msg{
+				Subject: subject,
+				Data:    bytes,
+				Header:  nats.Header{headerUserId: []string{nMsg.userId}},
+			}); err != nil {
+				zlog.Error(eris.Wrapf(err, "unable to message send: %s", nMsg.userId))
+			}
+		case msg := <-n.ch:
+			userIds := msg.Header.Values(headerUserId)
+			if len(userIds) < 1 {
+				continue
+			}
+			n.mutex.RLock()
+			for _, uid := range userIds {
+				if s, ok := n.Subscribers[uid]; ok {
+					message := deserializeMessage(msg.Data)
+					zlog.Debugf("[%s] message: %v", uid, string(msg.Data))
+					s.client.Send <- message
+				}
+			}
+			n.mutex.RUnlock()
+		case s := <-n.Subscribe:
+			n.mutex.Lock()
+			n.Subscribers[s.client.UserId] = s
+			n.mutex.Unlock()
+		case s := <-n.Unsubscribe:
+			n.mutex.Lock()
+			delete(n.Subscribers, s.client.UserId)
+			n.mutex.Unlock()
+		}
+	}
+}