123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package server
- import (
- "encoding/json"
- "regexp"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/google/martian/v3/log"
- "github.com/gorilla/websocket"
- "github.com/mitchellh/mapstructure"
- "github.com/rotisserie/eris"
- "sikey.com/websocket/models"
- "sikey.com/websocket/repositories"
- "sikey.com/websocket/utils/gid"
- "x.sikey.com.cn/serverx/zlog"
- )
- type Client struct {
- ctx *gin.Context
- UserId string
- hub *Hub
- UnderlyingConn *websocket.Conn
- online *models.Online
- // logger *zlog.Logger
- isSimpleMsg bool // isSimpleMsg 是否是简单消息
- localization string // localization 国际码
- firebaseToken string // firebaseToken FCM 推送的 token
- // Send message channel 发送消息
- // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
- Send chan *Message
- readWait time.Duration // readWait 读超时
- writeWait time.Duration // writeWait 写超时
- pingWait time.Duration // pingWait 心跳超时
- repos *repositories.Repositories
- }
- func (c *Client) reader() {
- defer func() {
- c.hub.Disconnect <- c
- c.Close()
- // c.logger.Info("client Offline")
- }()
- c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
- for {
- _, bytes, err := c.UnderlyingConn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- // c.logger.Errorf("error: %v", eris.Wrap(err, c.UserId))
- } else {
- // c.logger.Errorf("error: %v", eris.Wrap(err, c.UserId))
- }
- // Close connect
- // _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
- return
- }
- var message = deserializeMessage(bytes)
- // var log = c.logger.WithField(keys.HeaderRequestId, message.RequestId)
- switch message.Type {
- case MessageTypePingPong:
- // zlog.Debugf("receive ping message from %s", c.UserId)
- _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
- case MessageTypeUpChating, MessageTypeDownChating:
- // Chat dialogue messages
- chatingContent := message.Content.(ChatingContent)
- // Save message to database
- messageId := gid.GetMessageId()
- chatingContent.MessageId = messageId
- chatingContent.SessionId = chatingContent.Receiver
- err = c.saveMessage(messageId, message.Type, &chatingContent)
- if err != nil {
- c.writeError(message.RequestId, err)
- continue
- }
- // Receiver ID format determines whether the receiver is an account or a session
- // log.Infof("message receiver: %s", chatingContent.Receiver)
- users := c.getReceiverUserIds(chatingContent.Receiver)
- // log.Infof("message to users: %v", users)
- for _, id := range users {
- var messaging = *message
- messaging.Receiver = id
- messaging.Content = chatingContent
- // zlog.Infof("Send message %s to %s", c.UserId, id)
- // Check if the user is online
- if c.firebaseToken != "" {
- online, err := c.repos.OnlineRepository.Is(c.ctx, id)
- if err != nil {
- // log.Error(eris.Wrap(err, "unable to find online user"))
- continue
- }
- if !online {
- // Send FCM message
- // token, err := c.repos.FirebaseMessageRepository.GetFirebaseToken(c.ctx, id)
- // if err != nil {
- // if eris.Is(err, models.ErrRecordNotFound) {
- // // log.Debugf("user %s not found firebase token", id)
- // }
- // } else {
- // c.hub.FirebaseMessage <- &FirebaseMessage{token: token.Token, message: &messaging, clt: c}
- // }
- }
- }
- c.hub.Message <- &messaging
- }
- // Reply message id
- message.Content = &ContentReply{MessageId: messageId}
- }
- // Reply message
- // zlog.Debugf("reply message %s to %s", message.RequestId, c.UserId)
- if message.IsNeedReply() {
- c.Send <- newReplyMessage(message)
- // Reset read deadline, prevent Reader from shutting down
- // zlog.Debugf("reset read deadline for %s", c.UserId)
- c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
- }
- }
- }
- func (c *Client) writer() {
- ticker := time.NewTicker(c.pingWait)
- defer func() {
- ticker.Stop()
- c.Close()
- }()
- for {
- select {
- case message, ok := <-c.Send:
- // var log = c.logger
- if message != nil {
- // log = log.WithField(keys.HeaderRequestId, message.RequestId)
- }
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if !ok {
- // The hub closed the channel.
- c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
- return
- }
- var err error
- err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
- if err != nil {
- return
- }
- // Received modification message status
- switch message.Type {
- case MessageTypeUpChating, MessageTypeDownChating:
- // Chat dialogue messages
- if chatingContent, ok := message.Content.(ChatingContent); ok {
- if msg, err := c.repos.SessionRepository.FindMessageById(c.ctx, chatingContent.MessageId); err == nil {
- msg.Received = true
- c.repos.SessionRepository.UpdateMessage(c.ctx, msg)
- } else {
- if eris.Is(err, models.ErrRecordNotFound) {
- break
- }
- // log.Error(err)
- }
- }
- }
- log.Infof("write message %s", message)
- case <-ticker.C:
- // 到时间发送 ping 信号
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
- return
- }
- }
- }
- }
- func (c *Client) writeError(requestId string, err error) {
- c.Send <- &Message{
- RequestId: requestId,
- Type: MessageTypeError,
- Content: ContentError{Err: err.Error()},
- }
- }
- func (c *Client) saveMessage(messageId string, messageType int8, content *ChatingContent) error {
- // Standardized structure, This is not an unnecessary step!!!
- // Filter out excess fields.
- var err error
- switch content.PayloadType {
- case ChatingContentTypeText:
- var textContent ContentText
- err = mapstructure.Decode(content.Payload, &textContent)
- content.Payload = textContent
- case ChatingContentTypeMetadata:
- var contentMetadata ContentMetadata
- err = mapstructure.Decode(content.Payload, &contentMetadata)
- content.Payload = contentMetadata
- }
- if err != nil {
- return eris.Wrap(err, "unable to decode message content")
- }
- payload, _ := json.Marshal(content.Payload)
- return c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
- ID: messageId,
- SessionId: content.Receiver,
- Receiver: content.Receiver,
- Sender: c.UserId,
- Type: messageType,
- ContentType: content.PayloadType,
- Content: payload,
- IsRead: false,
- })
- }
- // getReceiverUserIds 通过 receiver 获取接收者的用户ID
- // 使用正则表达式验证ID 是否是 account_id 或 session_id
- // session_id 的话需要查询 session_member 表获取 session 的成员
- func (c *Client) getReceiverUserIds(receiver string) []string {
- reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
- if reg.Match([]byte(receiver)) {
- return []string{receiver}
- }
- members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
- c.ctx, receiver, c.UserId)
- if err != nil {
- zlog.Error(eris.Wrap(err, "unable to get session members"))
- return []string{}
- }
- var ms = make([]string, len(members))
- for i, memb := range members {
- ms[i] = memb.RefId
- }
- return ms
- }
- // Close websocket connection
- func (c *Client) Close() {
- c.UnderlyingConn.Close()
- online := &models.Online{UserId: c.UserId, ServerId: c.hub.serverId}
- c.repos.OnlineRepository.Offline(c.ctx, online)
- }
|