hub_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package server
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "testing"
  7. "github.com/gin-gonic/gin"
  8. "github.com/redis/go-redis/v9"
  9. "sikey.com/websocket/config"
  10. "sikey.com/websocket/repositories"
  11. "sikey.com/websocket/utils/mysqlx"
  12. )
  13. var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
  14. func TestHub_ConnectMessage(t *testing.T) {
  15. config.MustLoadConfig("../etc/websocket.toml")
  16. rdb := redis.NewUniversalClient(&redis.UniversalOptions{
  17. Addrs: []string{config.Redis.Addr},
  18. Password: config.Redis.Password,
  19. DB: config.Redis.Db,
  20. })
  21. var err error
  22. ctx := context.Background()
  23. err = rdb.Publish(ctx, "client.event.connect", userId).Err()
  24. if err != nil {
  25. log.Fatalln(err)
  26. }
  27. err = rdb.Publish(ctx, "client.event.disconnect", userId).Err()
  28. if err != nil {
  29. log.Fatalln(err)
  30. }
  31. }
  32. func TestHub_getReceiverUserIds(t *testing.T) {
  33. config.MustLoadConfig("../etc/websocket.toml")
  34. repos := repositories.NewRepositories(mysqlx.Connect(), nil)
  35. c := &Client{
  36. ctx: &gin.Context{},
  37. UserId: "beaf8878-03d8-4bf6-a783-22a0d4881265",
  38. repos: repos,
  39. }
  40. users := c.getReceiverUserIds("1753371851436216320")
  41. fmt.Println(users)
  42. }
  43. // func TestHub_ConnectMessage(t *testing.T) {
  44. // writer := &kafka.Writer{
  45. // Addr: kafka.TCP("106.75.230.4:9092"),
  46. // Topic: "messaging-channel",
  47. // AllowAutoTopicCreation: true,
  48. // }
  49. // // 发送上线消息
  50. // message := kafka.Message{
  51. // Key: stackexchange.StackExchangeOnline,
  52. // Value: []byte(userId),
  53. // }
  54. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  55. // log.Fatalln(err)
  56. // }
  57. // }
  58. // func TestHub_DisconnectMessage(t *testing.T) {
  59. // writer := &kafka.Writer{
  60. // Addr: kafka.TCP("106.75.230.4:9092"),
  61. // Topic: "messaging-channel",
  62. // AllowAutoTopicCreation: true,
  63. // }
  64. // // 发送上线消息
  65. // message := kafka.Message{
  66. // Key: stackexchange.StackExchangeOffline,
  67. // Value: []byte(userId),
  68. // }
  69. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  70. // log.Fatalln(err)
  71. // }
  72. // }
  73. // func TestHub_ReaderMessage(t *testing.T) {
  74. // reader := kafka.NewReader(kafka.ReaderConfig{
  75. // Brokers: []string{"106.75.230.4:9092"},
  76. // Topic: "messaging-channel",
  77. // StartOffset: kafka.FirstOffset,
  78. // })
  79. // ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
  80. // defer cannel()
  81. // for {
  82. // select {
  83. // case <-ctx.Done():
  84. // return
  85. // default:
  86. // var err error
  87. // var msg kafka.Message
  88. // if msg, err = reader.ReadMessage(ctx); err != nil {
  89. // fmt.Println(err)
  90. // break
  91. // }
  92. // fmt.Println(msg)
  93. // }
  94. // }
  95. // }
  96. // func TestHub_TopicList(t *testing.T) {
  97. // conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
  98. // if err != nil {
  99. // panic(err.Error())
  100. // }
  101. // defer conn.Close()
  102. // partitions, err := conn.ReadPartitions()
  103. // if err != nil {
  104. // panic(err.Error())
  105. // }
  106. // m := map[string]struct{}{}
  107. // // 遍历所有分区取topic
  108. // for _, p := range partitions {
  109. // m[p.Topic] = struct{}{}
  110. // }
  111. // for k := range m {
  112. // fmt.Println(k)
  113. // }
  114. // }