1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- package stackexchange
- // var (
- // StackExchangeOnline = []byte("exchange_online") // StackExchangeOnline 上线
- // StackExchangeOffline = []byte("exchange_offline") // StackExchangeOffline 下线
- // StackExchangeMessaging = []byte("exchange_messaging") // StackExchangeMessaging 消息
- // )
- // type StackExchange struct {
- // Brokers []string
- // Topic string
- // Partition int
- // MaxBytes int
- // offset int64
- // reader *kafka.Reader
- // writer *kafka.Writer
- // }
- // func NewStackExchange(brokers []string, topic string, partition int, maxBytes int) *StackExchange {
- // // dialer
- // dialer := &kafka.Dialer{
- // Timeout: config.Kafka.Timeout * time.Second,
- // DualStack: true,
- // }
- // reader := kafka.NewReader(kafka.ReaderConfig{
- // Brokers: brokers,
- // Topic: topic,
- // StartOffset: kafka.FirstOffset,
- // Dialer: dialer,
- // })
- // // exchange
- // reader.SetOffsetAt(context.TODO(), time.Now())
- // return &StackExchange{
- // Brokers: brokers,
- // Topic: topic,
- // Partition: partition,
- // MaxBytes: maxBytes,
- // offset: 0,
- // reader: reader,
- // writer: &kafka.Writer{
- // Addr: kafka.TCP(brokers...),
- // Topic: topic,
- // AllowAutoTopicCreation: true,
- // },
- // }
- // }
- // func (exc *StackExchange) OnlineNotify(userId string) error {
- // return exc.writer.WriteMessages(context.Background(), kafka.Message{
- // Key: StackExchangeOnline,
- // Value: []byte(userId),
- // })
- // }
- // func (exc *StackExchange) OfflineNotify(userId string) error {
- // return exc.writer.WriteMessages(context.Background(), kafka.Message{
- // Key: StackExchangeOffline,
- // Value: []byte(userId),
- // })
- // }
- // func (exc *StackExchange) SendMessage(ctx context.Context, message kafka.Message) error {
- // return exc.writer.WriteMessages(ctx, message)
- // }
- // func (exc *StackExchange) ReadMessage() (kafka.Message, error) {
- // msg, err := exc.reader.ReadMessage(context.Background())
- // if err != nil {
- // return kafka.Message{}, err
- // }
- // exc.offset = exc.offset + 1
- // return msg, nil
- // }
- // func (exc *StackExchange) writeMessage(ctx context.Context, msgs ...kafka.Message) error {
- // return exc.writer.WriteMessages(ctx, msgs...)
- // }
- // func (exec *StackExchange) Offset() int64 {
- // return exec.reader.Offset()
- // }
- // func (exec *StackExchange) SetOffsetAt() error {
- // ctx, cannel := context.WithTimeout(context.Background(), 3*time.Second)
- // defer cannel()
- // return exec.reader.SetOffsetAt(ctx, time.Now())
- // }
|