hub.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "github.com/redis/go-redis/v9"
  8. "sikey.com/websocket/utils/zlog"
  9. )
  10. type HubConfig struct {
  11. ConnectSize int
  12. DisconnectSize int
  13. MessageSize int
  14. ServerId string // serverId 服务器ID
  15. }
  16. type Hub struct {
  17. serverId string // serverId 服务器ID
  18. rdb redis.UniversalClient
  19. clients map[string]*Client
  20. mutex sync.RWMutex
  21. Connect chan *Client
  22. Disconnect chan *Client
  23. Message chan *Message
  24. }
  25. func NewHub(cfg HubConfig) *Hub {
  26. hub := &Hub{
  27. serverId: cfg.ServerId,
  28. rdb: redis.NewUniversalClient(&redis.UniversalOptions{
  29. Addrs: []string{"106.75.230.4:6379"},
  30. Password: "sikey!Q@W#E456",
  31. DB: 0,
  32. }),
  33. clients: make(map[string]*Client),
  34. mutex: sync.RWMutex{},
  35. Connect: make(chan *Client, cfg.ConnectSize),
  36. Disconnect: make(chan *Client, cfg.DisconnectSize),
  37. Message: make(chan *Message, cfg.MessageSize),
  38. }
  39. go hub.run()
  40. go hub.remotelyEvent() // 远程事件
  41. //go hub.remotelyMessageReader()
  42. //go hub.remotelyMessageWriter()
  43. return hub
  44. }
  45. func (h *Hub) run() {
  46. for {
  47. select {
  48. case client := <-h.Connect:
  49. h.clients[client.UserId] = client
  50. case client := <-h.Disconnect:
  51. close(client.Send)
  52. delete(h.clients, client.UserId)
  53. case message := <-h.Message:
  54. if client, ok := h.clients[message.receiver]; ok {
  55. if client.isRemotely {
  56. ctx := context.Background()
  57. err := h.rdb.Publish(ctx, messageChannelEvent, message).Err()
  58. if err != nil {
  59. zlog.Error(err)
  60. }
  61. } else {
  62. client.Send <- message
  63. }
  64. }
  65. }
  66. }
  67. }
  68. var (
  69. connectChannelEvent = "client.event.connect"
  70. disconnectChannelEvent = "client.event.disconnect"
  71. messageChannelEvent = "client.event.message"
  72. )
  73. func (h *Hub) remotelyEvent() {
  74. ctx := context.TODO()
  75. pubsub := h.rdb.PSubscribe(ctx, "client.event.*")
  76. defer pubsub.Close()
  77. for {
  78. rMsg, err := pubsub.ReceiveMessage(ctx)
  79. if err != nil {
  80. zlog.Error(err)
  81. }
  82. switch rMsg.Channel {
  83. case connectChannelEvent:
  84. h.Connect <- &Client{
  85. hub: h,
  86. UserId: rMsg.Payload,
  87. isRemotely: true,
  88. Send: make(chan *Message),
  89. }
  90. case disconnectChannelEvent:
  91. h.Disconnect <- h.clients[rMsg.Payload]
  92. case messageChannelEvent:
  93. var message Message
  94. if err = json.Unmarshal([]byte(rMsg.Payload), &message); err != nil {
  95. zlog.Error(err)
  96. break
  97. }
  98. h.Message <- &message
  99. }
  100. fmt.Println(rMsg)
  101. }
  102. }