123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- package server
- import (
- "context"
- "log"
- "testing"
- "github.com/redis/go-redis/v9"
- "sikey.com/websocket/config"
- )
- var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
- func TestHub_ConnectMessage(t *testing.T) {
- config.MustLoadConfig("../etc/websocket.toml")
- rdb := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: []string{config.Redis.Addr},
- Password: config.Redis.Password,
- DB: config.Redis.Db,
- })
- var err error
- ctx := context.Background()
- err = rdb.Publish(ctx, "client.event.connect", userId).Err()
- if err != nil {
- log.Fatalln(err)
- }
- err = rdb.Publish(ctx, "client.event.disconnect", userId).Err()
- if err != nil {
- log.Fatalln(err)
- }
- }
- // func TestHub_ConnectMessage(t *testing.T) {
- // writer := &kafka.Writer{
- // Addr: kafka.TCP("106.75.230.4:9092"),
- // Topic: "messaging-channel",
- // AllowAutoTopicCreation: true,
- // }
- // // 发送上线消息
- // message := kafka.Message{
- // Key: stackexchange.StackExchangeOnline,
- // Value: []byte(userId),
- // }
- // if err := writer.WriteMessages(context.TODO(), message); err != nil {
- // log.Fatalln(err)
- // }
- // }
- // func TestHub_DisconnectMessage(t *testing.T) {
- // writer := &kafka.Writer{
- // Addr: kafka.TCP("106.75.230.4:9092"),
- // Topic: "messaging-channel",
- // AllowAutoTopicCreation: true,
- // }
- // // 发送上线消息
- // message := kafka.Message{
- // Key: stackexchange.StackExchangeOffline,
- // Value: []byte(userId),
- // }
- // if err := writer.WriteMessages(context.TODO(), message); err != nil {
- // log.Fatalln(err)
- // }
- // }
- // func TestHub_ReaderMessage(t *testing.T) {
- // reader := kafka.NewReader(kafka.ReaderConfig{
- // Brokers: []string{"106.75.230.4:9092"},
- // Topic: "messaging-channel",
- // StartOffset: kafka.FirstOffset,
- // })
- // ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
- // defer cannel()
- // for {
- // select {
- // case <-ctx.Done():
- // return
- // default:
- // var err error
- // var msg kafka.Message
- // if msg, err = reader.ReadMessage(ctx); err != nil {
- // fmt.Println(err)
- // break
- // }
- // fmt.Println(msg)
- // }
- // }
- // }
- // func TestHub_TopicList(t *testing.T) {
- // conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
- // if err != nil {
- // panic(err.Error())
- // }
- // defer conn.Close()
- // partitions, err := conn.ReadPartitions()
- // if err != nil {
- // panic(err.Error())
- // }
- // m := map[string]struct{}{}
- // // 遍历所有分区取topic
- // for _, p := range partitions {
- // m[p.Topic] = struct{}{}
- // }
- // for k := range m {
- // fmt.Println(k)
- // }
- // }
|