Browse Source

websocket 使用 nats 拓展

luoyangwei 1 year ago
parent
commit
f410e469a3
9 changed files with 137 additions and 3 deletions
  1. 2 0
      config/websocket.go
  2. 1 0
      etc/websocket.test.toml
  3. 1 0
      etc/websocket.toml
  4. 4 0
      go.mod
  5. 8 0
      go.sum
  6. 0 1
      server/client.go
  7. 12 0
      server/hub.go
  8. 87 2
      server/nats.go
  9. 22 0
      server/nats_test.go

+ 2 - 0
config/websocket.go

@@ -11,4 +11,6 @@ type websocket struct {
 
 	ReadBufferSize  int
 	WriteBufferSize int
+
+	NatsUrl string
 }

+ 1 - 0
etc/websocket.test.toml

@@ -10,6 +10,7 @@ readBufferSize = 1024 # 读消息体最大的 buffersize
 readWait = 10 # 写超时
 writeBufferSize = 1024 # 写消息体最大的 buffersize
 writeWait = 10 # 读超时
+natsUrl = "nats://106.75.230.4:4333" 
 
 [mysql]
 dsn = "root:9RKdJsEQKnjrni9R@tcp(10.23.148.10:3306)/sikey?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai"

+ 1 - 0
etc/websocket.toml

@@ -10,6 +10,7 @@ readBufferSize = 1024 # 读消息体最大的 buffersize
 readWait = 10 # 写超时
 writeBufferSize = 1024 # 写消息体最大的 buffersize
 writeWait = 10 # 读超时
+natsUrl = "nats://106.75.230.4:4333" 
 
 [mysql]
 dsn = "root:qq123123@tcp(127.0.0.1:3306)/sikey?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai"

+ 4 - 0
go.mod

@@ -10,6 +10,7 @@ require (
 	github.com/google/uuid v1.4.0
 	github.com/gorilla/websocket v1.5.1
 	github.com/mitchellh/mapstructure v1.5.0
+	github.com/nats-io/nats.go v1.32.0
 	github.com/redis/go-redis/v9 v9.4.0
 	github.com/rotisserie/eris v0.5.4
 	github.com/spf13/viper v1.18.2
@@ -36,12 +37,15 @@ require (
 	github.com/jinzhu/inflection v1.0.0 // indirect
 	github.com/jinzhu/now v1.1.5 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/klauspost/compress v1.17.2 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
 	github.com/leodido/go-urn v1.2.4 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/nats-io/nkeys v0.4.7 // indirect
+	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/pelletier/go-toml/v2 v2.1.1 // indirect
 	github.com/sagikazarmark/locafero v0.4.0 // indirect
 	github.com/sagikazarmark/slog-shim v0.1.0 // indirect

+ 8 - 0
go.sum

@@ -64,6 +64,8 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
+github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
 github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
@@ -85,6 +87,12 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
+github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
+github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
+github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
+github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
 github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

+ 0 - 1
server/client.go

@@ -43,7 +43,6 @@ func (c *Client) reader() {
 
 	c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
 	for {
-
 		_, bytes, err := c.UnderlyingConn.ReadMessage()
 		if err != nil {
 			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {

+ 12 - 0
server/hub.go

@@ -15,6 +15,8 @@ type Hub struct {
 	Connect    chan *Client
 	Disconnect chan *Client
 	Message    chan *Message
+
+	Nats *Nats
 }
 
 func NewHub(serverId string) *Hub {
@@ -27,6 +29,8 @@ func NewHub(serverId string) *Hub {
 		Connect:    make(chan *Client, config.Websocket.ConnectSize),
 		Disconnect: make(chan *Client, config.Websocket.ConnectSize),
 		Message:    make(chan *Message, config.Websocket.MessageSize),
+
+		Nats: NewNats(config.Websocket.NatsUrl),
 	}
 
 	go hub.run()
@@ -40,16 +44,24 @@ func (h *Hub) run() {
 			h.mutex.Lock()
 			h.clients[client.UserId] = client
 			h.mutex.Unlock()
+			if h.Nats != nil {
+				h.Nats.Subscribe <- &subscriber{client: client}
+			}
 		case client := <-h.Disconnect:
 			h.mutex.Lock()
 			close(client.Send)
 			delete(h.clients, client.UserId)
 			h.mutex.Unlock()
+			if h.Nats != nil {
+				h.Nats.Unsubscribe <- &subscriber{client: client}
+			}
 		case message := <-h.Message:
 			h.mutex.RLock()
 			if client, ok := h.clients[message.Receiver]; ok {
 				zlog.Info("message: ", message)
 				client.Send <- message
+			} else {
+				h.Nats.Send <- map[string]*Message{message.Receiver: message}
 			}
 			h.mutex.RUnlock()
 		}

+ 87 - 2
server/nats.go

@@ -1,8 +1,93 @@
 package server
 
+import (
+	"log"
+	"sync"
+
+	"github.com/nats-io/nats.go"
+	"github.com/rotisserie/eris"
+	"sikey.com/websocket/utils/zlog"
+)
+
+const (
+	subject      = "clients.message"
+	headerUserId = "userId"
+)
+
 type Nats struct {
+	nc *nats.Conn
+
+	mutex       sync.RWMutex
+	Subscribers map[string]*subscriber
+
+	ch          chan *nats.Msg
+	Send        chan map[string]*Message
+	Subscribe   chan *subscriber
+	Unsubscribe chan *subscriber
+}
+
+type subscriber struct {
+	client *Client
+}
+
+func NewNats(addr string) *Nats {
+	nc, err := nats.Connect(addr)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	// nc.Drain()
+
+	n := &Nats{
+		nc:          nc,
+		mutex:       sync.RWMutex{},
+		ch:          make(chan *nats.Msg, 256),
+		Send:        make(chan map[string]*Message),
+		Subscribers: make(map[string]*subscriber),
+		Subscribe:   make(chan *subscriber),
+		Unsubscribe: make(chan *subscriber),
+	}
+
+	_, err = nc.ChanSubscribe(subject, n.ch)
+	if err != nil {
+		zlog.Error(eris.Wrap(err, "unable to start: "))
+	}
+
+	go n.run()
+	return n
 }
 
-func NewNats() *Nats {
-	return &Nats{}
+func (n *Nats) run() {
+	for {
+		select {
+		case message := <-n.Send:
+			for k, m := range message {
+				bytes := serializationMessage(m)
+				if err := n.nc.PublishMsg(&nats.Msg{
+					Subject: subject,
+					Data:    bytes,
+					Header:  nats.Header{headerUserId: []string{k}},
+				}); err != nil {
+					zlog.Error(eris.Wrapf(err, "unable to message send: %s", k))
+				}
+			}
+		case msg := <-n.ch:
+			userId := msg.Header.Get(headerUserId)
+			if userId == "" {
+				continue
+			}
+			n.mutex.RLock()
+			if s, ok := n.Subscribers[userId]; ok {
+				s.client.Send <- deserializeMessage(msg.Data)
+			}
+			n.mutex.RUnlock()
+		case s := <-n.Subscribe:
+			n.mutex.Lock()
+			n.Subscribers[s.client.UserId] = s
+			n.mutex.Unlock()
+		case s := <-n.Unsubscribe:
+			n.mutex.Lock()
+			delete(n.Subscribers, s.client.UserId)
+			n.mutex.Unlock()
+		}
+	}
 }

+ 22 - 0
server/nats_test.go

@@ -0,0 +1,22 @@
+package server
+
+import (
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/nats-io/nats.go"
+)
+
+func TestNats_Connect(t *testing.T) {
+	nc, _ := nats.Connect("nats://106.75.230.4:4333")
+	msg := nats.Msg{
+		Subject: "foo",
+		Header: nats.Header{
+			"userId": []string{uuid.NewString()},
+		},
+		Data: []byte("test"),
+	}
+	nc.PublishMsg(&msg)
+	time.Sleep(10 * time.Second)
+}