Bladeren bron

redis 拓展 websocket

luoyangwei 1 jaar geleden
bovenliggende
commit
c6372b22e8
8 gewijzigde bestanden met toevoegingen van 221 en 99 verwijderingen
  1. 6 0
      config/config.go
  2. 3 25
      server/client.go
  3. 67 57
      server/hub.go
  4. 29 0
      server/hub_test.go
  5. 10 0
      server/redis_exchange.go
  6. 8 9
      server/server.go
  7. 96 8
      stackexchange/redis/redis_stackexchange.go
  8. 2 0
      websocket.go

+ 6 - 0
config/config.go

@@ -12,6 +12,7 @@ import (
 var (
 	Config configx.Config
 	Kafka  kafka
+	Redis  redis
 )
 
 func loadRestfulConfig() error {
@@ -22,11 +23,16 @@ func loadKafkaConfig() error {
 	return viper.UnmarshalKey("kafka", &Kafka)
 }
 
+func loadRedisConfig() error {
+	return viper.UnmarshalKey("redis", &Redis)
+}
+
 // MustLoadConfig 加载配置
 func MustLoadConfig(file string) {
 	err := configx.LoadConfig(file,
 		loadRestfulConfig,
 		loadKafkaConfig,
+		loadRedisConfig,
 	)
 	if err != nil {
 		log.Fatalln(eris.Wrap(err, "无法映射配置"))

+ 3 - 25
server/client.go

@@ -24,10 +24,7 @@ type Client struct {
 
 	// Send message channel 发送消息
 	// 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
-	// 则会通过 SendOffline channel 发送离线消息
-	Send            chan *Message
-	RemotelyMessage chan *Message // RemotelyMessage 远程消息
-	// SendOffline chan *Message // SendOffline 发送离线消息
+	Send chan *Message
 
 	readWait  time.Duration // readWait 读超时
 	writeWait time.Duration // writeWait 写超时
@@ -49,9 +46,6 @@ func (c *Client) reader() {
 		if err != nil {
 			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
 				zlog.Errorf("error: %v", err)
-				// if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
-				// 	zlog.Error(err)
-				// }
 			}
 			break
 		}
@@ -66,7 +60,7 @@ func (c *Client) reader() {
 			// Save message to database
 			payload, _ := json.Marshal(chatingContent.Payload)
 			messageId := uuid.NewString()
-			err := c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
+			err = c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
 				ID:          messageId,
 				SessionId:   chatingContent.Receiver,
 				Receiver:    chatingContent.Receiver,
@@ -106,7 +100,7 @@ func (c *Client) reader() {
 		if message.IsNeedReply() {
 			c.Send <- newReplyMessage(message)
 
-			// Reset read dead line, prevent Reader from shutting down
+			// Reset read deadline, prevent Reader from shutting down
 			c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
 		}
 	}
@@ -117,9 +111,6 @@ func (c *Client) writer() {
 	defer func() {
 		ticker.Stop()
 		c.UnderlyingConn.Close()
-		// if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
-		// 	zlog.Error(err)
-		// }
 	}()
 
 	for {
@@ -137,19 +128,6 @@ func (c *Client) writer() {
 			if err != nil {
 				return
 			}
-
-		// case message, ok := <-c.RemotelyMessage:
-		// 	// Sending remotely message to client
-		// 	if !ok {
-		// 	}
-
-		// err := c.hub.exchange.SendMessage(c.ctx, kafka.Message{
-		// 	Key:   []byte(stackexchange.StackExchangeMessaging),
-		// 	Value: serializationMessage(message),
-		// })
-		// if err != nil {
-		// 	return
-		// }
 		case <-ticker.C:
 			// 到时间发送 ping 信号
 			c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))

+ 67 - 57
server/hub.go

@@ -1,9 +1,12 @@
 package server
 
 import (
+	"context"
+	"encoding/json"
+	"fmt"
 	"sync"
 
-	"sikey.com/websocket/stackexchange/redis"
+	"github.com/redis/go-redis/v9"
 	"sikey.com/websocket/utils/zlog"
 )
 
@@ -11,40 +14,42 @@ type HubConfig struct {
 	ConnectSize    int
 	DisconnectSize int
 	MessageSize    int
-
-	// StackExchange *stackexchange.RedisStackExchange
+	ServerId       string // serverId 服务器ID
 }
 
 type Hub struct {
-	remotelyClients map[string]*Client
-	remotelyMutex   sync.RWMutex
-	clients         map[string]*Client
-	mutex           sync.RWMutex
+	serverId string // serverId 服务器ID
+	rdb      redis.UniversalClient
 
+	clients    map[string]*Client
+	mutex      sync.RWMutex
 	Connect    chan *Client
 	Disconnect chan *Client
-
-	Message chan *Message
-
-	StackExchange *redis.StackExchange
+	Message    chan *Message
 }
 
 func NewHub(cfg HubConfig) *Hub {
 	hub := &Hub{
-		remotelyClients: make(map[string]*Client),
-		remotelyMutex:   sync.RWMutex{},
-		clients:         make(map[string]*Client),
-		mutex:           sync.RWMutex{},
+		serverId: cfg.ServerId,
+		rdb: redis.NewUniversalClient(&redis.UniversalOptions{
+			Addrs:    []string{"106.75.230.4:6379"},
+			Password: "sikey!Q@W#E456",
+			DB:       0,
+		}),
+
+		clients: make(map[string]*Client),
+		mutex:   sync.RWMutex{},
 
 		Connect:    make(chan *Client, cfg.ConnectSize),
 		Disconnect: make(chan *Client, cfg.DisconnectSize),
 		Message:    make(chan *Message, cfg.MessageSize),
 	}
 
-	hub.StackExchange = redis.NewStackExchange()
-
 	go hub.run()
-	go hub.remotely() // 远程消息
+
+	go hub.remotelyEvent() // 远程事件
+	//go hub.remotelyMessageReader()
+	//go hub.remotelyMessageWriter()
 	return hub
 }
 
@@ -52,56 +57,61 @@ func (h *Hub) run() {
 	for {
 		select {
 		case client := <-h.Connect:
-			if client.isRemotely {
-				h.remotelyClients[client.UserId] = client
-			} else {
-				h.clients[client.UserId] = client
-			}
+			h.clients[client.UserId] = client
 		case client := <-h.Disconnect:
 			close(client.Send)
-			if client.isRemotely {
-				delete(h.remotelyClients, client.UserId)
-			} else {
-				delete(h.clients, client.UserId)
-			}
+			delete(h.clients, client.UserId)
 		case message := <-h.Message:
 			if client, ok := h.clients[message.receiver]; ok {
-				client.Send <- message
-			} else {
-				// 不在同一台服务器, 这里将消息发送至拓展应用
-				if client != nil {
-					client.RemotelyMessage <- message
+				if client.isRemotely {
+					ctx := context.Background()
+					err := h.rdb.Publish(ctx, messageChannelEvent, message).Err()
+					if err != nil {
+						zlog.Error(err)
+					}
+				} else {
+					client.Send <- message
 				}
 			}
 		}
 	}
 }
 
-func (h *Hub) remotely() {
-	for {
+var (
+	connectChannelEvent    = "client.event.connect"
+	disconnectChannelEvent = "client.event.disconnect"
+	messageChannelEvent    = "client.event.message"
+)
 
-		zlog.Info("Remotely client actions monitoring.")
-		// km, err := h.exchange.ReadMessage()
-		// if err != nil {
-		// 	zlog.Error(err)
-		// }
+func (h *Hub) remotelyEvent() {
+	ctx := context.TODO()
+	pubsub := h.rdb.PSubscribe(ctx, "client.event.*")
+	defer pubsub.Close()
+
+	for {
+		rMsg, err := pubsub.ReceiveMessage(ctx)
+		if err != nil {
+			zlog.Error(err)
+		}
 
-		// // Create new a remote client
-		// remotelyClient := &Client{
-		// 	isRemotely: true, // mark remotely client
-		// 	UserId:     string(km.Value),
-		// 	Send:       make(chan *Message, 256),
-		// 	hub:        h,
-		// }
-		// switch {
-		// case bytes.Equal(km.Key, stackexchange.StackExchangeOnline):
-		// 	h.Connect <- remotelyClient
-		// 	zlog.Info("Remotely client online: ", remotelyClient.UserId)
-		// case bytes.Equal(km.Key, stackexchange.StackExchangeOffline):
-		// 	h.Disconnect <- remotelyClient
-		// 	zlog.Info("Remotely client offline: ", remotelyClient.UserId)
-		// case bytes.Equal(km.Key, stackexchange.StackExchangeMessaging):
-		// 	h.Message <- deserializeMessage(km.Value)
-		// }
+		switch rMsg.Channel {
+		case connectChannelEvent:
+			h.Connect <- &Client{
+				hub:        h,
+				UserId:     rMsg.Payload,
+				isRemotely: true,
+				Send:       make(chan *Message),
+			}
+		case disconnectChannelEvent:
+			h.Disconnect <- h.clients[rMsg.Payload]
+		case messageChannelEvent:
+			var message Message
+			if err = json.Unmarshal([]byte(rMsg.Payload), &message); err != nil {
+				zlog.Error(err)
+				break
+			}
+			h.Message <- &message
+		}
+		fmt.Println(rMsg)
 	}
 }

+ 29 - 0
server/hub_test.go

@@ -1,7 +1,36 @@
 package server
 
+import (
+	"context"
+	"log"
+	"testing"
+
+	"github.com/redis/go-redis/v9"
+	"sikey.com/websocket/config"
+)
+
 var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
 
+func TestHub_ConnectMessage(t *testing.T) {
+	config.MustLoadConfig("../etc/websocket.toml")
+	rdb := redis.NewUniversalClient(&redis.UniversalOptions{
+		Addrs:    []string{config.Redis.Addr},
+		Password: config.Redis.Password,
+		DB:       config.Redis.Db,
+	})
+
+	var err error
+	ctx := context.Background()
+	err = rdb.Publish(ctx, "client.event.connect", userId).Err()
+	if err != nil {
+		log.Fatalln(err)
+	}
+	err = rdb.Publish(ctx, "client.event.disconnect", userId).Err()
+	if err != nil {
+		log.Fatalln(err)
+	}
+}
+
 // func TestHub_ConnectMessage(t *testing.T) {
 // 	writer := &kafka.Writer{
 // 		Addr:                   kafka.TCP("106.75.230.4:9092"),

+ 10 - 0
server/redis_exchange.go

@@ -0,0 +1,10 @@
+package server
+
+type exchange struct {
+	eventChannel   string
+	messageChannel string
+}
+
+func NewExchange() *exchange {
+	return &exchange{}
+}

+ 8 - 9
server/server.go

@@ -48,15 +48,14 @@ func WebsocketHandler(ctx *gin.Context, srv *Server) {
 
 	// Create client
 	client := &Client{
-		ctx:             ctx.Copy(),
-		UserId:          id,
-		hub:             srv.Hub,
-		UnderlyingConn:  conn,
-		Send:            make(chan *Message, 256),
-		RemotelyMessage: make(chan *Message, 256),
-		writeWait:       srv.WriteWait,
-		readWait:        srv.ReadWait,
-		pingWait:        srv.PingWait,
+		ctx:            ctx.Copy(),
+		UserId:         id,
+		hub:            srv.Hub,
+		UnderlyingConn: conn,
+		Send:           make(chan *Message, 256),
+		writeWait:      srv.WriteWait,
+		readWait:       srv.ReadWait,
+		pingWait:       srv.PingWait,
 
 		isSimpleMsg:  headers[keys.SimpleHeader].(bool),
 		localization: headers[keys.LocalizationHeader].(string),

+ 96 - 8
stackexchange/redis/redis_stackexchange.go

@@ -1,10 +1,98 @@
 package redis
 
-type StackExchange struct {
-	channel string
-}
-
-func NewStackExchange() *StackExchange {
-	exc := &StackExchange{}
-	return exc
-}
+// import (
+// 	"context"
+// 	"fmt"
+
+// 	rdx "github.com/redis/go-redis/v9"
+// 	"sikey.com/websocket/utils/zlog"
+// )
+
+// type StackExchange struct {
+// 	channel string
+
+// 	rdb rdx.UniversalClient
+
+// 	Connect    chan *RelationshipMessage
+// 	Disconnect chan *RelationshipMessage
+// }
+
+// type Config struct {
+// 	ServerId string
+
+// 	Cluster  []string
+// 	Password string
+// 	DB       int
+// }
+
+// func NewStackExchange(cfg Config, channel string) *StackExchange {
+// 	exc := &StackExchange{
+// 		channel: channel,
+// 		rdb: rdx.NewUniversalClient(&rdx.UniversalOptions{
+// 			Addrs:    cfg.Cluster,
+// 			Password: cfg.Password,
+// 			DB:       cfg.DB,
+// 		}),
+// 	}
+// 	go exc.run()
+// 	go exc.receive()
+// 	return exc
+// }
+
+// var (
+// 	// ws:user:clients:${uid}
+// 	// 存储用户和 WebSocket 连接的关系,采用有序集合方式存储
+// 	connectRelationship = "ws:user:clients:%s"
+// )
+
+// type (
+// 	RelationshipMessage struct {
+// 		ServerId string `json:"serverId"`
+// 		UserId   string `json:"userId"`
+// 	}
+// )
+
+// // func (exc *StackExchange) Publish(msgs ...Message) {
+// // }
+
+// func (exc *StackExchange) OnConnect(msg RelationshipMessage) error {
+// 	ctx := context.Background()
+// 	key := fmt.Sprintf(connectRelationship, msg.UserId)
+// 	return exc.rdb.Set(ctx, key, msg.ServerId, 0).Err()
+// }
+
+// func (exc *StackExchange) OnDisconnect(msg RelationshipMessage) error {
+// 	ctx := context.Background()
+// 	key := fmt.Sprintf(connectRelationship, msg.UserId)
+// 	return exc.rdb.Del(ctx, key).Err()
+// }
+
+// func (exc *StackExchange) run() {
+// 	for {
+// 		select {
+// 		case msg := <-exc.Connect:
+// 			exc.rdb.Publish(context.Background(), exc.channel, msg)
+// 		case msg := <-exc.Disconnect:
+// 			exc.rdb.Publish(context.Background(), exc.channel, msg)
+// 		}
+// 	}
+// }
+
+// func (exc *StackExchange) receive() {
+// 	ctx, cancel := context.WithCancel(context.Background())
+// 	pubsub := exc.rdb.PSubscribe(ctx, exc.channel)
+// 	defer func() {
+// 		cancel()
+// 		pubsub.Close()
+// 	}()
+
+// 	for {
+// 		msg, err := pubsub.ReceiveMessage(ctx)
+// 		if err != nil {
+// 			zlog.Error(err)
+// 			cancel()
+// 		}
+
+// 		fmt.Println(msg)
+// 	}
+// }

+ 2 - 0
websocket.go

@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
 	"github.com/gorilla/websocket"
 	"sikey.com/websocket/config"
 	"sikey.com/websocket/repositories"
@@ -51,6 +52,7 @@ func newApp() *gin.Engine {
 		ReadWait:  10 * time.Second,
 		PingWait:  120 * time.Second,
 		Hub: server.NewHub(server.HubConfig{
+			ServerId:       uuid.NewString(),
 			ConnectSize:    1024,
 			DisconnectSize: 1024,
 			MessageSize:    125,