ソースを参照

websocket 各种配置

luoyangwei 1 年間 前
コミット
574cd29344
9 ファイル変更61 行追加31 行削除
  1. 9 3
      config/config.go
  2. 14 0
      config/websocket.go
  3. 9 0
      etc/websocket.test.toml
  4. 9 0
      etc/websocket.toml
  5. 1 0
      server/client.go
  6. 8 11
      server/hub.go
  7. 4 0
      server/nats.go
  8. 5 8
      server/server.go
  9. 2 9
      websocket.go

+ 9 - 3
config/config.go

@@ -10,9 +10,10 @@ import (
 )
 
 var (
-	Config configx.Config
-	Kafka  kafka
-	Redis  redis
+	Config    configx.Config
+	Kafka     kafka
+	Redis     redis
+	Websocket websocket
 )
 
 func loadRestfulConfig() error {
@@ -27,12 +28,17 @@ func loadRedisConfig() error {
 	return viper.UnmarshalKey("redis", &Redis)
 }
 
+func loadWebsocketConfig() error {
+	return viper.UnmarshalKey("websocket", &Websocket)
+}
+
 // MustLoadConfig 加载配置
 func MustLoadConfig(file string) {
 	err := configx.LoadConfig(file,
 		loadRestfulConfig,
 		loadKafkaConfig,
 		loadRedisConfig,
+		loadWebsocketConfig,
 	)
 	if err != nil {
 		log.Fatalln(eris.Wrap(err, "无法映射配置"))

+ 14 - 0
config/websocket.go

@@ -0,0 +1,14 @@
+package config
+
+import "time"
+
+type websocket struct {
+	ConnectSize   int
+	MessageSize   int
+	HeartbeatWait time.Duration
+	ReadWait      time.Duration
+	WriteWait     time.Duration
+
+	ReadBufferSize  int
+	WriteBufferSize int
+}

+ 9 - 0
etc/websocket.test.toml

@@ -2,6 +2,15 @@ environment = "test"
 name = "websocket"
 port = 10082
 
+[websocket]
+connectSize = 1024 # 控制消息 Channel 大小
+heartbeatWait = 120 # 心跳超时
+messageSize = 256 # 消息 channel 大小
+readBufferSize = 1024 # 读消息体最大的 buffersize
+readWait = 10 # 写超时
+writeBufferSize = 1024 # 写消息体最大的 buffersize
+writeWait = 10 # 读超时
+
 [mysql]
 dsn = "root:9RKdJsEQKnjrni9R@tcp(10.23.148.10:3306)/sikey?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai"
 # Ignore ErrRecordNotFound error for logger

+ 9 - 0
etc/websocket.toml

@@ -2,6 +2,15 @@ environment = "dev"
 name = "websocket"
 port = 10082
 
+[websocket]
+connectSize = 1024 # 控制消息 Channel 大小
+heartbeatWait = 120 # 心跳超时
+messageSize = 256 # 消息 channel 大小
+readBufferSize = 1024 # 读消息体最大的 buffersize
+readWait = 10 # 写超时
+writeBufferSize = 1024 # 写消息体最大的 buffersize
+writeWait = 10 # 读超时
+
 [mysql]
 dsn = "root:qq123123@tcp(127.0.0.1:3306)/sikey?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai"
 # Ignore ErrRecordNotFound error for logger

+ 1 - 0
server/client.go

@@ -47,6 +47,7 @@ func (c *Client) reader() {
 			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
 				zlog.Errorf("error: %v", err)
 			}
+			// Close connect
 			break
 		}
 

+ 8 - 11
server/hub.go

@@ -3,18 +3,15 @@ package server
 import (
 	"sync"
 
+	"sikey.com/websocket/config"
 	"sikey.com/websocket/utils/zlog"
 
 	"github.com/redis/go-redis/v9"
 )
 
 type HubConfig struct {
-	ConnectSize    int
-	DisconnectSize int
-	MessageSize    int
-	ServerId       string // serverId 服务器ID
-
-	Rdb redis.UniversalClient
+	ServerId string // serverId 服务器ID
+	Rdb      redis.UniversalClient
 }
 
 type Hub struct {
@@ -38,9 +35,9 @@ func NewHub(cfg HubConfig) *Hub {
 		clients: make(map[string]*Client),
 		mutex:   sync.RWMutex{},
 
-		Connect:    make(chan *Client, cfg.ConnectSize),
-		Disconnect: make(chan *Client, cfg.DisconnectSize),
-		Message:    make(chan *Message, cfg.MessageSize),
+		Connect:    make(chan *Client, config.Websocket.ConnectSize),
+		Disconnect: make(chan *Client, config.Websocket.ConnectSize),
+		Message:    make(chan *Message, config.Websocket.MessageSize),
 	}
 	hub.exchange = NewExchange(cfg.ServerId, cfg.Rdb)
 
@@ -55,7 +52,7 @@ func (h *Hub) run() {
 
 			h.mutex.Lock()
 			h.clients[client.UserId] = client
-			h.exchange.OnPublishConnect(client)
+			// h.exchange.OnPublishConnect(client)
 			h.mutex.Unlock()
 
 		case client := <-h.Disconnect:
@@ -63,7 +60,7 @@ func (h *Hub) run() {
 			h.mutex.Lock()
 			close(client.Send)
 			delete(h.clients, client.UserId)
-			h.exchange.OnPublishDisconnect(client)
+			// h.exchange.OnPublishDisconnect(client)
 			h.mutex.Unlock()
 
 		case message := <-h.Message:

+ 4 - 0
server/nats.go

@@ -0,0 +1,4 @@
+package server
+
+type Nats struct {
+}

+ 5 - 8
server/server.go

@@ -10,6 +10,7 @@ import (
 	"github.com/gorilla/websocket"
 	"github.com/rotisserie/eris"
 	"github.com/spf13/viper"
+	"sikey.com/websocket/config"
 	"sikey.com/websocket/repositories"
 	"sikey.com/websocket/utils/keys"
 	"sikey.com/websocket/utils/zlog"
@@ -21,10 +22,6 @@ type Server struct {
 
 	Upgrader websocket.Upgrader
 	Hub      *Hub
-
-	ReadWait  time.Duration
-	WriteWait time.Duration
-	PingWait  time.Duration
 }
 
 func WebsocketHandler(ctx *gin.Context, srv *Server) {
@@ -52,10 +49,10 @@ func WebsocketHandler(ctx *gin.Context, srv *Server) {
 		UserId:         id,
 		hub:            srv.Hub,
 		UnderlyingConn: conn,
-		Send:           make(chan *Message, 256),
-		writeWait:      srv.WriteWait,
-		readWait:       srv.ReadWait,
-		pingWait:       srv.PingWait,
+		Send:           make(chan *Message, config.Websocket.MessageSize),
+		writeWait:      config.Websocket.WriteWait * time.Second,
+		readWait:       config.Websocket.ReadWait * time.Second,
+		pingWait:       config.Websocket.HeartbeatWait * time.Second,
 
 		isSimpleMsg:  headers[keys.SimpleHeader].(bool),
 		localization: headers[keys.LocalizationHeader].(string),

+ 2 - 9
websocket.go

@@ -4,7 +4,6 @@ import (
 	"flag"
 	"fmt"
 	"net/http"
-	"time"
 
 	"github.com/DeanThompson/ginpprof"
 	"github.com/gin-gonic/gin"
@@ -42,21 +41,15 @@ func newApp() *gin.Engine {
 			Password: "sikey!Q@W#E456",
 			DB:       0,
 		}),
-		ConnectSize:    1024,
-		DisconnectSize: 1024,
-		MessageSize:    125,
 	})
 	srv := &server.Server{
 		Upgrader: websocket.Upgrader{
-			ReadBufferSize:  1024,
-			WriteBufferSize: 1024,
+			ReadBufferSize:  config.Websocket.ReadBufferSize,
+			WriteBufferSize: config.Websocket.WriteBufferSize,
 			CheckOrigin: func(r *http.Request) bool {
 				return true
 			},
 		},
-		WriteWait:    10 * time.Second,
-		ReadWait:     10 * time.Second,
-		PingWait:     120 * time.Second,
 		Hub:          hub,
 		Repositories: repositories.NewRepositories(mysqlx.ConnectMysql()),
 	}