123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- package biz
- import (
- "context"
- "fmt"
- "github.com/go-kratos/kratos/v2/errors"
- "github.com/gorilla/websocket"
- "io"
- "time"
- v2 "w303a/server/apis/gate/v2"
- "w303a/server/app/gate/internal/conf"
- "w303a/server/pkg/zaplog"
- )
- var (
- // ErrUserOffline 用户不在线
- ErrUserOffline = errors.New(1001, v2.ErrorReason_ERR_USER_OFFLINE.String(), "user offline")
- )
- type Conn struct {
- *websocket.Conn
- c *conf.Server
- log *zaplog.Logger
- ctx context.Context
- msguc *MessageUsecase
- ID string
- }
- type ConnRepo interface {
- // Online 上线
- Online(ctx context.Context, id string, exp time.Duration) error
- // Offline 下线
- Offline(ctx context.Context, id string) error
- // ResetHeartbeat 重置心跳
- ResetHeartbeat(ctx context.Context, c *Conn, exp time.Duration) error
- // GetServerId 获取服务器ID
- GetServerId(ctx context.Context, id string) (string, error)
- }
- type ConnUsecase struct {
- log *zaplog.Logger
- c *conf.Server
- h *Hub
- connRepo ConnRepo
- msguc *MessageUsecase
- }
- func NewConnUsecase(
- log *zaplog.Logger,
- c *conf.Server,
- h *Hub,
- msguc *MessageUsecase,
- connRepo ConnRepo,
- ) *ConnUsecase {
- return &ConnUsecase{
- log: log,
- c: c,
- h: h,
- msguc: msguc,
- connRepo: connRepo,
- }
- }
- func (uc *ConnUsecase) NewConn(ctx context.Context, nsConn *websocket.Conn, id string) (*Conn, error) {
- if id == "" {
- _ = nsConn.Close()
- return nil, errors.New(400, "ID_IS_EMPTY", "id is empty")
- }
- return &Conn{
- ctx: ctx,
- Conn: nsConn,
- ID: id,
- c: uc.c,
- log: uc.log,
- }, nil
- }
- // Reading 读取消息并处理
- func (uc *ConnUsecase) Reading(c *Conn) error {
- // 查询时的消息数据
- uc.msguc.LoadOfflineMessage(c.ctx, c)
- // 持续读取消息
- return uc.heartbeatTimeoutWrapper(c, uc.msguc.HandlerMessage)
- }
- // 读取消息并处理
- func (uc *ConnUsecase) heartbeatTimeoutWrapper(c *Conn, handlerMessage MessageHandler) error {
- ctx := context.WithoutCancel(c.ctx)
- c.SetCloseHandler(func(code int, text string) error {
- uc.h.Unregister(c)
- uc.connRepo.Offline(ctx, c.ID)
- uc.log.Sugar().Infof("conn closed: %d %s", code, text)
- return errors.New(code, "CONN_CLOSED", text)
- })
- for {
- c.resetReadDeadline()
- uc.connRepo.ResetHeartbeat(ctx, c, c.c.Websocket.Keepalive.AsDuration())
- msgBuf, err := c.readMessage()
- if err != nil {
- return err
- }
- // 没消息内容
- if len(msgBuf) == 0 {
- continue
- }
- msg, err := newMessage(msgBuf)
- if err != nil {
- c.log.Sugar().Errorf("new message error: %v", err)
- continue
- }
- // 如果是心跳消息,回复 pong
- if msg.isHeartbeat() {
- c.resetReadDeadline()
- uc.connRepo.ResetHeartbeat(ctx, c, c.c.Websocket.Keepalive.AsDuration())
- c.writeMessage(newReplyHeartbeat(msg))
- continue
- }
- if err = handlerMessage(ctx, c, msg); err != nil {
- c.log.Sugar().Errorf("handler message error: %v", err)
- continue
- }
- }
- }
- func (c *Conn) readMessage() ([]byte, error) {
- var r io.Reader
- mt, r, err := c.NextReader()
- if err != nil {
- return nil, err
- }
- if !c.isMessageTypeSupported(mt) {
- c.log.Sugar().Errorf("unsupported message type: %d", mt)
- return nil, fmt.Errorf("unsupported message type: %d", mt)
- }
- // 处理 ping / pong 帧消息,处理结束后跳过当前消息
- if c.handlePingPongFrame(mt) {
- return make([]byte, 0), nil
- }
- p, err := io.ReadAll(r)
- return p, err
- }
- // 判断消息类型是否支持
- func (c *Conn) isMessageTypeSupported(mt int) bool {
- return mt == websocket.TextMessage || mt == websocket.PingMessage || mt == websocket.PongMessage
- }
- // 处理 ping / pong 帧消息
- func (c *Conn) handlePingPongFrame(mt int) bool {
- if mt == websocket.PingMessage {
- c.WriteMessage(websocket.PongMessage, []byte(""))
- return true
- } else if mt == websocket.PongMessage {
- c.WriteMessage(websocket.PingMessage, []byte(""))
- return true
- }
- return false
- }
- func (c *Conn) resetReadDeadline() {
- c.SetReadDeadline(time.Now().Add(c.c.Websocket.Keepalive.AsDuration()))
- }
- func (c *Conn) resetWriteDeadline() {
- c.SetWriteDeadline(time.Now().Add(c.c.Websocket.WriteTimeout.AsDuration()))
- }
- func (c *Conn) writeMessage(msg *Message) {
- c.resetWriteDeadline()
- if err := c.WriteMessage(websocket.TextMessage, msg.Bytes()); err != nil {
- c.log.Sugar().Errorf("write message error: %v", err)
- }
- }
|