online_repostiroy.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package repositories
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "github.com/redis/go-redis/v9"
  9. "github.com/rotisserie/eris"
  10. "sikey.com/websocket/config"
  11. "sikey.com/websocket/models"
  12. )
  13. var _ OnlineRepository = (*onlineRepository)(nil)
  14. type OnlineRepository interface {
  15. SetOnline(ctx context.Context, o *models.Online) error
  16. GetOnline(ctx context.Context, uid string) (*models.Online, error)
  17. Offline(ctx context.Context, o *models.Online) error
  18. Heartbeat(ctx context.Context, o *models.Online) error
  19. GetOfflineNotification(ctx context.Context, uid string) (map[int]models.Notify, error)
  20. DelOfflineNotification(ctx context.Context, seq int64, uid string) error
  21. // Is 是否在线
  22. Is(ctx context.Context, userId string) (bool, error)
  23. // GetServerClients 获取指定服务在线的用户
  24. GetServerClients(ctx context.Context, serverId string) ([]models.Online, error)
  25. }
  26. type onlineRepository struct {
  27. rdb *redis.Client
  28. HeartbeatWait time.Duration
  29. }
  30. // DelOfflineNotification implements OnlineRepository.
  31. func (repo *onlineRepository) DelOfflineNotification(ctx context.Context, seq int64, uid string) error {
  32. var messageKey string = fmt.Sprintf("clients.offline_notification.%s", uid)
  33. return repo.rdb.HDel(ctx, messageKey, fmt.Sprintf("%d", seq)).Err()
  34. }
  35. // GetOfflineNotification implements OnlineRepository.
  36. func (repo *onlineRepository) GetOfflineNotification(ctx context.Context, uid string) (map[int]models.Notify, error) {
  37. var messageKey string = fmt.Sprintf("clients.offline_notification.%s", uid)
  38. var err error
  39. var notification = make(map[int]models.Notify)
  40. // err = repo.sou
  41. result, err := repo.rdb.HGetAll(ctx, messageKey).Result()
  42. if err != nil {
  43. if eris.Is(err, redis.Nil) {
  44. return make(map[int]models.Notify), nil
  45. }
  46. }
  47. for seqStr, notificationStr := range result {
  48. seq, _ := strconv.ParseInt(seqStr, 10, 64)
  49. var notify models.Notify
  50. _ = json.Unmarshal([]byte(notificationStr), &notify)
  51. notification[int(seq)] = notify
  52. }
  53. return notification, nil
  54. }
  55. // GetOnline implements OnlineRepository.
  56. func (repo *onlineRepository) GetOnline(ctx context.Context, uid string) (*models.Online, error) {
  57. keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.*.%s", uid)).Result()
  58. if err != nil {
  59. if eris.Is(err, redis.Nil) {
  60. return nil, nil
  61. }
  62. return nil, eris.Wrap(err, "unable to get keys")
  63. }
  64. // 空的集合
  65. if len(keys) == 0 {
  66. return nil, nil
  67. }
  68. var o models.Online
  69. if err := repo.rdb.Get(ctx, keys[0]).Scan(&o); err != nil {
  70. return nil, err
  71. }
  72. return &o, nil
  73. }
  74. // GetServerClients implements OnlineRepository.
  75. func (repo *onlineRepository) GetServerClients(ctx context.Context, serverId string) ([]models.Online, error) {
  76. keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.%s.*", serverId)).Result()
  77. if err != nil {
  78. if eris.Is(err, redis.Nil) {
  79. return make([]models.Online, 0), nil
  80. }
  81. return nil, eris.Wrap(err, "unable to get keys")
  82. }
  83. var lines = make([]models.Online, len(keys))
  84. for i, key := range keys {
  85. var o models.Online
  86. repo.rdb.Get(ctx, key).Scan(&o)
  87. lines[i] = o
  88. }
  89. return lines, nil
  90. }
  91. // Is implements OnlineRepository.
  92. func (repo *onlineRepository) Is(ctx context.Context, userId string) (bool, error) {
  93. keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.*.%s", userId)).Result()
  94. if err != nil {
  95. if eris.Is(err, redis.Nil) {
  96. return false, nil
  97. }
  98. return false, eris.Wrap(err, "unable to get keys")
  99. }
  100. return len(keys) > 0, nil
  101. }
  102. // Heartbeat implements OnlineRepository.
  103. func (repo *onlineRepository) Heartbeat(ctx context.Context, o *models.Online) error {
  104. return repo.rdb.Expire(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId), repo.HeartbeatWait).Err()
  105. }
  106. // Offline implements OnlineRepository.
  107. func (repo *onlineRepository) Offline(ctx context.Context, o *models.Online) error {
  108. return repo.rdb.Del(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId)).Err()
  109. }
  110. // SetOnline implements OnlineRepository.
  111. func (repo *onlineRepository) SetOnline(ctx context.Context, o *models.Online) error {
  112. return repo.rdb.Set(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId), o, repo.HeartbeatWait).Err()
  113. }
  114. func NewOnlineRepository(rdb *redis.Client) OnlineRepository {
  115. return &onlineRepository{rdb: rdb, HeartbeatWait: config.Websocket.HeartbeatWait * time.Second}
  116. }