redis_stackexchange.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package redis
  2. // import (
  3. // "context"
  4. // "fmt"
  5. // rdx "github.com/redis/go-redis/v9"
  6. // "sikey.com/websocket/utils/zlog"
  7. // )
  8. // type StackExchange struct {
  9. // channel string
  10. // rdb rdx.UniversalClient
  11. // Connect chan *RelationshipMessage
  12. // Disconnect chan *RelationshipMessage
  13. // }
  14. // type Config struct {
  15. // ServerId string
  16. // Cluster []string
  17. // Password string
  18. // DB int
  19. // }
  20. // func NewStackExchange(cfg Config, channel string) *StackExchange {
  21. // exc := &StackExchange{
  22. // channel: channel,
  23. // rdb: rdx.NewUniversalClient(&rdx.UniversalOptions{
  24. // Addrs: cfg.Cluster,
  25. // Password: cfg.Password,
  26. // DB: cfg.DB,
  27. // }),
  28. // }
  29. // go exc.run()
  30. // go exc.receive()
  31. // return exc
  32. // }
  33. // var (
  34. // // ws:user:clients:${uid}
  35. // // 存储用户和 WebSocket 连接的关系,采用有序集合方式存储
  36. // connectRelationship = "ws:user:clients:%s"
  37. // )
  38. // type (
  39. // RelationshipMessage struct {
  40. // ServerId string `json:"serverId"`
  41. // UserId string `json:"userId"`
  42. // }
  43. // )
  44. // // func (exc *StackExchange) Publish(msgs ...Message) {
  45. // // }
  46. // func (exc *StackExchange) OnConnect(msg RelationshipMessage) error {
  47. // ctx := context.Background()
  48. // key := fmt.Sprintf(connectRelationship, msg.UserId)
  49. // return exc.rdb.Set(ctx, key, msg.ServerId, 0).Err()
  50. // }
  51. // func (exc *StackExchange) OnDisconnect(msg RelationshipMessage) error {
  52. // ctx := context.Background()
  53. // key := fmt.Sprintf(connectRelationship, msg.UserId)
  54. // return exc.rdb.Del(ctx, key).Err()
  55. // }
  56. // func (exc *StackExchange) run() {
  57. // for {
  58. // select {
  59. // case msg := <-exc.Connect:
  60. // exc.rdb.Publish(context.Background(), exc.channel, msg)
  61. // case msg := <-exc.Disconnect:
  62. // exc.rdb.Publish(context.Background(), exc.channel, msg)
  63. // }
  64. // }
  65. // }
  66. // func (exc *StackExchange) receive() {
  67. // ctx, cancel := context.WithCancel(context.Background())
  68. // pubsub := exc.rdb.PSubscribe(ctx, exc.channel)
  69. // defer func() {
  70. // cancel()
  71. // pubsub.Close()
  72. // }()
  73. // for {
  74. // msg, err := pubsub.ReceiveMessage(ctx)
  75. // if err != nil {
  76. // zlog.Error(err)
  77. // cancel()
  78. // }
  79. // fmt.Println(msg)
  80. // }
  81. // }