package server import ( "fmt" "sync" "sikey.com/websocket/config" "sikey.com/websocket/utils/zlog" ) type Hub struct { serverId string // serverId 服务器ID clients map[string]*Client mutex sync.RWMutex Connect chan *Client Disconnect chan *Client Message chan *Message FirebaseMessage chan *FirebaseMessage Nats *Nats } func NewHub(serverId string) *Hub { hub := &Hub{ serverId: serverId, clients: make(map[string]*Client), mutex: sync.RWMutex{}, Connect: make(chan *Client, config.Websocket.ConnectSize), Disconnect: make(chan *Client, config.Websocket.ConnectSize), Message: make(chan *Message, config.Websocket.MessageSize), Nats: NewNats(config.Websocket.NatsUrl), } go hub.run() return hub } func (h *Hub) run() { for { select { case client := <-h.Connect: h.mutex.Lock() h.clients[client.UserId] = client h.mutex.Unlock() if h.Nats != nil { h.Nats.Subscribe <- &subscriber{client: client} } case client := <-h.Disconnect: h.mutex.Lock() close(client.Send) delete(h.clients, client.UserId) h.mutex.Unlock() if h.Nats != nil { h.Nats.Unsubscribe <- &subscriber{client: client} } case message := <-h.Message: h.mutex.RLock() if client, ok := h.clients[message.Receiver]; ok { zlog.Info("message: ", message) client.Send <- message } else { h.Nats.Send <- &natsMessage{userId: message.Receiver, message: message} } h.mutex.RUnlock() case firebaseMessage := <-h.FirebaseMessage: fmt.Println(firebaseMessage) } } } func (h *Hub) GetClients() []*Client { h.mutex.RLock() defer h.mutex.RUnlock() var clients = make([]*Client, 0) for _, c := range h.clients { clients = append(clients, c) } return clients }