hub_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package server
  2. import (
  3. "context"
  4. "log"
  5. "testing"
  6. "github.com/redis/go-redis/v9"
  7. "sikey.com/websocket/config"
  8. )
  9. var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
  10. func TestHub_ConnectMessage(t *testing.T) {
  11. config.MustLoadConfig("../etc/websocket.toml")
  12. rdb := redis.NewUniversalClient(&redis.UniversalOptions{
  13. Addrs: []string{config.Redis.Addr},
  14. Password: config.Redis.Password,
  15. DB: config.Redis.Db,
  16. })
  17. var err error
  18. ctx := context.Background()
  19. err = rdb.Publish(ctx, "client.event.connect", userId).Err()
  20. if err != nil {
  21. log.Fatalln(err)
  22. }
  23. err = rdb.Publish(ctx, "client.event.disconnect", userId).Err()
  24. if err != nil {
  25. log.Fatalln(err)
  26. }
  27. }
  28. // func TestHub_ConnectMessage(t *testing.T) {
  29. // writer := &kafka.Writer{
  30. // Addr: kafka.TCP("106.75.230.4:9092"),
  31. // Topic: "messaging-channel",
  32. // AllowAutoTopicCreation: true,
  33. // }
  34. // // 发送上线消息
  35. // message := kafka.Message{
  36. // Key: stackexchange.StackExchangeOnline,
  37. // Value: []byte(userId),
  38. // }
  39. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  40. // log.Fatalln(err)
  41. // }
  42. // }
  43. // func TestHub_DisconnectMessage(t *testing.T) {
  44. // writer := &kafka.Writer{
  45. // Addr: kafka.TCP("106.75.230.4:9092"),
  46. // Topic: "messaging-channel",
  47. // AllowAutoTopicCreation: true,
  48. // }
  49. // // 发送上线消息
  50. // message := kafka.Message{
  51. // Key: stackexchange.StackExchangeOffline,
  52. // Value: []byte(userId),
  53. // }
  54. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  55. // log.Fatalln(err)
  56. // }
  57. // }
  58. // func TestHub_ReaderMessage(t *testing.T) {
  59. // reader := kafka.NewReader(kafka.ReaderConfig{
  60. // Brokers: []string{"106.75.230.4:9092"},
  61. // Topic: "messaging-channel",
  62. // StartOffset: kafka.FirstOffset,
  63. // })
  64. // ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
  65. // defer cannel()
  66. // for {
  67. // select {
  68. // case <-ctx.Done():
  69. // return
  70. // default:
  71. // var err error
  72. // var msg kafka.Message
  73. // if msg, err = reader.ReadMessage(ctx); err != nil {
  74. // fmt.Println(err)
  75. // break
  76. // }
  77. // fmt.Println(msg)
  78. // }
  79. // }
  80. // }
  81. // func TestHub_TopicList(t *testing.T) {
  82. // conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
  83. // if err != nil {
  84. // panic(err.Error())
  85. // }
  86. // defer conn.Close()
  87. // partitions, err := conn.ReadPartitions()
  88. // if err != nil {
  89. // panic(err.Error())
  90. // }
  91. // m := map[string]struct{}{}
  92. // // 遍历所有分区取topic
  93. // for _, p := range partitions {
  94. // m[p.Topic] = struct{}{}
  95. // }
  96. // for k := range m {
  97. // fmt.Println(k)
  98. // }
  99. // }