Ver Fonte

使用 redis 拓展

luoyangwei há 1 ano atrás
pai
commit
dca0509732
4 ficheiros alterados com 56 adições e 218 exclusões
  1. 49 27
      server/hub.go
  2. 0 98
      stackexchange/redis/redis_stackexchange.go
  3. 0 92
      stackexchange/stackexchange.go
  4. 7 1
      websocket.go

+ 49 - 27
server/hub.go

@@ -15,6 +15,8 @@ type HubConfig struct {
 	DisconnectSize int
 	MessageSize    int
 	ServerId       string // serverId 服务器ID
+
+	Rdb redis.UniversalClient
 }
 
 type Hub struct {
@@ -31,11 +33,7 @@ type Hub struct {
 func NewHub(cfg HubConfig) *Hub {
 	hub := &Hub{
 		serverId: cfg.ServerId,
-		rdb: redis.NewUniversalClient(&redis.UniversalOptions{
-			Addrs:    []string{"106.75.230.4:6379"},
-			Password: "sikey!Q@W#E456",
-			DB:       0,
-		}),
+		rdb:      cfg.Rdb,
 
 		clients: make(map[string]*Client),
 		mutex:   sync.RWMutex{},
@@ -46,29 +44,25 @@ func NewHub(cfg HubConfig) *Hub {
 	}
 
 	go hub.run()
-
 	go hub.remotelyEvent() // 远程事件
 	return hub
 }
 
 func (h *Hub) run() {
 	for {
+		ctx := context.Background()
 		select {
 		case client := <-h.Connect:
 			h.clients[client.UserId] = client
-			h.OnPublishConnect(context.Background(), client)
+			h.OnPublishConnect(ctx, client)
 		case client := <-h.Disconnect:
 			close(client.Send)
 			delete(h.clients, client.UserId)
-			h.OnPublishDisconnect(context.Background(), client)
+			h.OnPublishDisconnect(ctx, client)
 		case message := <-h.Message:
 			if client, ok := h.clients[message.receiver]; ok {
 				if client.isRemotely {
-					ctx := context.Background()
-					err := h.rdb.Publish(ctx, messageChannelEvent, message).Err()
-					if err != nil {
-						zlog.Error(err)
-					}
+					h.OnPublishMessage(ctx, message)
 				} else {
 					client.Send <- message
 				}
@@ -96,36 +90,64 @@ func (h *Hub) remotelyEvent() {
 
 		switch rMsg.Channel {
 		case connectChannelEvent:
-			h.Connect <- &Client{
-				hub:        h,
-				UserId:     rMsg.Payload,
-				isRemotely: true,
-				Send:       make(chan *Message),
+			var event = deserializeEvent([]byte(rMsg.Payload))
+			if event.ServerId != h.serverId {
+				h.Connect <- &Client{
+					hub:        h,
+					UserId:     event.UserId,
+					isRemotely: true,
+					Send:       make(chan *Message),
+				}
 			}
 		case disconnectChannelEvent:
-			h.Disconnect <- h.clients[rMsg.Payload]
-		case messageChannelEvent:
-			var message Message
-			if err = json.Unmarshal([]byte(rMsg.Payload), &message); err != nil {
-				zlog.Error(err)
-				break
+			var event = deserializeEvent([]byte(rMsg.Payload))
+			if event.ServerId != h.serverId {
+				h.Disconnect <- h.clients[event.UserId]
 			}
-			h.Message <- &message
+		case messageChannelEvent:
+			h.Message <- deserializeMessage([]byte(rMsg.Payload))
 		}
 		fmt.Println(rMsg)
 	}
 }
 
+func deserializeEvent(bytes []byte) *PublishEvent {
+	var event PublishEvent
+	_ = json.Unmarshal(bytes, &event)
+	return &event
+}
+
+type PublishEvent struct {
+	ServerId string
+	UserId   string
+}
+
+type PublishMessage struct {
+	ServerId string
+	message  *Message
+}
+
 func (h *Hub) OnPublishConnect(ctx context.Context, client *Client) error {
 	if !client.isRemotely {
-		return h.rdb.Publish(ctx, connectChannelEvent, client.UserId).Err()
+		event := &PublishEvent{UserId: client.UserId, ServerId: h.serverId}
+		return h.rdb.Publish(ctx, connectChannelEvent, event).Err()
 	}
 	return nil
 }
 
 func (h *Hub) OnPublishDisconnect(ctx context.Context, client *Client) error {
 	if !client.isRemotely {
-		return h.rdb.Publish(ctx, disconnectChannelEvent, client.UserId).Err()
+		event := &PublishEvent{UserId: client.UserId, ServerId: h.serverId}
+		return h.rdb.Publish(ctx, disconnectChannelEvent, event).Err()
+	}
+	return nil
+}
+
+func (h *Hub) OnPublishMessage(ctx context.Context, message *Message) error {
+	err := h.rdb.Publish(ctx, messageChannelEvent, message).Err()
+	if err != nil {
+		zlog.Error(err)
+		return err
 	}
 	return nil
 }

+ 0 - 98
stackexchange/redis/redis_stackexchange.go

@@ -1,98 +0,0 @@
-package redis
-
-// import (
-// 	"context"
-// 	"fmt"
-
-// 	rdx "github.com/redis/go-redis/v9"
-// 	"sikey.com/websocket/utils/zlog"
-// )
-
-// type StackExchange struct {
-// 	channel string
-
-// 	rdb rdx.UniversalClient
-
-// 	Connect    chan *RelationshipMessage
-// 	Disconnect chan *RelationshipMessage
-// }
-
-// type Config struct {
-// 	ServerId string
-
-// 	Cluster  []string
-// 	Password string
-// 	DB       int
-// }
-
-// func NewStackExchange(cfg Config, channel string) *StackExchange {
-// 	exc := &StackExchange{
-// 		channel: channel,
-// 		rdb: rdx.NewUniversalClient(&rdx.UniversalOptions{
-// 			Addrs:    cfg.Cluster,
-// 			Password: cfg.Password,
-// 			DB:       cfg.DB,
-// 		}),
-// 	}
-// 	go exc.run()
-// 	go exc.receive()
-// 	return exc
-// }
-
-// var (
-// 	// ws:user:clients:${uid}
-// 	// 存储用户和 WebSocket 连接的关系,采用有序集合方式存储
-// 	connectRelationship = "ws:user:clients:%s"
-// )
-
-// type (
-// 	RelationshipMessage struct {
-// 		ServerId string `json:"serverId"`
-// 		UserId   string `json:"userId"`
-// 	}
-// )
-
-// // func (exc *StackExchange) Publish(msgs ...Message) {
-// // }
-
-// func (exc *StackExchange) OnConnect(msg RelationshipMessage) error {
-// 	ctx := context.Background()
-// 	key := fmt.Sprintf(connectRelationship, msg.UserId)
-// 	return exc.rdb.Set(ctx, key, msg.ServerId, 0).Err()
-// }
-
-// func (exc *StackExchange) OnDisconnect(msg RelationshipMessage) error {
-// 	ctx := context.Background()
-// 	key := fmt.Sprintf(connectRelationship, msg.UserId)
-// 	return exc.rdb.Del(ctx, key).Err()
-// }
-
-// func (exc *StackExchange) run() {
-// 	for {
-// 		select {
-// 		case msg := <-exc.Connect:
-// 			exc.rdb.Publish(context.Background(), exc.channel, msg)
-// 		case msg := <-exc.Disconnect:
-// 			exc.rdb.Publish(context.Background(), exc.channel, msg)
-// 		}
-// 	}
-// }
-
-// func (exc *StackExchange) receive() {
-// 	ctx, cancel := context.WithCancel(context.Background())
-// 	pubsub := exc.rdb.PSubscribe(ctx, exc.channel)
-// 	defer func() {
-// 		cancel()
-// 		pubsub.Close()
-// 	}()
-
-// 	for {
-// 		msg, err := pubsub.ReceiveMessage(ctx)
-// 		if err != nil {
-// 			zlog.Error(err)
-// 			cancel()
-// 		}
-
-// 		fmt.Println(msg)
-// 	}
-// }

+ 0 - 92
stackexchange/stackexchange.go

@@ -1,92 +0,0 @@
-package stackexchange
-
-// var (
-// 	StackExchangeOnline    = []byte("exchange_online")    // StackExchangeOnline 上线
-// 	StackExchangeOffline   = []byte("exchange_offline")   // StackExchangeOffline 下线
-// 	StackExchangeMessaging = []byte("exchange_messaging") // StackExchangeMessaging 消息
-// )
-
-// type StackExchange struct {
-// 	Brokers   []string
-// 	Topic     string
-// 	Partition int
-// 	MaxBytes  int
-
-// 	offset int64
-// 	reader *kafka.Reader
-// 	writer *kafka.Writer
-// }
-
-// func NewStackExchange(brokers []string, topic string, partition int, maxBytes int) *StackExchange {
-// 	// dialer
-// 	dialer := &kafka.Dialer{
-// 		Timeout:   config.Kafka.Timeout * time.Second,
-// 		DualStack: true,
-// 	}
-
-// 	reader := kafka.NewReader(kafka.ReaderConfig{
-// 		Brokers:     brokers,
-// 		Topic:       topic,
-// 		StartOffset: kafka.FirstOffset,
-// 		Dialer:      dialer,
-// 	})
-
-// 	// exchange
-// 	reader.SetOffsetAt(context.TODO(), time.Now())
-// 	return &StackExchange{
-// 		Brokers:   brokers,
-// 		Topic:     topic,
-// 		Partition: partition,
-// 		MaxBytes:  maxBytes,
-
-// 		offset: 0,
-// 		reader: reader,
-// 		writer: &kafka.Writer{
-// 			Addr:                   kafka.TCP(brokers...),
-// 			Topic:                  topic,
-// 			AllowAutoTopicCreation: true,
-// 		},
-// 	}
-// }
-
-// func (exc *StackExchange) OnlineNotify(userId string) error {
-// 	return exc.writer.WriteMessages(context.Background(), kafka.Message{
-// 		Key:   StackExchangeOnline,
-// 		Value: []byte(userId),
-// 	})
-// }
-
-// func (exc *StackExchange) OfflineNotify(userId string) error {
-// 	return exc.writer.WriteMessages(context.Background(), kafka.Message{
-// 		Key:   StackExchangeOffline,
-// 		Value: []byte(userId),
-// 	})
-// }
-
-// func (exc *StackExchange) SendMessage(ctx context.Context, message kafka.Message) error {
-// 	return exc.writer.WriteMessages(ctx, message)
-// }
-
-// func (exc *StackExchange) ReadMessage() (kafka.Message, error) {
-// 	msg, err := exc.reader.ReadMessage(context.Background())
-// 	if err != nil {
-// 		return kafka.Message{}, err
-// 	}
-
-// 	exc.offset = exc.offset + 1
-// 	return msg, nil
-// }
-
-// func (exc *StackExchange) writeMessage(ctx context.Context, msgs ...kafka.Message) error {
-// 	return exc.writer.WriteMessages(ctx, msgs...)
-// }
-
-// func (exec *StackExchange) Offset() int64 {
-// 	return exec.reader.Offset()
-// }
-
-// func (exec *StackExchange) SetOffsetAt() error {
-// 	ctx, cannel := context.WithTimeout(context.Background(), 3*time.Second)
-// 	defer cannel()
-// 	return exec.reader.SetOffsetAt(ctx, time.Now())
-// }

+ 7 - 1
websocket.go

@@ -9,6 +9,7 @@ import (
 	"github.com/gin-gonic/gin"
 	"github.com/google/uuid"
 	"github.com/gorilla/websocket"
+	"github.com/redis/go-redis/v9"
 	"sikey.com/websocket/config"
 	"sikey.com/websocket/repositories"
 	"sikey.com/websocket/server"
@@ -52,7 +53,12 @@ func newApp() *gin.Engine {
 		ReadWait:  10 * time.Second,
 		PingWait:  120 * time.Second,
 		Hub: server.NewHub(server.HubConfig{
-			ServerId:       uuid.NewString(),
+			ServerId: uuid.NewString(),
+			Rdb: redis.NewUniversalClient(&redis.UniversalOptions{
+				Addrs:    []string{"106.75.230.4:6379"},
+				Password: "sikey!Q@W#E456",
+				DB:       0,
+			}),
 			ConnectSize:    1024,
 			DisconnectSize: 1024,
 			MessageSize:    125,