hub.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package server
  2. import (
  3. "sync"
  4. "sikey.com/websocket/config"
  5. "sikey.com/websocket/utils/zlog"
  6. "github.com/redis/go-redis/v9"
  7. )
  8. type HubConfig struct {
  9. ServerId string // serverId 服务器ID
  10. Rdb redis.UniversalClient
  11. }
  12. type Hub struct {
  13. serverId string // serverId 服务器ID
  14. rdb redis.UniversalClient
  15. clients map[string]*Client
  16. mutex sync.RWMutex
  17. Connect chan *Client
  18. Disconnect chan *Client
  19. Message chan *Message
  20. exchange *Exchange
  21. }
  22. func NewHub(cfg HubConfig) *Hub {
  23. hub := &Hub{
  24. serverId: cfg.ServerId,
  25. clients: make(map[string]*Client),
  26. mutex: sync.RWMutex{},
  27. Connect: make(chan *Client, config.Websocket.ConnectSize),
  28. Disconnect: make(chan *Client, config.Websocket.ConnectSize),
  29. Message: make(chan *Message, config.Websocket.MessageSize),
  30. }
  31. hub.exchange = NewExchange(cfg.ServerId, cfg.Rdb)
  32. go hub.run()
  33. return hub
  34. }
  35. func (h *Hub) run() {
  36. for {
  37. select {
  38. case client := <-h.Connect:
  39. h.mutex.Lock()
  40. h.clients[client.UserId] = client
  41. // h.exchange.OnPublishConnect(client)
  42. h.mutex.Unlock()
  43. case client := <-h.Disconnect:
  44. h.mutex.Lock()
  45. close(client.Send)
  46. delete(h.clients, client.UserId)
  47. // h.exchange.OnPublishDisconnect(client)
  48. h.mutex.Unlock()
  49. case message := <-h.Message:
  50. h.mutex.RLock()
  51. if client, ok := h.clients[message.Receiver]; ok {
  52. if client.isRemotely {
  53. h.exchange.OnPublishMessage(client, message)
  54. } else {
  55. zlog.Info("message: ", message)
  56. client.Send <- message
  57. }
  58. }
  59. h.mutex.RUnlock()
  60. case conn := <-h.exchange.Connect:
  61. h.mutex.Lock()
  62. h.clients[conn.UserId] = &Client{
  63. isRemotely: true,
  64. hub: h,
  65. Send: make(chan *Message),
  66. }
  67. h.mutex.Unlock()
  68. case conn := <-h.exchange.Disconnect:
  69. h.mutex.Lock()
  70. if client, ok := h.clients[conn.UserId]; ok {
  71. close(client.Send)
  72. delete(h.clients, client.UserId)
  73. }
  74. h.mutex.Unlock()
  75. case message := <-h.exchange.Message:
  76. h.mutex.RLock()
  77. if client, ok := h.clients[message.Receiver]; ok {
  78. client.Send <- message
  79. }
  80. h.mutex.RUnlock()
  81. }
  82. }
  83. }
  84. func (h *Hub) GetClients() map[string]*Client {
  85. return h.clients
  86. }