hub.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package server
  2. import (
  3. "sync"
  4. "sikey.com/websocket/stackexchange/redis"
  5. "sikey.com/websocket/utils/zlog"
  6. )
  7. type HubConfig struct {
  8. ConnectSize int
  9. DisconnectSize int
  10. MessageSize int
  11. // StackExchange *stackexchange.RedisStackExchange
  12. }
  13. type Hub struct {
  14. remotelyClients map[string]*Client
  15. remotelyMutex sync.RWMutex
  16. clients map[string]*Client
  17. mutex sync.RWMutex
  18. Connect chan *Client
  19. Disconnect chan *Client
  20. Message chan *Message
  21. StackExchange *redis.StackExchange
  22. }
  23. func NewHub(cfg HubConfig) *Hub {
  24. hub := &Hub{
  25. remotelyClients: make(map[string]*Client),
  26. remotelyMutex: sync.RWMutex{},
  27. clients: make(map[string]*Client),
  28. mutex: sync.RWMutex{},
  29. Connect: make(chan *Client, cfg.ConnectSize),
  30. Disconnect: make(chan *Client, cfg.DisconnectSize),
  31. Message: make(chan *Message, cfg.MessageSize),
  32. }
  33. hub.StackExchange = redis.NewStackExchange()
  34. go hub.run()
  35. go hub.remotely() // 远程消息
  36. return hub
  37. }
  38. func (h *Hub) run() {
  39. for {
  40. select {
  41. case client := <-h.Connect:
  42. if client.isRemotely {
  43. h.remotelyClients[client.UserId] = client
  44. } else {
  45. h.clients[client.UserId] = client
  46. }
  47. case client := <-h.Disconnect:
  48. close(client.Send)
  49. if client.isRemotely {
  50. delete(h.remotelyClients, client.UserId)
  51. } else {
  52. delete(h.clients, client.UserId)
  53. }
  54. case message := <-h.Message:
  55. if client, ok := h.clients[message.receiver]; ok {
  56. client.Send <- message
  57. } else {
  58. // 不在同一台服务器, 这里将消息发送至拓展应用
  59. if client != nil {
  60. client.RemotelyMessage <- message
  61. }
  62. }
  63. }
  64. }
  65. }
  66. func (h *Hub) remotely() {
  67. for {
  68. zlog.Info("Remotely client actions monitoring.")
  69. // km, err := h.exchange.ReadMessage()
  70. // if err != nil {
  71. // zlog.Error(err)
  72. // }
  73. // // Create new a remote client
  74. // remotelyClient := &Client{
  75. // isRemotely: true, // mark remotely client
  76. // UserId: string(km.Value),
  77. // Send: make(chan *Message, 256),
  78. // hub: h,
  79. // }
  80. // switch {
  81. // case bytes.Equal(km.Key, stackexchange.StackExchangeOnline):
  82. // h.Connect <- remotelyClient
  83. // zlog.Info("Remotely client online: ", remotelyClient.UserId)
  84. // case bytes.Equal(km.Key, stackexchange.StackExchangeOffline):
  85. // h.Disconnect <- remotelyClient
  86. // zlog.Info("Remotely client offline: ", remotelyClient.UserId)
  87. // case bytes.Equal(km.Key, stackexchange.StackExchangeMessaging):
  88. // h.Message <- deserializeMessage(km.Value)
  89. // }
  90. }
  91. }