123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- package server
- import (
- "context"
- "database/sql"
- "encoding/json"
- "fmt"
- "regexp"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/google/uuid"
- "github.com/gorilla/websocket"
- "github.com/nats-io/nats.go"
- "github.com/rotisserie/eris"
- "go.uber.org/zap"
- "sikey.com/websocket/models"
- "sikey.com/websocket/pkg/gid"
- "sikey.com/websocket/pkg/natx"
- "sikey.com/websocket/pkg/tox"
- "sikey.com/websocket/repositories"
- )
- type Client struct {
- ctx *gin.Context
- UserId string
- srv *Server
- nats *Nats
- UnderlyingConn *websocket.Conn
- online *models.Online
- isSimpleMsg bool // isSimpleMsg 是否是简单消息
- localization string // localization 国际码
- firebaseToken string // firebaseToken FCM 推送的 token
- loginToken string // loginToken 登录 token
- Received chan Message
- // Send message channel 发送消息
- // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
- // Send chan Message
- // send chan *EncodedMessage // 消息通道
- ReplySend chan Message // ReplySend 回复消息专用 Channel
- // firstReadWait 首次消息等待超时时间
- // 当客户端连接创建后会等待一个首次消息,首次消息如果没有在指定时间内发送会主动断开连接
- firstReadWait time.Duration
- // readWait time.Duration // readWait 读超时
- // writeWait 写超时
- // 为了保持服务器稳定, 在往客户端发送消息时设置一个超时时间,
- // 客户端连接不佳时不用花太多的时间在写客户端消息上, 从而保证服务器的协程不堵塞
- writeWait time.Duration
- // heartbeatWait 心跳等待
- // 服务器控制客户端连接的手段, 通过心跳的方式控制服务器保持存活,
- // 一方面是网络协议需要保持活跃, 另一方面是服务器需要踢出长期未活跃的连接
- heartbeatWait time.Duration
- // LastHeartbeatTime 上次心跳时间
- // 记录上次心跳的时间,方便 debug
- LastHeartbeatTime time.Time
- LastReceivedMessageTime time.Time // LastReceivedMessageTime 上次收到消息的时间
- LastReceivedNotifyTime time.Time // LastReceivedNotifyTime 上次收到推送的时间
- repos *repositories.Repositories
- }
- func (c *Client) withRequestIdContext(ctx context.Context, requestId string) context.Context {
- return context.WithValue(ctx, "request_id", requestId)
- }
- // reader 读取到客户端发送的消息, 将消息发送到 nats 里
- func (c *Client) reader() {
- defer func() {
- // c.srv.Disconnect <- c
- // c.nats.Unsubscribe <- &subscriber{client: c}
- // _ = c.UnderlyingConn.Close()
- c.close()
- }()
- // 首次消息超时设置
- //
- // firstReadDeadlineTime := time.Now().Add(c.firstReadWait)
- // _ = c.UnderlyingConn.SetReadDeadline(firstReadDeadlineTime)
- //
- // 客户端断开重新连接后无法保证首次发送消息,所以按照之前的 firstReadWait 无法保证一直连接
- // 这里设置一个写周期,让客户端持续保持,不会因为重连后没有在规定时间内发送心跳而断开
- writeReadDeadlineTime := time.Now().Add(c.writeWait)
- _ = c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
- for {
- // 接收消息
- msgType, bytes, err := c.UnderlyingConn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- zap.L().Error("[conn] normal disconnection",
- zap.String("user_id", c.UserId),
- zap.Error(err))
- }
- // Close connect
- zap.L().Error("[conn] read message error",
- zap.String("user_id", c.UserId),
- zap.Error(err))
- return
- }
- // 收到客户端 websocket 的 ping 消息
- // 收到后默认返回,用途告诉客户端这条连接还是需要保留着
- if msgType == websocket.PingMessage {
- writeReadDeadlineTime := time.Now().Add(c.writeWait)
- c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
- _ = c.UnderlyingConn.WriteMessage(websocket.PongMessage, []byte(""))
- continue
- }
- // 解码消息
- message := deserializeMessage(bytes)
- // 刷新超时时间, 客户端让客户端保持
- heartbeatReadDeadlineTime := time.Now().Add(c.heartbeatWait)
- _ = c.UnderlyingConn.SetReadDeadline(heartbeatReadDeadlineTime)
- switch message.MessageType() {
- case MessageTypePingPong:
- c.LastHeartbeatTime = time.Now()
- zap.L().Info("[reader] 心跳消息", zap.String("user_id", c.UserId))
- // 检查用户是否还在线
- // 如果已经不在线需要关闭连接
- online, _ := c.repos.OnlineRepository.GetOnline(c.ctx, c.UserId)
- if online == nil {
- zap.L().Info("[reader] 心跳时检查 online", zap.Any("data", online), zap.String("user_id", c.UserId))
- return
- }
- // 刷新 Redis 的在线信息
- _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
- // 心跳消息
- c.ReplySend <- newPongMessage(message.RequestId())
- zap.L().Info("[reader] 心跳响应", zap.String("user_id", c.UserId))
- case MessageTypeUpChating:
- c.LastReceivedMessageTime = time.Now()
- // 如果是语聊消息, 将消息落库到 tb_message 表
- if chating, ok := message.(*Chating); ok {
- zap.L().Info("[reader] 收到语聊消息", zap.Any("msg", chating), zap.String("user_id", c.UserId))
- content := chating.Content
- // 字段验证
- if content.SendTime == 0 {
- // 暂时先将发送时间字段不做限制, 并且设置一个默认值
- content.SendTime = time.Now().UTC().UnixMilli()
- // c.ReplySend <- newErrorMessage(message.RequestId(), eris.New("incorrect send time"))
- // continue
- }
- // 查询出来接收消息的人, 这里会过滤掉当前客户端
- isSessionId, receivers := c.regexpReceiveUserIds(content.Receiver)
- if isSessionId {
- content.SessionId = content.Receiver
- }
- zap.L().Info("[reader] 消息接收人", zap.Strings("receivers", receivers))
- // 语聊消息回执, 告诉客户端服务器收到了消息
- // 需要做一条回复消息给到客户端, 让客户端知道消息发送成功了, 然后客户端会站直给用户消息发送状态
- // 这条消息叫消息回执
- c.ReplySend <- newReplyMessage(chating)
- // 将消息写入数据库
- if err := c.repos.Transaction(c.ctx, func(ctx context.Context, repos *repositories.Repositories) error {
- for _, receiver := range receivers {
- mid := gid.GetSnowflakeId()
- err = c.repos.MessageRepository.Create(c.ctx, &models.Message{
- MessageId: mid,
- PayloadType: int(content.PayloadType),
- Payload: serializePayload(content.PayloadType, content.Payload),
- IsRead: -1,
- Receiver: receiver,
- Sender: c.UserId,
- SessionId: sql.NullString{String: content.SessionId, Valid: content.SessionId != ""},
- SendTime: time.UnixMilli(content.SendTime).UTC(),
- })
- if err != nil {
- return err
- }
- // 将消息发送给不同的接收人
- content.MessageId = mid
- content.Receiver = receiver
- // 发送消息到 Nats
- if c.nats.nc.IsClosed() {
- c.nats.nc = natx.Connect()
- }
- resp, err := c.nats.nc.RequestMsg(&nats.Msg{
- Subject: natx.GetSubject(),
- Data: serializeMessage(message),
- }, time.Second*10)
- if err != nil {
- // ERROR
- return err
- }
- var respond RespondStructural
- _ = json.Unmarshal(resp.Data, &respond)
- if !respond.Ok {
- // ERROR
- return err
- }
- }
- return nil
- }); err != nil {
- // ERROR
- c.ReplySend <- newErrorMessage(message.RequestId(), err)
- zap.L().Error("[reader] 存消息出现问题")
- }
- }
- case MessageTypeNotification:
- c.LastReceivedNotifyTime = time.Now()
- // 通知消息
- if notification, ok := message.(*Notification); ok {
- // 将通知消息存起来, 记录下通知消息的
- if err := c.repos.NotifyRepository.Create(c.ctx, &models.Notify{
- NotifyId: notification.Content.ID,
- Sender: notification.Content.Sender,
- Receiver: notification.Content.Receiver,
- IsSent: models.NotifyNotReceived,
- Payload: serializePayload(0, notification.Content.Payload),
- }); err != nil {
- c.ReplySend <- newErrorMessage(message.RequestId(), err)
- continue
- }
- c.Received <- message
- }
- case MessageTypeAck:
- // 收到消息的回执, 处理回执消息
- if notification, ok := message.(*Notification); ok {
- notify, err := c.repos.NotifyRepository.Find(c.ctx, int64(notification.Content.AckId))
- if err != nil || notify == nil {
- c.ReplySend <- newErrorMessage(message.RequestId(), eris.New(fmt.Sprintf("unable to find notify ackId:%d", notification.Content.AckId)))
- continue
- }
- notify.IsSent = models.NotifyReceived
- _ = c.repos.NotifyRepository.Save(c.ctx, notify)
- }
- }
- }
- }
- // recv 客户端收到消息, 收到消息的统一收口
- // 消息来源分为两种:
- // 1. 消息通过 nats 发送过来
- // 2. 消息通过 reader 读取到后被放到了 nats 里然后发送过来
- func (c *Client) recv() {
- defer close(c.Received)
- for {
- select {
- case message, ok := <-c.Received:
- if !ok {
- zap.L().Error("[recv] channel error",
- zap.Error(eris.New("empty is received channel.")),
- zap.String("user_id", c.UserId))
- return
- }
- // 有一些客户端是通过 receiver 来取出 sessionId, 然后去查询消息的, 但这并不是正确的.
- switch message.MessageType() {
- case MessageTypeUpChating:
- if chating, ok := message.(*Chating); ok {
- // 将消息类型改为客户端可以接受到的类型, 客户端下行消息只会收到 message_type: MessageTypeDownChating
- chating.Type = MessageTypeDownChating
- // 这个 message_id 其实用不到, 但为了先兼容App收发消息
- chating.Content.MessageId = "_" + gid.GetSnowflakeId()
- message = chating
- }
- }
- zap.L().Info("[recv] 发送消息", zap.Any("message", message), zap.String("user_id", c.UserId))
- writeReadDeadlineTime := time.Now().Add(c.writeWait)
- _ = c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
- if err := c.UnderlyingConn.WriteJSON(message); err != nil {
- zap.L().Error("[recv] write error", zap.Error(err), zap.String("user_id", c.UserId))
- }
- }
- }
- }
- // reply 回复消息专用
- func (c *Client) reply() {
- defer close(c.ReplySend)
- for {
- select {
- case message, ok := <-c.ReplySend:
- if !ok {
- zap.L().Error("[reply] channel error",
- zap.Error(eris.New("empty is replySend channel.")),
- zap.String("user_id", c.UserId))
- return
- }
- // zap.L().Info("[conn] 回复消息到客户端", zap.Any("message", message), zap.String("user_id", c.UserId))
- writeReadDeadlineTime := time.Now().Add(c.writeWait)
- _ = c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
- if err := c.UnderlyingConn.WriteJSON(message); err != nil {
- zap.L().Error("[reply] write error", zap.Error(err), zap.String("user_id", c.UserId))
- }
- }
- }
- }
- // regexpReceiveUserIds 通过 receiver 获取接收者的用户ID
- // 使用正则表达式验证ID 是否是 account_id 或 session_id
- // session_id 的话需要查询 session_member 表获取 session 的成员
- func (c *Client) regexpReceiveUserIds(receiver string) (bool, []string) {
- if receiver == "" {
- return false, []string{}
- }
- reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
- if reg.Match([]byte(receiver)) {
- return false, []string{receiver}
- }
- var receivers = make([]string, 0)
- if models.IsSessionSingle(receiver) {
- single, err := c.repos.SessionSingleRepository.Get(c.ctx, receiver)
- if err != nil {
- zap.L().Error("unable to get single session", zap.Error(err), zap.String("user_id", c.UserId))
- return false, []string{}
- }
- if single == nil {
- // 会话已经被删除
- zap.L().Error("[reader] 检查接收人 ID 类型时出错", zap.Error(err), zap.String("user_id", c.UserId))
- return false, []string{}
- }
- toUser := tox.TernaryOperation(
- single.TargetUserID == c.UserId,
- single.ToUserID,
- single.TargetUserID,
- ).(string)
- receivers = append(receivers, toUser)
- } else if models.IsSessionGroup(receiver) {
- group, err := c.repos.SessionGroupRepository.Get(c.ctx, receiver)
- if err != nil {
- zap.L().Error("unable to get session group", zap.Error(err), zap.String("user_id", c.UserId))
- return false, []string{}
- }
- members := group.Members.ToSlice()
- for _, member := range members {
- if member.UserId != c.UserId {
- receivers = append(receivers, member.UserId)
- }
- }
- } else {
- return false, []string{}
- }
- return true, receivers
- }
- // loadOfflineMessage 查询离线时未接收的消息,并且推送给客户端
- func (c *Client) loadOfflineMessage() {
- defer func() {
- if err := recover(); err != nil {
- zap.L().Error("[conn] 加载离线消息出现问题", zap.Any("err", err))
- }
- return
- }()
- rid := uuid.NewString()
- unreadMsg, err := c.repos.MessageRepository.FindUnread(c.ctx, c.UserId)
- if err != nil {
- // 查询未读消息出现错误, 给登录的用户发送一个错误信息
- c.ReplySend <- newErrorMessage(rid, err)
- return
- }
- for _, msg := range unreadMsg {
- nc := c.nats.nc
- if nc.IsClosed() {
- nc = natx.Connect()
- }
- chating := Chating{
- MessageImpl: MessageImpl{
- Type: MessageTypeUpChating,
- RId: rid,
- },
- Content: &ChatingContent{
- MessageId: msg.MessageId,
- Receiver: msg.Receiver,
- SessionId: msg.SessionId.String,
- PayloadType: uint8(msg.PayloadType),
- Payload: deserializePayload(msg.Payload),
- SendTime: msg.SendTime.UTC().UnixMilli(),
- },
- }
- zap.L().Info("[conn] 客户端离线时的离线消息", zap.Any("msg", chating), zap.String("user_id", c.UserId))
- if _, err := nc.RequestMsg(&nats.Msg{
- Subject: natx.GetSubject(),
- Data: chating.Data(),
- }, time.Second*5); err != nil {
- c.ReplySend <- newErrorMessage(rid, err)
- }
- }
- }
- // Online 客户端上线, 将用户数据加入到 Redis
- func (c *Client) Online() error {
- err := c.repos.OnlineRepository.SetOnline(c.ctx, c.online)
- if err != nil {
- return eris.Wrapf(err, "unable to set online status for user: %s", c.UserId)
- }
- return nil
- }
- // Offline 客户端下线, 将用户信息从 Redis 移除
- func (c *Client) Offline() {
- _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
- }
- // Close websocket connection
- func (c *Client) close() {
- c.srv.Disconnect <- c
- c.nats.Unsubscribe <- &subscriber{client: c}
- c.UnderlyingConn.Close()
- c.Offline()
- }
|