hub_test.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package server
  2. import (
  3. "fmt"
  4. "testing"
  5. "github.com/gin-gonic/gin"
  6. "sikey.com/websocket/config"
  7. "sikey.com/websocket/repositories"
  8. "x.sikey.com.cn/serverx/dbx"
  9. )
  10. var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
  11. func TestHub_getReceiverUserIds(t *testing.T) {
  12. config.MustLoadConfig("../etc/websocket.toml")
  13. repos := repositories.NewRepositories(dbx.Connect(), nil)
  14. c := &Client{
  15. ctx: &gin.Context{},
  16. UserId: "beaf8878-03d8-4bf6-a783-22a0d4881265",
  17. repos: repos,
  18. }
  19. users := c.getReceiverUserIds("1760125639325011968")
  20. fmt.Println(users)
  21. }
  22. // func TestHub_ConnectMessage(t *testing.T) {
  23. // writer := &kafka.Writer{
  24. // Addr: kafka.TCP("106.75.230.4:9092"),
  25. // Topic: "messaging-channel",
  26. // AllowAutoTopicCreation: true,
  27. // }
  28. // // 发送上线消息
  29. // message := kafka.Message{
  30. // Key: stackexchange.StackExchangeOnline,
  31. // Value: []byte(userId),
  32. // }
  33. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  34. // log.Fatalln(err)
  35. // }
  36. // }
  37. // func TestHub_DisconnectMessage(t *testing.T) {
  38. // writer := &kafka.Writer{
  39. // Addr: kafka.TCP("106.75.230.4:9092"),
  40. // Topic: "messaging-channel",
  41. // AllowAutoTopicCreation: true,
  42. // }
  43. // // 发送上线消息
  44. // message := kafka.Message{
  45. // Key: stackexchange.StackExchangeOffline,
  46. // Value: []byte(userId),
  47. // }
  48. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  49. // log.Fatalln(err)
  50. // }
  51. // }
  52. // func TestHub_ReaderMessage(t *testing.T) {
  53. // reader := kafka.NewReader(kafka.ReaderConfig{
  54. // Brokers: []string{"106.75.230.4:9092"},
  55. // Topic: "messaging-channel",
  56. // StartOffset: kafka.FirstOffset,
  57. // })
  58. // ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
  59. // defer cannel()
  60. // for {
  61. // select {
  62. // case <-ctx.Done():
  63. // return
  64. // default:
  65. // var err error
  66. // var msg kafka.Message
  67. // if msg, err = reader.ReadMessage(ctx); err != nil {
  68. // fmt.Println(err)
  69. // break
  70. // }
  71. // fmt.Println(msg)
  72. // }
  73. // }
  74. // }
  75. // func TestHub_TopicList(t *testing.T) {
  76. // conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
  77. // if err != nil {
  78. // panic(err.Error())
  79. // }
  80. // defer conn.Close()
  81. // partitions, err := conn.ReadPartitions()
  82. // if err != nil {
  83. // panic(err.Error())
  84. // }
  85. // m := map[string]struct{}{}
  86. // // 遍历所有分区取topic
  87. // for _, p := range partitions {
  88. // m[p.Topic] = struct{}{}
  89. // }
  90. // for k := range m {
  91. // fmt.Println(k)
  92. // }
  93. // }