123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package repositories
- import (
- "context"
- "encoding/json"
- "fmt"
- "strconv"
- "time"
- "github.com/redis/go-redis/v9"
- "github.com/rotisserie/eris"
- "sikey.com/websocket/config"
- "sikey.com/websocket/models"
- )
- var _ OnlineRepository = (*onlineRepository)(nil)
- type OnlineRepository interface {
- SetOnline(ctx context.Context, o *models.Online) error
- GetOnline(ctx context.Context, uid string) (*models.Online, error)
- Offline(ctx context.Context, o *models.Online) error
- Heartbeat(ctx context.Context, o *models.Online) error
- GetOfflineNotification(ctx context.Context, uid string) (map[int]models.Notify, error)
- DelOfflineNotification(ctx context.Context, seq int64, uid string) error
- // Is 是否在线
- Is(ctx context.Context, userId string) (bool, error)
- // GetServerClients 获取指定服务在线的用户
- GetServerClients(ctx context.Context, serverId string) ([]models.Online, error)
- }
- type onlineRepository struct {
- rdb *redis.Client
- HeartbeatWait time.Duration
- }
- // DelOfflineNotification implements OnlineRepository.
- func (repo *onlineRepository) DelOfflineNotification(ctx context.Context, seq int64, uid string) error {
- var messageKey string = fmt.Sprintf("clients.offline_notification.%s", uid)
- return repo.rdb.HDel(ctx, messageKey, fmt.Sprintf("%d", seq)).Err()
- }
- // GetOfflineNotification implements OnlineRepository.
- func (repo *onlineRepository) GetOfflineNotification(ctx context.Context, uid string) (map[int]models.Notify, error) {
- var messageKey string = fmt.Sprintf("clients.offline_notification.%s", uid)
- var err error
- var notification = make(map[int]models.Notify)
- // err = repo.sou
- result, err := repo.rdb.HGetAll(ctx, messageKey).Result()
- if err != nil {
- if eris.Is(err, redis.Nil) {
- return make(map[int]models.Notify), nil
- }
- }
- for seqStr, notificationStr := range result {
- seq, _ := strconv.ParseInt(seqStr, 10, 64)
- var notify models.Notify
- _ = json.Unmarshal([]byte(notificationStr), ¬ify)
- notification[int(seq)] = notify
- }
- return notification, nil
- }
- // GetOnline implements OnlineRepository.
- func (repo *onlineRepository) GetOnline(ctx context.Context, uid string) (*models.Online, error) {
- keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.*.%s", uid)).Result()
- if err != nil {
- if eris.Is(err, redis.Nil) {
- return nil, nil
- }
- return nil, eris.Wrap(err, "unable to get keys")
- }
- // 空的集合
- if len(keys) == 0 {
- return nil, nil
- }
- var o models.Online
- if err := repo.rdb.Get(ctx, keys[0]).Scan(&o); err != nil {
- return nil, err
- }
- return &o, nil
- }
- // GetServerClients implements OnlineRepository.
- func (repo *onlineRepository) GetServerClients(ctx context.Context, serverId string) ([]models.Online, error) {
- keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.%s.*", serverId)).Result()
- if err != nil {
- if eris.Is(err, redis.Nil) {
- return make([]models.Online, 0), nil
- }
- return nil, eris.Wrap(err, "unable to get keys")
- }
- var lines = make([]models.Online, len(keys))
- for i, key := range keys {
- var o models.Online
- repo.rdb.Get(ctx, key).Scan(&o)
- lines[i] = o
- }
- return lines, nil
- }
- // Is implements OnlineRepository.
- func (repo *onlineRepository) Is(ctx context.Context, userId string) (bool, error) {
- keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.*.%s", userId)).Result()
- if err != nil {
- if eris.Is(err, redis.Nil) {
- return false, nil
- }
- return false, eris.Wrap(err, "unable to get keys")
- }
- return len(keys) > 0, nil
- }
- // Heartbeat implements OnlineRepository.
- func (repo *onlineRepository) Heartbeat(ctx context.Context, o *models.Online) error {
- return repo.rdb.Expire(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId), repo.HeartbeatWait).Err()
- }
- // Offline implements OnlineRepository.
- func (repo *onlineRepository) Offline(ctx context.Context, o *models.Online) error {
- return repo.rdb.Del(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId)).Err()
- }
- // SetOnline implements OnlineRepository.
- func (repo *onlineRepository) SetOnline(ctx context.Context, o *models.Online) error {
- return repo.rdb.Set(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId), o, repo.HeartbeatWait).Err()
- }
- func NewOnlineRepository(rdb *redis.Client) OnlineRepository {
- return &onlineRepository{rdb: rdb, HeartbeatWait: config.Websocket.HeartbeatWait * time.Second}
- }
|