123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package server
- import (
- "encoding/json"
- "time"
- "github.com/gin-gonic/gin"
- "github.com/google/uuid"
- "github.com/gorilla/websocket"
- "sikey.com/websocket/models"
- "sikey.com/websocket/repositories"
- "sikey.com/websocket/utils/zlog"
- )
- type Client struct {
- ctx *gin.Context
- UserId string
- hub *Hub
- UnderlyingConn *websocket.Conn
- isSimpleMsg bool // isSimpleMsg 是否是简单消息
- localization string // localization 国际码
- // Send message channel 发送消息
- // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
- // 则会通过 SendOffline channel 发送离线消息
- Send chan *Message
- SendOffline chan *Message // SendOffline 发送离线消息
- readWait time.Duration // readWait 读超时
- writeWait time.Duration // writeWait 写超时
- pingWait time.Duration // pingWait 心跳超时
- repos *repositories.Repositories
- }
- func (c *Client) reader() {
- defer func() {
- c.hub.Disconnect <- c
- c.UnderlyingConn.Close()
- }()
- c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
- for {
- _, bytes, err := c.UnderlyingConn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- zlog.Errorf("error: %v", err)
- }
- break
- }
- message := deserializeMessage(bytes)
- switch message.Type {
- case MessageTypeUpChating, MessageTypeDownChating:
- // Chat dialogue messages
- chatingContent := message.Content.(ChatingContent)
- // Save message to database
- payload, _ := json.Marshal(chatingContent.Payload)
- messageId := uuid.NewString()
- err := c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
- ID: messageId,
- SessionId: chatingContent.Receiver,
- Receiver: chatingContent.Receiver,
- Sender: c.UserId,
- Type: uint8(message.Type),
- ContentType: uint8(chatingContent.PayloadType),
- Content: string(payload),
- IsRead: false,
- })
- if err != nil {
- c.writeError(message.RequestId, err)
- continue
- }
- members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
- c.ctx, chatingContent.Receiver, c.UserId)
- if err != nil {
- c.writeError(message.RequestId, err)
- continue
- }
- for _, member := range members {
- var messaging = *message
- messaging.receiver = member.AccountId
- messaging.Content = chatingContent
- c.hub.Message <- &messaging
- }
- // Reply message id
- message.Content = &ContentReply{MessageId: messageId}
- }
- // Reply message
- if message.IsNeedReply() {
- c.Send <- newReplyMessage(message)
- // Reset read dead line, prevent Reader from shutting down
- c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pingWait))
- }
- }
- }
- func (c *Client) writer() {
- ticker := time.NewTicker(c.pingWait)
- defer func() {
- ticker.Stop()
- c.UnderlyingConn.Close()
- }()
- for {
- select {
- case message, ok := <-c.Send:
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if !ok {
- // The hub closed the channel.
- c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
- return
- }
- var err error
- err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
- if err != nil {
- return
- }
- case <-ticker.C:
- // 到时间发送 ping 信号
- c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
- if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
- return
- }
- }
- }
- }
- func (c *Client) writeError(requestId string, err error) {
- c.Send <- &Message{
- RequestId: requestId,
- Type: MessageTypeError,
- Content: ContentError{Err: err.Error()},
- }
- }
|