stackexchange.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package stackexchange
  2. // var (
  3. // StackExchangeOnline = []byte("exchange_online") // StackExchangeOnline 上线
  4. // StackExchangeOffline = []byte("exchange_offline") // StackExchangeOffline 下线
  5. // StackExchangeMessaging = []byte("exchange_messaging") // StackExchangeMessaging 消息
  6. // )
  7. // type StackExchange struct {
  8. // Brokers []string
  9. // Topic string
  10. // Partition int
  11. // MaxBytes int
  12. // offset int64
  13. // reader *kafka.Reader
  14. // writer *kafka.Writer
  15. // }
  16. // func NewStackExchange(brokers []string, topic string, partition int, maxBytes int) *StackExchange {
  17. // // dialer
  18. // dialer := &kafka.Dialer{
  19. // Timeout: config.Kafka.Timeout * time.Second,
  20. // DualStack: true,
  21. // }
  22. // reader := kafka.NewReader(kafka.ReaderConfig{
  23. // Brokers: brokers,
  24. // Topic: topic,
  25. // StartOffset: kafka.FirstOffset,
  26. // Dialer: dialer,
  27. // })
  28. // // exchange
  29. // reader.SetOffsetAt(context.TODO(), time.Now())
  30. // return &StackExchange{
  31. // Brokers: brokers,
  32. // Topic: topic,
  33. // Partition: partition,
  34. // MaxBytes: maxBytes,
  35. // offset: 0,
  36. // reader: reader,
  37. // writer: &kafka.Writer{
  38. // Addr: kafka.TCP(brokers...),
  39. // Topic: topic,
  40. // AllowAutoTopicCreation: true,
  41. // },
  42. // }
  43. // }
  44. // func (exc *StackExchange) OnlineNotify(userId string) error {
  45. // return exc.writer.WriteMessages(context.Background(), kafka.Message{
  46. // Key: StackExchangeOnline,
  47. // Value: []byte(userId),
  48. // })
  49. // }
  50. // func (exc *StackExchange) OfflineNotify(userId string) error {
  51. // return exc.writer.WriteMessages(context.Background(), kafka.Message{
  52. // Key: StackExchangeOffline,
  53. // Value: []byte(userId),
  54. // })
  55. // }
  56. // func (exc *StackExchange) SendMessage(ctx context.Context, message kafka.Message) error {
  57. // return exc.writer.WriteMessages(ctx, message)
  58. // }
  59. // func (exc *StackExchange) ReadMessage() (kafka.Message, error) {
  60. // msg, err := exc.reader.ReadMessage(context.Background())
  61. // if err != nil {
  62. // return kafka.Message{}, err
  63. // }
  64. // exc.offset = exc.offset + 1
  65. // return msg, nil
  66. // }
  67. // func (exc *StackExchange) writeMessage(ctx context.Context, msgs ...kafka.Message) error {
  68. // return exc.writer.WriteMessages(ctx, msgs...)
  69. // }
  70. // func (exec *StackExchange) Offset() int64 {
  71. // return exec.reader.Offset()
  72. // }
  73. // func (exec *StackExchange) SetOffsetAt() error {
  74. // ctx, cannel := context.WithTimeout(context.Background(), 3*time.Second)
  75. // defer cannel()
  76. // return exec.reader.SetOffsetAt(ctx, time.Now())
  77. // }