hub.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "github.com/redis/go-redis/v9"
  8. "sikey.com/websocket/utils/zlog"
  9. )
  10. type HubConfig struct {
  11. ConnectSize int
  12. DisconnectSize int
  13. MessageSize int
  14. ServerId string // serverId 服务器ID
  15. Rdb redis.UniversalClient
  16. }
  17. type Hub struct {
  18. serverId string // serverId 服务器ID
  19. rdb redis.UniversalClient
  20. clients map[string]*Client
  21. mutex sync.RWMutex
  22. Connect chan *Client
  23. Disconnect chan *Client
  24. Message chan *Message
  25. }
  26. func NewHub(cfg HubConfig) *Hub {
  27. hub := &Hub{
  28. serverId: cfg.ServerId,
  29. rdb: cfg.Rdb,
  30. clients: make(map[string]*Client),
  31. mutex: sync.RWMutex{},
  32. Connect: make(chan *Client, cfg.ConnectSize),
  33. Disconnect: make(chan *Client, cfg.DisconnectSize),
  34. Message: make(chan *Message, cfg.MessageSize),
  35. }
  36. go hub.run()
  37. go hub.remotelyEvent() // 远程事件
  38. return hub
  39. }
  40. func (h *Hub) run() {
  41. for {
  42. ctx := context.Background()
  43. select {
  44. case client := <-h.Connect:
  45. h.clients[client.UserId] = client
  46. h.OnPublishConnect(ctx, client)
  47. case client := <-h.Disconnect:
  48. close(client.Send)
  49. delete(h.clients, client.UserId)
  50. h.OnPublishDisconnect(ctx, client)
  51. case message := <-h.Message:
  52. if client, ok := h.clients[message.receiver]; ok {
  53. if client.isRemotely {
  54. h.OnPublishMessage(ctx, message)
  55. } else {
  56. client.Send <- message
  57. }
  58. }
  59. }
  60. }
  61. }
  62. var (
  63. connectChannelEvent = "client.event.connect"
  64. disconnectChannelEvent = "client.event.disconnect"
  65. messageChannelEvent = "client.event.message"
  66. )
  67. func (h *Hub) remotelyEvent() {
  68. ctx := context.TODO()
  69. pubsub := h.rdb.PSubscribe(ctx, "client.event.*")
  70. defer pubsub.Close()
  71. for {
  72. rMsg, err := pubsub.ReceiveMessage(ctx)
  73. if err != nil {
  74. zlog.Error(err)
  75. }
  76. switch rMsg.Channel {
  77. case connectChannelEvent:
  78. var event = deserializeEvent([]byte(rMsg.Payload))
  79. if event.ServerId != h.serverId {
  80. h.Connect <- &Client{
  81. hub: h,
  82. UserId: event.UserId,
  83. isRemotely: true,
  84. Send: make(chan *Message),
  85. }
  86. }
  87. case disconnectChannelEvent:
  88. var event = deserializeEvent([]byte(rMsg.Payload))
  89. if event.ServerId != h.serverId {
  90. h.Disconnect <- h.clients[event.UserId]
  91. }
  92. case messageChannelEvent:
  93. h.Message <- deserializeMessage([]byte(rMsg.Payload))
  94. }
  95. fmt.Println(rMsg)
  96. }
  97. }
  98. func deserializeEvent(bytes []byte) *PublishEvent {
  99. var event PublishEvent
  100. _ = json.Unmarshal(bytes, &event)
  101. return &event
  102. }
  103. type PublishEvent struct {
  104. ServerId string
  105. UserId string
  106. }
  107. type PublishMessage struct {
  108. ServerId string
  109. message *Message
  110. }
  111. func (h *Hub) OnPublishConnect(ctx context.Context, client *Client) error {
  112. if !client.isRemotely {
  113. event := &PublishEvent{UserId: client.UserId, ServerId: h.serverId}
  114. return h.rdb.Publish(ctx, connectChannelEvent, event).Err()
  115. }
  116. return nil
  117. }
  118. func (h *Hub) OnPublishDisconnect(ctx context.Context, client *Client) error {
  119. if !client.isRemotely {
  120. event := &PublishEvent{UserId: client.UserId, ServerId: h.serverId}
  121. return h.rdb.Publish(ctx, disconnectChannelEvent, event).Err()
  122. }
  123. return nil
  124. }
  125. func (h *Hub) OnPublishMessage(ctx context.Context, message *Message) error {
  126. err := h.rdb.Publish(ctx, messageChannelEvent, message).Err()
  127. if err != nil {
  128. zlog.Error(err)
  129. return err
  130. }
  131. return nil
  132. }