client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. package server
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "regexp"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/google/uuid"
  11. "github.com/gorilla/websocket"
  12. "github.com/nats-io/nats.go"
  13. "github.com/nicksnyder/go-i18n/v2/i18n"
  14. "github.com/rotisserie/eris"
  15. "go.uber.org/zap"
  16. "sikey.com/websocket/models"
  17. "sikey.com/websocket/pkg/gid"
  18. "sikey.com/websocket/pkg/natx"
  19. "sikey.com/websocket/pkg/tox"
  20. "sikey.com/websocket/repositories"
  21. )
  22. type Client struct {
  23. ctx *gin.Context
  24. UserId string
  25. srv *Server
  26. nats *Nats
  27. UnderlyingConn *websocket.Conn
  28. online *models.Online
  29. isWatch bool // isWatch 是否是手表
  30. isSimpleMsg bool // isSimpleMsg 是否是简单消息
  31. localization string // localization 国际码
  32. localizer *i18n.Localizer
  33. firebaseToken string // firebaseToken FCM 推送的 token
  34. loginToken string // loginToken 登录 token
  35. Received chan Message
  36. // Send message channel 发送消息
  37. // 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
  38. // Send chan Message
  39. // send chan *EncodedMessage // 消息通道
  40. ReplySend chan Message // ReplySend 回复消息专用 Channel
  41. // firstReadWait 首次消息等待超时时间
  42. // 当客户端连接创建后会等待一个首次消息,首次消息如果没有在指定时间内发送会主动断开连接
  43. firstReadWait time.Duration
  44. // readWait time.Duration // readWait 读超时
  45. // writeWait 写超时
  46. // 为了保持服务器稳定, 在往客户端发送消息时设置一个超时时间,
  47. // 客户端连接不佳时不用花太多的时间在写客户端消息上, 从而保证服务器的协程不堵塞
  48. writeWait time.Duration
  49. // heartbeatWait 心跳等待
  50. // 服务器控制客户端连接的手段, 通过心跳的方式控制服务器保持存活,
  51. // 一方面是网络协议需要保持活跃, 另一方面是服务器需要踢出长期未活跃的连接
  52. heartbeatWait time.Duration
  53. // LastHeartbeatTime 上次心跳时间
  54. // 记录上次心跳的时间,方便 debug
  55. LastHeartbeatTime time.Time
  56. LastReceivedMessageTime time.Time // LastReceivedMessageTime 上次收到消息的时间
  57. LastReceivedNotifyTime time.Time // LastReceivedNotifyTime 上次收到推送的时间
  58. repos *repositories.Repositories
  59. }
  60. func (c *Client) withRequestIdContext(ctx context.Context, requestId string) context.Context {
  61. return context.WithValue(ctx, "request_id", requestId)
  62. }
  63. // reader 读取到客户端发送的消息, 将消息发送到 nats 里
  64. func (c *Client) reader() {
  65. defer c.close()
  66. // 首次消息超时设置
  67. //
  68. // firstReadDeadlineTime := time.Now().Add(c.firstReadWait)
  69. // _ = c.UnderlyingConn.SetReadDeadline(firstReadDeadlineTime)
  70. //
  71. // 客户端断开重新连接后无法保证首次发送消息,所以按照之前的 firstReadWait 无法保证一直连接
  72. // 这里设置一个写周期,让客户端持续保持,不会因为重连后没有在规定时间内发送心跳而断开
  73. writeReadDeadlineTime := time.Now().Add(c.writeWait)
  74. _ = c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
  75. for {
  76. // 接收消息
  77. msgType, bytes, err := c.UnderlyingConn.ReadMessage()
  78. if err != nil {
  79. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  80. zap.L().Error("[conn] normal disconnection",
  81. zap.String("user_id", c.UserId),
  82. zap.Error(err))
  83. }
  84. // Close connect
  85. zap.L().Error("[conn] read message error",
  86. zap.String("user_id", c.UserId),
  87. zap.Error(err))
  88. return
  89. }
  90. // 收到客户端 websocket 的 ping 消息
  91. // 收到后默认返回,用途告诉客户端这条连接还是需要保留着
  92. if msgType == websocket.PingMessage {
  93. writeReadDeadlineTime := time.Now().Add(c.writeWait)
  94. c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
  95. _ = c.UnderlyingConn.WriteMessage(websocket.PongMessage, []byte(""))
  96. continue
  97. }
  98. // 解码消息
  99. message := deserializeMessage(bytes)
  100. zap.L().Info("[reader] 收到消息", zap.String("user_id", c.UserId), zap.Any("message", message))
  101. // 刷新超时时间, 客户端让客户端保持
  102. heartbeatReadDeadlineTime := time.Now().Add(c.heartbeatWait)
  103. _ = c.UnderlyingConn.SetReadDeadline(heartbeatReadDeadlineTime)
  104. switch message.MessageType() {
  105. case MessageTypePingPong:
  106. c.LastHeartbeatTime = time.Now()
  107. zap.L().Info("[reader] 心跳消息", zap.String("user_id", c.UserId))
  108. // 检查用户是否还在线
  109. // 如果已经不在线需要关闭连接
  110. online, _ := c.repos.OnlineRepository.GetOnline(c.ctx, c.UserId)
  111. if online == nil {
  112. zap.L().Info("[reader] 心跳时检查 online", zap.Any("data", online), zap.String("user_id", c.UserId))
  113. return
  114. }
  115. // 刷新 Redis 的在线信息
  116. _ = c.repos.OnlineRepository.SetOnline(c.ctx, c.online)
  117. // _ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
  118. // 心跳消息
  119. c.ReplySend <- newPongMessage(message.RequestId())
  120. zap.L().Info("[reader] 心跳响应", zap.String("user_id", c.UserId))
  121. case MessageTypeUpChating:
  122. c.LastReceivedMessageTime = time.Now()
  123. // 如果是语聊消息, 将消息落库到 tb_message 表
  124. if chating, ok := message.(*Chating); ok {
  125. zap.L().Info("[reader] 收到语聊消息", zap.Any("msg", chating), zap.String("user_id", c.UserId))
  126. content := chating.Content
  127. // 字段验证
  128. if content.SendTime == 0 {
  129. // 暂时先将发送时间字段不做限制, 并且设置一个默认值
  130. content.SendTime = time.Now().UTC().UnixMilli()
  131. // c.ReplySend <- newErrorMessage(message.RequestId(), eris.New("incorrect send time"))
  132. // continue
  133. }
  134. // 查询出来接收消息的人, 这里会过滤掉当前客户端
  135. isSessionId, receivers := c.regexpReceiveUserIds(content.Receiver)
  136. if isSessionId {
  137. content.SessionId = content.Receiver
  138. }
  139. zap.L().Info("[reader] 消息接收人", zap.Strings("receivers", receivers))
  140. // 语聊消息回执, 告诉客户端服务器收到了消息
  141. // 需要做一条回复消息给到客户端, 让客户端知道消息发送成功了, 然后客户端会站直给用户消息发送状态
  142. // 这条消息叫消息回执
  143. c.ReplySend <- newReplyMessage(chating)
  144. // 将消息写入数据库
  145. if err := c.repos.Transaction(c.ctx, func(ctx context.Context, repos *repositories.Repositories) error {
  146. for _, receiver := range receivers {
  147. mid := gid.GetSnowflakeId()
  148. err = c.repos.MessageRepository.Create(c.ctx, &models.Message{
  149. MessageId: mid,
  150. PayloadType: int(content.PayloadType),
  151. Payload: serializePayload(content.PayloadType, content.Payload),
  152. IsRead: -1,
  153. Receiver: receiver,
  154. Sender: c.UserId,
  155. SessionId: sql.NullString{String: content.SessionId, Valid: content.SessionId != ""},
  156. SendTime: time.UnixMilli(content.SendTime).UTC(),
  157. })
  158. if err != nil {
  159. return err
  160. }
  161. // 将消息发送给不同的接收人
  162. content.MessageId = mid
  163. content.Receiver = receiver
  164. // 发送消息到 Nats
  165. if c.nats.nc.IsClosed() {
  166. c.nats.nc = natx.Connect()
  167. }
  168. resp, err := c.nats.nc.RequestMsg(&nats.Msg{
  169. Subject: natx.GetSubject(),
  170. Data: serializeMessage(message),
  171. }, time.Second*10)
  172. if err != nil {
  173. // ERROR
  174. return err
  175. }
  176. var respond RespondStructural
  177. _ = json.Unmarshal(resp.Data, &respond)
  178. if !respond.Ok {
  179. // ERROR
  180. return err
  181. }
  182. }
  183. return nil
  184. }); err != nil {
  185. // ERROR
  186. c.ReplySend <- newErrorMessage(message.RequestId(), err)
  187. zap.L().Error("[reader] 存消息出现问题")
  188. }
  189. }
  190. case MessageTypeNotification:
  191. c.LastReceivedNotifyTime = time.Now()
  192. // 通知消息
  193. if notification, ok := message.(*Notification); ok {
  194. if notification.Content.Receiver == "" {
  195. c.ReplySend <- newErrorMessage(message.RequestId(), eris.New("No receiver"))
  196. continue
  197. }
  198. // 将通知消息存起来, 记录下通知消息的
  199. if err := c.repos.NotifyRepository.Create(c.ctx, &models.Notify{
  200. NotifyId: notification.Content.ID,
  201. Sender: notification.Content.Sender,
  202. Receiver: notification.Content.Receiver,
  203. Payload: serializePayload(0, notification.Content.Payload),
  204. }); err != nil {
  205. c.ReplySend <- newErrorMessage(message.RequestId(), err)
  206. continue
  207. }
  208. c.Received <- message
  209. }
  210. case MessageTypeAck:
  211. // 收到消息的回执, 处理回执消息
  212. if ack, ok := message.(*Ack); ok {
  213. if err := c.repos.OnlineRepository.DelOfflineNotification(c.ctx, int64(ack.Content), c.UserId); err != nil {
  214. c.ReplySend <- newErrorMessage(message.RequestId(),
  215. eris.New(fmt.Sprintf("unable to find notify ackId: %d", ack.Content)))
  216. continue
  217. }
  218. }
  219. case MessageTypeError:
  220. // 错误消息
  221. c.ReplySend <- message
  222. }
  223. }
  224. }
  225. // recv 客户端收到消息, 收到消息的统一收口
  226. // 消息来源分为两种:
  227. // 1. 消息通过 nats 发送过来
  228. // 2. 消息通过 reader 读取到后被放到了 nats 里然后发送过来
  229. func (c *Client) recv() {
  230. defer close(c.Received)
  231. for {
  232. select {
  233. case message, ok := <-c.Received:
  234. if !ok {
  235. zap.L().Error("[recv] channel error",
  236. zap.Error(eris.New("empty is received channel.")),
  237. zap.String("user_id", c.UserId))
  238. return
  239. }
  240. // 有一些客户端是通过 receiver 来取出 sessionId, 然后去查询消息的, 但这并不是正确的.
  241. switch message.MessageType() {
  242. case MessageTypeUpChating:
  243. if chating, ok := message.(*Chating); ok {
  244. // 将消息类型改为客户端可以接受到的类型, 客户端下行消息只会收到 message_type: MessageTypeDownChating
  245. chating.Type = MessageTypeDownChating
  246. // 这个 message_id 其实用不到, 但为了先兼容App收发消息
  247. chating.Content.MessageId = "_" + gid.GetSnowflakeId()
  248. message = chating
  249. }
  250. }
  251. zap.L().Info("[recv] 发送消息", zap.Any("message", message), zap.String("user_id", c.UserId))
  252. writeReadDeadlineTime := time.Now().Add(c.writeWait)
  253. _ = c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
  254. if err := c.UnderlyingConn.WriteJSON(message); err != nil {
  255. zap.L().Error("[recv] write error", zap.Error(err), zap.String("user_id", c.UserId))
  256. }
  257. }
  258. }
  259. }
  260. // reply 回复消息专用
  261. func (c *Client) reply() {
  262. defer close(c.ReplySend)
  263. for {
  264. select {
  265. case message, ok := <-c.ReplySend:
  266. if !ok {
  267. zap.L().Error("[reply] channel error",
  268. zap.Error(eris.New("empty is replySend channel.")),
  269. zap.String("user_id", c.UserId))
  270. return
  271. }
  272. // zap.L().Info("[conn] 回复消息到客户端", zap.Any("message", message), zap.String("user_id", c.UserId))
  273. writeReadDeadlineTime := time.Now().Add(c.writeWait)
  274. _ = c.UnderlyingConn.SetWriteDeadline(writeReadDeadlineTime)
  275. if err := c.UnderlyingConn.WriteJSON(message); err != nil {
  276. zap.L().Error("[reply] write error", zap.Error(err), zap.String("user_id", c.UserId))
  277. }
  278. }
  279. }
  280. }
  281. // regexpReceiveUserIds 通过 receiver 获取接收者的用户ID
  282. // 使用正则表达式验证ID 是否是 account_id 或 session_id
  283. // session_id 的话需要查询 session_member 表获取 session 的成员
  284. func (c *Client) regexpReceiveUserIds(receiver string) (bool, []string) {
  285. if receiver == "" {
  286. return false, []string{}
  287. }
  288. reg, _ := regexp.Compile(`[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}`)
  289. if reg.Match([]byte(receiver)) {
  290. return false, []string{receiver}
  291. }
  292. var receivers = make([]string, 0)
  293. if models.IsSessionSingle(receiver) {
  294. single, err := c.repos.SessionSingleRepository.Get(c.ctx, receiver)
  295. if err != nil {
  296. zap.L().Error("unable to get single session", zap.Error(err), zap.String("user_id", c.UserId))
  297. return false, []string{}
  298. }
  299. if single == nil {
  300. // 会话已经被删除
  301. zap.L().Error("[reader] 检查接收人 ID 类型时出错", zap.Error(err), zap.String("user_id", c.UserId))
  302. return false, []string{}
  303. }
  304. toUser := tox.TernaryOperation(
  305. single.TargetUserID == c.UserId,
  306. single.ToUserID,
  307. single.TargetUserID,
  308. ).(string)
  309. receivers = append(receivers, toUser)
  310. } else if models.IsSessionGroup(receiver) {
  311. group, err := c.repos.SessionGroupRepository.Get(c.ctx, receiver)
  312. if err != nil {
  313. zap.L().Error("unable to get session group", zap.Error(err), zap.String("user_id", c.UserId))
  314. return false, []string{}
  315. }
  316. members := group.Members.ToSlice()
  317. for _, member := range members {
  318. if member.UserId != c.UserId {
  319. receivers = append(receivers, member.UserId)
  320. }
  321. }
  322. } else {
  323. return false, []string{}
  324. }
  325. return true, receivers
  326. }
  327. // loadOfflineMessage 查询离线时未接收的消息,并且推送给客户端
  328. func (c *Client) loadOfflineMessage() {
  329. defer func() {
  330. if err := recover(); err != nil {
  331. zap.L().Error("[conn] 加载离线消息出现问题", zap.Any("err", err))
  332. }
  333. return
  334. }()
  335. rid := uuid.NewString()
  336. unreadMsg, err := c.repos.MessageRepository.FindUnread(c.ctx, c.UserId)
  337. if err != nil {
  338. // 查询未读消息出现错误, 给登录的用户发送一个错误信息
  339. c.ReplySend <- newErrorMessage(rid, err)
  340. return
  341. }
  342. for _, msg := range unreadMsg {
  343. nc := c.nats.nc
  344. if nc.IsClosed() {
  345. nc = natx.Connect()
  346. }
  347. chating := Chating{
  348. MessageImpl: MessageImpl{
  349. Type: MessageTypeUpChating,
  350. RId: rid,
  351. },
  352. Content: &ChatingContent{
  353. MessageId: msg.MessageId,
  354. Receiver: msg.Receiver,
  355. SessionId: msg.SessionId.String,
  356. PayloadType: uint8(msg.PayloadType),
  357. Payload: deserializePayload(msg.Payload),
  358. SendTime: msg.SendTime.UTC().UnixMilli(),
  359. },
  360. }
  361. zap.L().Info("[conn] 客户端离线时的离线消息", zap.Any("msg", chating), zap.String("user_id", c.UserId))
  362. if _, err := nc.RequestMsg(&nats.Msg{
  363. Subject: natx.GetSubject(),
  364. Data: chating.Data(),
  365. }, time.Second*5); err != nil {
  366. c.ReplySend <- newErrorMessage(rid, err)
  367. }
  368. }
  369. // 未收到的通知消息
  370. notifications, err := c.repos.OnlineRepository.GetOfflineNotification(c.ctx, c.UserId)
  371. if err != nil {
  372. zap.L().Error("[conn] 无法获取离线通知消息", zap.String("user_id", c.UserId), zap.Error(err))
  373. return
  374. }
  375. zap.L().Info("[conn] 消息通知消息", zap.Any("notifications", notifications), zap.String("user_id", c.UserId))
  376. for seq, notification := range notifications {
  377. nc := c.nats.nc
  378. if nc.IsClosed() {
  379. nc = natx.Connect()
  380. }
  381. notify := Notification{
  382. MessageImpl: MessageImpl{
  383. Type: MessageTypeNotification,
  384. RId: rid,
  385. },
  386. Content: &NotificationContent{
  387. ID: notification.NotifyId,
  388. AckId: seq,
  389. Receiver: notification.Receiver,
  390. Sender: notification.Sender,
  391. Payload: deserializePayload(notification.Payload),
  392. },
  393. }
  394. zap.L().Info("[conn] 离线时收到的通知消息", zap.Any("notification", notify), zap.String("user_id", c.UserId))
  395. if _, err := nc.RequestMsg(&nats.Msg{
  396. Subject: natx.GetSubject(),
  397. Data: notify.Data(),
  398. }, time.Second*5); err != nil {
  399. c.ReplySend <- newErrorMessage(rid, err)
  400. }
  401. }
  402. }
  403. // Online 客户端上线, 将用户数据加入到 Redis
  404. func (c *Client) Online() error {
  405. err := c.repos.OnlineRepository.SetOnline(c.ctx, c.online)
  406. if err != nil {
  407. return eris.Wrapf(err, "unable to set online status for user: %s", c.UserId)
  408. }
  409. return nil
  410. }
  411. // Offline 客户端下线, 将用户信息从 Redis 移除
  412. func (c *Client) Offline() {
  413. _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
  414. }
  415. // Close websocket connection
  416. func (c *Client) close() {
  417. c.srv.Disconnect <- c
  418. c.nats.Unsubscribe <- &subscriber{client: c}
  419. c.UnderlyingConn.Close()
  420. c.Offline()
  421. }