hub.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package server
  2. import (
  3. "sync"
  4. "sikey.com/websocket/config"
  5. "sikey.com/websocket/utils/zlog"
  6. "github.com/redis/go-redis/v9"
  7. )
  8. type HubConfig struct {
  9. ServerId string // serverId 服务器ID
  10. Rdb redis.UniversalClient
  11. }
  12. type Hub struct {
  13. serverId string // serverId 服务器ID
  14. rdb redis.UniversalClient
  15. clients map[string]*Client
  16. mutex sync.RWMutex
  17. Connect chan *Client
  18. Disconnect chan *Client
  19. Message chan *Message
  20. exchange *Exchange
  21. }
  22. func NewHub(cfg HubConfig) *Hub {
  23. hub := &Hub{
  24. serverId: cfg.ServerId,
  25. clients: make(map[string]*Client),
  26. mutex: sync.RWMutex{},
  27. Connect: make(chan *Client, config.Websocket.ConnectSize),
  28. Disconnect: make(chan *Client, config.Websocket.ConnectSize),
  29. Message: make(chan *Message, config.Websocket.MessageSize),
  30. }
  31. hub.exchange = NewExchange(cfg.ServerId, cfg.Rdb)
  32. go hub.run()
  33. return hub
  34. }
  35. func (h *Hub) run() {
  36. for {
  37. select {
  38. case client := <-h.Connect:
  39. h.mutex.Lock()
  40. h.clients[client.UserId] = client
  41. h.mutex.Unlock()
  42. case client := <-h.Disconnect:
  43. h.mutex.Lock()
  44. close(client.Send)
  45. delete(h.clients, client.UserId)
  46. h.mutex.Unlock()
  47. case message := <-h.Message:
  48. h.mutex.RLock()
  49. if client, ok := h.clients[message.Receiver]; ok {
  50. if client.isRemotely {
  51. // h.exchange.OnPublishMessage(client, message)
  52. } else {
  53. zlog.Info("message: ", message)
  54. client.Send <- message
  55. }
  56. }
  57. h.mutex.RUnlock()
  58. // case conn := <-h.exchange.Connect:
  59. // h.mutex.Lock()
  60. // h.clients[conn.UserId] = &Client{
  61. // isRemotely: true,
  62. // hub: h,
  63. // Send: make(chan *Message),
  64. // }
  65. // h.mutex.Unlock()
  66. // case conn := <-h.exchange.Disconnect:
  67. // h.mutex.Lock()
  68. // if client, ok := h.clients[conn.UserId]; ok {
  69. // close(client.Send)
  70. // delete(h.clients, client.UserId)
  71. // }
  72. // h.mutex.Unlock()
  73. // case message := <-h.exchange.Message:
  74. // h.mutex.RLock()
  75. // if client, ok := h.clients[message.Receiver]; ok {
  76. // client.Send <- message
  77. // }
  78. // h.mutex.RUnlock()
  79. }
  80. }
  81. }