hub.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package server
  2. import (
  3. "sync"
  4. "sikey.com/websocket/utils/zlog"
  5. "github.com/redis/go-redis/v9"
  6. )
  7. type HubConfig struct {
  8. ConnectSize int
  9. DisconnectSize int
  10. MessageSize int
  11. ServerId string // serverId 服务器ID
  12. Rdb redis.UniversalClient
  13. }
  14. type Hub struct {
  15. serverId string // serverId 服务器ID
  16. rdb redis.UniversalClient
  17. clients map[string]*Client
  18. mutex sync.RWMutex
  19. Connect chan *Client
  20. Disconnect chan *Client
  21. Message chan *Message
  22. exchange *Exchange
  23. }
  24. func NewHub(cfg HubConfig) *Hub {
  25. hub := &Hub{
  26. serverId: cfg.ServerId,
  27. clients: make(map[string]*Client),
  28. mutex: sync.RWMutex{},
  29. Connect: make(chan *Client, cfg.ConnectSize),
  30. Disconnect: make(chan *Client, cfg.DisconnectSize),
  31. Message: make(chan *Message, cfg.MessageSize),
  32. }
  33. hub.exchange = NewExchange(cfg.ServerId, cfg.Rdb)
  34. go hub.run()
  35. return hub
  36. }
  37. func (h *Hub) run() {
  38. for {
  39. select {
  40. case client := <-h.Connect:
  41. h.mutex.Lock()
  42. h.clients[client.UserId] = client
  43. h.exchange.OnPublishConnect(client)
  44. h.mutex.Unlock()
  45. case client := <-h.Disconnect:
  46. h.mutex.Lock()
  47. close(client.Send)
  48. delete(h.clients, client.UserId)
  49. h.exchange.OnPublishDisconnect(client)
  50. h.mutex.Unlock()
  51. case message := <-h.Message:
  52. h.mutex.RLock()
  53. if client, ok := h.clients[message.Receiver]; ok {
  54. if client.isRemotely {
  55. h.exchange.OnPublishMessage(client, message)
  56. } else {
  57. zlog.Info("message: ", message)
  58. client.Send <- message
  59. }
  60. }
  61. h.mutex.RUnlock()
  62. case conn := <-h.exchange.Connect:
  63. h.mutex.Lock()
  64. h.clients[conn.UserId] = &Client{
  65. isRemotely: true,
  66. hub: h,
  67. Send: make(chan *Message),
  68. }
  69. h.mutex.Unlock()
  70. case conn := <-h.exchange.Disconnect:
  71. h.mutex.Lock()
  72. if client, ok := h.clients[conn.UserId]; ok {
  73. close(client.Send)
  74. delete(h.clients, client.UserId)
  75. }
  76. h.mutex.Unlock()
  77. case message := <-h.exchange.Message:
  78. h.mutex.RLock()
  79. if client, ok := h.clients[message.Receiver]; ok {
  80. client.Send <- message
  81. }
  82. h.mutex.RUnlock()
  83. }
  84. }
  85. }
  86. func (h *Hub) GetClients() map[string]*Client {
  87. h.mutex.RLock()
  88. defer h.mutex.RUnlock()
  89. return h.clients
  90. }