Browse Source

上线消息

luoyangwei 1 năm trước cách đây
mục cha
commit
be5b8b5269
2 tập tin đã thay đổi với 24 bổ sung12 xóa
  1. 1 1
      etc/websocket.debug.yaml
  2. 23 11
      repositories/online_repostiroy.go

+ 1 - 1
etc/websocket.debug.yaml

@@ -51,7 +51,7 @@ redis:
     addr: "106.75.230.4:6379"
     # REDIS_CHANNEL: 'message'
     connect_key: "connects"
-    db: 0
+    db: 1
     password: "sikey!Q@W#E456"
 
 # nats connection config

+ 23 - 11
repositories/online_repostiroy.go

@@ -2,7 +2,9 @@ package repositories
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/redis/go-redis/v9"
@@ -19,7 +21,7 @@ type OnlineRepository interface {
 	Offline(ctx context.Context, o *models.Online) error
 	Heartbeat(ctx context.Context, o *models.Online) error
 
-	GetOfflineNotification(ctx context.Context, uid string) (map[int]*models.Notify, error)
+	GetOfflineNotification(ctx context.Context, uid string) (map[int]models.Notify, error)
 	DelOfflineNotification(ctx context.Context, seq int64, uid string) error
 
 	// Is 是否在线
@@ -41,24 +43,34 @@ func (repo *onlineRepository) DelOfflineNotification(ctx context.Context, seq in
 }
 
 // GetOfflineNotification implements OnlineRepository.
-func (repo *onlineRepository) GetOfflineNotification(ctx context.Context, uid string) (map[int]*models.Notify, error) {
+func (repo *onlineRepository) GetOfflineNotification(ctx context.Context, uid string) (map[int]models.Notify, error) {
 	var messageKey string = fmt.Sprintf("clients.offline_notification.%s", uid)
 	var err error
-	notification := make(map[int]*models.Notify)
+	var notification = make(map[int]models.Notify)
 
 	// err = repo.sou
-	err = repo.rdb.HGetAll(ctx, messageKey).Scan(&notification)
+	result, err := repo.rdb.HGetAll(ctx, messageKey).Result()
 	if err != nil {
 		if eris.Is(err, redis.Nil) {
-			return make(map[int]*models.Notify), nil
+			return make(map[int]models.Notify), nil
 		}
 	}
+
+	for seqStr, notificationStr := range result {
+		seq, _ := strconv.ParseInt(seqStr, 10, 64)
+
+		var notify models.Notify
+		_ = json.Unmarshal([]byte(notificationStr), &notify)
+
+		notification[int(seq)] = notify
+	}
+
 	return notification, nil
 }
 
 // GetOnline implements OnlineRepository.
 func (repo *onlineRepository) GetOnline(ctx context.Context, uid string) (*models.Online, error) {
-	keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.*.%s", uid)).Result()
+	keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.*.%s", uid)).Result()
 	if err != nil {
 		if eris.Is(err, redis.Nil) {
 			return nil, nil
@@ -80,7 +92,7 @@ func (repo *onlineRepository) GetOnline(ctx context.Context, uid string) (*model
 
 // GetServerClients implements OnlineRepository.
 func (repo *onlineRepository) GetServerClients(ctx context.Context, serverId string) ([]models.Online, error) {
-	keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.%s.*", serverId)).Result()
+	keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.%s.*", serverId)).Result()
 	if err != nil {
 		if eris.Is(err, redis.Nil) {
 			return make([]models.Online, 0), nil
@@ -99,7 +111,7 @@ func (repo *onlineRepository) GetServerClients(ctx context.Context, serverId str
 
 // Is implements OnlineRepository.
 func (repo *onlineRepository) Is(ctx context.Context, userId string) (bool, error) {
-	keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.*.%s", userId)).Result()
+	keys, err := repo.rdb.Keys(ctx, fmt.Sprintf("clients.online.*.%s", userId)).Result()
 	if err != nil {
 		if eris.Is(err, redis.Nil) {
 			return false, nil
@@ -111,17 +123,17 @@ func (repo *onlineRepository) Is(ctx context.Context, userId string) (bool, erro
 
 // Heartbeat implements OnlineRepository.
 func (repo *onlineRepository) Heartbeat(ctx context.Context, o *models.Online) error {
-	return repo.rdb.Expire(ctx, fmt.Sprintf("clients.%s.%s", o.ServerId, o.UserId), repo.HeartbeatWait).Err()
+	return repo.rdb.Expire(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId), repo.HeartbeatWait).Err()
 }
 
 // Offline implements OnlineRepository.
 func (repo *onlineRepository) Offline(ctx context.Context, o *models.Online) error {
-	return repo.rdb.Del(ctx, fmt.Sprintf("clients.%s.%s", o.ServerId, o.UserId)).Err()
+	return repo.rdb.Del(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId)).Err()
 }
 
 // SetOnline implements OnlineRepository.
 func (repo *onlineRepository) SetOnline(ctx context.Context, o *models.Online) error {
-	return repo.rdb.Set(ctx, fmt.Sprintf("clients.%s.%s", o.ServerId, o.UserId), o, repo.HeartbeatWait).Err()
+	return repo.rdb.Set(ctx, fmt.Sprintf("clients.online.%s.%s", o.ServerId, o.UserId), o, repo.HeartbeatWait).Err()
 }
 
 func NewOnlineRepository(rdb *redis.Client) OnlineRepository {