hub.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package server
  2. import (
  3. "sync"
  4. "sikey.com/websocket/config"
  5. "sikey.com/websocket/utils/zlog"
  6. )
  7. type Hub struct {
  8. serverId string // serverId 服务器ID
  9. clients map[string]*Client
  10. mutex sync.RWMutex
  11. Connect chan *Client
  12. Disconnect chan *Client
  13. Message chan *Message
  14. }
  15. func NewHub(serverId string) *Hub {
  16. hub := &Hub{
  17. serverId: serverId,
  18. clients: make(map[string]*Client),
  19. mutex: sync.RWMutex{},
  20. Connect: make(chan *Client, config.Websocket.ConnectSize),
  21. Disconnect: make(chan *Client, config.Websocket.ConnectSize),
  22. Message: make(chan *Message, config.Websocket.MessageSize),
  23. }
  24. go hub.run()
  25. return hub
  26. }
  27. func (h *Hub) run() {
  28. for {
  29. select {
  30. case client := <-h.Connect:
  31. h.mutex.Lock()
  32. h.clients[client.UserId] = client
  33. h.mutex.Unlock()
  34. case client := <-h.Disconnect:
  35. h.mutex.Lock()
  36. close(client.Send)
  37. delete(h.clients, client.UserId)
  38. h.mutex.Unlock()
  39. case message := <-h.Message:
  40. h.mutex.RLock()
  41. if client, ok := h.clients[message.Receiver]; ok {
  42. if client.isRemotely {
  43. // h.exchange.OnPublishMessage(client, message)
  44. } else {
  45. zlog.Info("message: ", message)
  46. client.Send <- message
  47. }
  48. }
  49. h.mutex.RUnlock()
  50. // case conn := <-h.exchange.Connect:
  51. // h.mutex.Lock()
  52. // h.clients[conn.UserId] = &Client{
  53. // isRemotely: true,
  54. // hub: h,
  55. // Send: make(chan *Message),
  56. // }
  57. // h.mutex.Unlock()
  58. // case conn := <-h.exchange.Disconnect:
  59. // h.mutex.Lock()
  60. // if client, ok := h.clients[conn.UserId]; ok {
  61. // close(client.Send)
  62. // delete(h.clients, client.UserId)
  63. // }
  64. // h.mutex.Unlock()
  65. // case message := <-h.exchange.Message:
  66. // h.mutex.RLock()
  67. // if client, ok := h.clients[message.Receiver]; ok {
  68. // client.Send <- message
  69. // }
  70. // h.mutex.RUnlock()
  71. }
  72. }
  73. }