luoyangwei 1 жил өмнө
parent
commit
1acc1bcfb7

+ 24 - 0
models/online.go

@@ -0,0 +1,24 @@
+package models
+
+import (
+	"encoding"
+	"encoding/json"
+)
+
+var _ encoding.BinaryMarshaler = (*Online)(nil)
+var _ encoding.BinaryUnmarshaler = (*Online)(nil)
+
+type Online struct {
+	UserId   string
+	ServerId string
+}
+
+// UnmarshalBinary implements encoding.BinaryUnmarshaler.
+func (o *Online) UnmarshalBinary(data []byte) error {
+	return json.Unmarshal(data, o)
+}
+
+// MarshalBinary implements encoding.BinaryMarshaler.
+func (o *Online) MarshalBinary() (data []byte, err error) {
+	return json.Marshal(o)
+}

+ 34 - 0
repositories/online_repostiroy.go

@@ -0,0 +1,34 @@
+package repositories
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/redis/go-redis/v9"
+	"sikey.com/websocket/models"
+)
+
+var _ OnlineRepository = (*onlineRepository)(nil)
+
+type OnlineRepository interface {
+	SetOnline(ctx context.Context, o *models.Online) error
+	Offline(ctx context.Context, o *models.Online) error
+}
+
+type onlineRepository struct {
+	rdb *redis.Client
+}
+
+// Offline implements OnlineRepository.
+func (repo *onlineRepository) Offline(ctx context.Context, o *models.Online) error {
+	return repo.rdb.Del(ctx, fmt.Sprintf("clients.%s.%s", o.ServerId, o.UserId)).Err()
+}
+
+// SetOnline implements OnlineRepository.
+func (repo *onlineRepository) SetOnline(ctx context.Context, o *models.Online) error {
+	return repo.rdb.Set(ctx, fmt.Sprintf("clients.%s.%s", o.ServerId, o.UserId), o, 0).Err()
+}
+
+func NewOnlineRepository(rdb *redis.Client) OnlineRepository {
+	return &onlineRepository{rdb: rdb}
+}

+ 4 - 1
repositories/repositories.go

@@ -3,6 +3,7 @@ package repositories
 import (
 	"context"
 
+	"github.com/redis/go-redis/v9"
 	"gorm.io/gorm"
 )
 
@@ -10,10 +11,12 @@ type TransactionFun func(ctx context.Context, repos *Repositories) error
 
 type Repositories struct {
 	SessionRepository SessionRepository
+	OnlineRepository  OnlineRepository
 }
 
-func NewRepositories(source *gorm.DB) *Repositories {
+func NewRepositories(source *gorm.DB, rdb *redis.Client) *Repositories {
 	return &Repositories{
 		SessionRepository: NewSessionRepository(source),
+		OnlineRepository:  NewOnlineRepository(rdb),
 	}
 }

+ 9 - 2
server/client.go

@@ -37,7 +37,7 @@ type Client struct {
 func (c *Client) reader() {
 	defer func() {
 		c.hub.Disconnect <- c
-		c.UnderlyingConn.Close()
+		c.Close()
 	}()
 
 	c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
@@ -98,7 +98,7 @@ func (c *Client) writer() {
 	ticker := time.NewTicker(c.pingWait)
 	defer func() {
 		ticker.Stop()
-		c.UnderlyingConn.Close()
+		c.Close()
 	}()
 
 	for {
@@ -169,3 +169,10 @@ func (c *Client) getReceiverUserIds(receiver string) []string {
 	}
 	return ms
 }
+
+// Close websocket connection
+func (c *Client) Close() {
+	c.UnderlyingConn.Close()
+	online := &models.Online{UserId: c.UserId, ServerId: c.hub.serverId}
+	c.repos.OnlineRepository.Offline(c.ctx, online)
+}

+ 3 - 15
server/hub.go

@@ -5,32 +5,21 @@ import (
 
 	"sikey.com/websocket/config"
 	"sikey.com/websocket/utils/zlog"
-
-	"github.com/redis/go-redis/v9"
 )
 
-type HubConfig struct {
-	ServerId string // serverId 服务器ID
-	Rdb      redis.UniversalClient
-}
-
 type Hub struct {
 	serverId string // serverId 服务器ID
-	rdb      redis.UniversalClient
 
 	clients    map[string]*Client
 	mutex      sync.RWMutex
 	Connect    chan *Client
 	Disconnect chan *Client
-
-	Message chan *Message
-
-	exchange *Exchange
+	Message    chan *Message
 }
 
-func NewHub(cfg HubConfig) *Hub {
+func NewHub(serverId string) *Hub {
 	hub := &Hub{
-		serverId: cfg.ServerId,
+		serverId: serverId,
 
 		clients: make(map[string]*Client),
 		mutex:   sync.RWMutex{},
@@ -39,7 +28,6 @@ func NewHub(cfg HubConfig) *Hub {
 		Disconnect: make(chan *Client, config.Websocket.ConnectSize),
 		Message:    make(chan *Message, config.Websocket.MessageSize),
 	}
-	hub.exchange = NewExchange(cfg.ServerId, cfg.Rdb)
 
 	go hub.run()
 	return hub

+ 4 - 0
server/nats.go

@@ -2,3 +2,7 @@ package server
 
 type Nats struct {
 }
+
+func NewNats() *Nats {
+	return &Nats{}
+}

+ 12 - 2
server/server.go

@@ -11,12 +11,14 @@ import (
 	"github.com/rotisserie/eris"
 	"github.com/spf13/viper"
 	"sikey.com/websocket/config"
+	"sikey.com/websocket/models"
 	"sikey.com/websocket/repositories"
 	"sikey.com/websocket/utils/keys"
 	"sikey.com/websocket/utils/zlog"
 )
 
 type Server struct {
+	ID           string
 	Ctx          *gin.Context
 	Repositories *repositories.Repositories
 
@@ -24,7 +26,7 @@ type Server struct {
 	Hub      *Hub
 }
 
-func WebsocketHandler(ctx *gin.Context, srv *Server) {
+func (srv *Server) WebsocketHandler(ctx *gin.Context) {
 	srv.Ctx = ctx
 
 	conn, err := srv.Upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
@@ -62,7 +64,15 @@ func WebsocketHandler(ctx *gin.Context, srv *Server) {
 		repos: srv.Repositories,
 	}
 	srv.Hub.Connect <- client
-	zlog.Info("client: ", client.UserId)
+	zlog.Debugf("client: %s", client.UserId)
+
+	// Online status to redis
+	online := &models.Online{UserId: client.UserId, ServerId: srv.ID}
+	if err := srv.Repositories.OnlineRepository.SetOnline(ctx, online); err != nil {
+		ctx.AbortWithError(http.StatusInternalServerError,
+			eris.Wrapf(err, "unable to set online status for user: %s", client.UserId))
+		return
+	}
 
 	go client.reader()
 	go client.writer()

+ 6 - 12
websocket.go

@@ -9,11 +9,11 @@ import (
 	"github.com/gin-gonic/gin"
 	"github.com/google/uuid"
 	"github.com/gorilla/websocket"
-	"github.com/redis/go-redis/v9"
 	"sikey.com/websocket/config"
 	"sikey.com/websocket/repositories"
 	"sikey.com/websocket/server"
 	"sikey.com/websocket/utils/mysqlx"
+	"sikey.com/websocket/utils/redisx"
 	"sikey.com/websocket/utils/zlog"
 )
 
@@ -34,15 +34,9 @@ func newApp() *gin.Engine {
 	app := gin.Default()
 	ginpprof.Wrap(app)
 
-	hub := server.NewHub(server.HubConfig{
-		ServerId: uuid.NewString(),
-		Rdb: redis.NewUniversalClient(&redis.UniversalOptions{
-			Addrs:    []string{"106.75.230.4:6379"},
-			Password: "sikey!Q@W#E456",
-			DB:       0,
-		}),
-	})
+	id := uuid.NewString()
 	srv := &server.Server{
+		ID: id,
 		Upgrader: websocket.Upgrader{
 			ReadBufferSize:  config.Websocket.ReadBufferSize,
 			WriteBufferSize: config.Websocket.WriteBufferSize,
@@ -50,10 +44,10 @@ func newApp() *gin.Engine {
 				return true
 			},
 		},
-		Hub:          hub,
-		Repositories: repositories.NewRepositories(mysqlx.ConnectMysql()),
+		Hub:          server.NewHub(id),
+		Repositories: repositories.NewRepositories(mysqlx.ConnectMysql(), redisx.RedisConnect()),
 	}
 
-	app.GET("/websocket/endpoint", func(ctx *gin.Context) { server.WebsocketHandler(ctx, srv) })
+	app.GET("/websocket/endpoint", func(ctx *gin.Context) { srv.WebsocketHandler(ctx) })
 	return app
 }