hub_test.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package server
  2. var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
  3. // func TestHub_ConnectMessage(t *testing.T) {
  4. // writer := &kafka.Writer{
  5. // Addr: kafka.TCP("106.75.230.4:9092"),
  6. // Topic: "messaging-channel",
  7. // AllowAutoTopicCreation: true,
  8. // }
  9. // // 发送上线消息
  10. // message := kafka.Message{
  11. // Key: stackexchange.StackExchangeOnline,
  12. // Value: []byte(userId),
  13. // }
  14. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  15. // log.Fatalln(err)
  16. // }
  17. // }
  18. // func TestHub_DisconnectMessage(t *testing.T) {
  19. // writer := &kafka.Writer{
  20. // Addr: kafka.TCP("106.75.230.4:9092"),
  21. // Topic: "messaging-channel",
  22. // AllowAutoTopicCreation: true,
  23. // }
  24. // // 发送上线消息
  25. // message := kafka.Message{
  26. // Key: stackexchange.StackExchangeOffline,
  27. // Value: []byte(userId),
  28. // }
  29. // if err := writer.WriteMessages(context.TODO(), message); err != nil {
  30. // log.Fatalln(err)
  31. // }
  32. // }
  33. // func TestHub_ReaderMessage(t *testing.T) {
  34. // reader := kafka.NewReader(kafka.ReaderConfig{
  35. // Brokers: []string{"106.75.230.4:9092"},
  36. // Topic: "messaging-channel",
  37. // StartOffset: kafka.FirstOffset,
  38. // })
  39. // ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
  40. // defer cannel()
  41. // for {
  42. // select {
  43. // case <-ctx.Done():
  44. // return
  45. // default:
  46. // var err error
  47. // var msg kafka.Message
  48. // if msg, err = reader.ReadMessage(ctx); err != nil {
  49. // fmt.Println(err)
  50. // break
  51. // }
  52. // fmt.Println(msg)
  53. // }
  54. // }
  55. // }
  56. // func TestHub_TopicList(t *testing.T) {
  57. // conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
  58. // if err != nil {
  59. // panic(err.Error())
  60. // }
  61. // defer conn.Close()
  62. // partitions, err := conn.ReadPartitions()
  63. // if err != nil {
  64. // panic(err.Error())
  65. // }
  66. // m := map[string]struct{}{}
  67. // // 遍历所有分区取topic
  68. // for _, p := range partitions {
  69. // m[p.Topic] = struct{}{}
  70. // }
  71. // for k := range m {
  72. // fmt.Println(k)
  73. // }
  74. // }