Jelajahi Sumber

客户端关闭日志

luoyangwei 1 tahun lalu
induk
melakukan
065ddd9f21
4 mengubah file dengan 13 tambahan dan 69 penghapusan
  1. 0 2
      server/hub.go
  2. 7 1
      server/server.go
  3. 5 66
      server/server_test.go
  4. 1 0
      websocket.go

+ 0 - 2
server/hub.go

@@ -110,7 +110,5 @@ func (h *Hub) run() {
 }
 
 func (h *Hub) GetClients() map[string]*Client {
-	h.mutex.RLock()
-	defer h.mutex.RUnlock()
 	return h.clients
 }

+ 7 - 1
server/server.go

@@ -63,7 +63,13 @@ func WebsocketHandler(ctx *gin.Context, srv *Server) {
 		repos: srv.Repositories,
 	}
 	srv.Hub.Connect <- client
-	zlog.Info("client online: ", client.UserId)
+	zlog.Info("client: ", client.UserId)
+
+	conn.SetCloseHandler(func(code int, text string) error {
+		zlog.Error("close client: ", client.UserId, code, text)
+		srv.Hub.Disconnect <- client
+		return nil
+	})
 
 	go client.reader()
 	go client.writer()

+ 5 - 66
server/server_test.go

@@ -2,11 +2,8 @@ package server
 
 import (
 	"fmt"
-	"github.com/rotisserie/eris"
 	"log"
 	"strconv"
-	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -21,71 +18,12 @@ func BenchmarkTest1(b *testing.B) {
 	}
 }
 
-func TestServer_WebsocketKeep(t *testing.T) {
-	wg := sync.WaitGroup{}
-
-	var count int32
-	var failed int32
-
-	go func() {
-		wg.Add(1)
-		defer wg.Done()
-		for {
-			time.Sleep(3 * time.Second)
-			fmt.Printf("%d, %d \n", atomic.LoadInt32(&count), atomic.LoadInt32(&failed))
-		}
-	}()
-
-	for i := 0; i < 3000; i++ {
-		go func() {
-			wg.Add(1)
-			defer wg.Done()
-
-			conn, _, err := websocket.DefaultDialer.Dial(
-				fmt.Sprintf(`ws://127.0.0.1:10082/websocket/endpoint?X-Websocket-Header-ID=%s`, uuid.NewString()),
-				//fmt.Sprintf(`ws://106.75.230.4:10082/websocket/endpoint?X-Websocket-Header-ID=%s`, uuid.NewString()),
-				nil)
-			if err != nil {
-				atomic.StoreInt32(&failed, failed+1)
-				log.Println(eris.Wrap(err, "unable to create connect"))
-				return
-			}
-
-			conn.SetCloseHandler(func(code int, text string) error {
-				log.Println(code, text)
-				return nil
-			})
-
-			go func() {
-				for {
-					time.Sleep(3 * time.Second)
-					message := Message{Type: MessageTypePingPong, RequestId: strconv.Itoa(int(time.Now().UnixMilli())), Content: "ping"}
-					err = conn.WriteMessage(websocket.TextMessage, serializationMessage(&message))
-					if err != nil {
-						log.Println(eris.Wrap(err, "unable to write message"))
-						return
-					}
-				}
-			}()
-
-			atomic.StoreInt32(&count, count+1)
-			for {
-				_, _, err = conn.ReadMessage()
-				if err != nil {
-					log.Println(eris.Wrap(err, "unable to read message"))
-					return
-				}
-			}
-		}()
-	}
-	wg.Wait()
-}
-
 func BenchmarkServer_WebsocketPressure(b *testing.B) {
 	b.ReportAllocs()
 	b.ResetTimer()
 
-	b.SetParallelism(17)
+	b.SetParallelism(10)
+	fmt.Println(b.N)
 	b.RunParallel(func(p *testing.PB) {
 		for p.Next() {
 			websocketConnect()
@@ -95,12 +33,13 @@ func BenchmarkServer_WebsocketPressure(b *testing.B) {
 
 func websocketConnect() {
 	conn, _, err := websocket.DefaultDialer.Dial(
-		// fmt.Sprintf(`ws://127.0.0.1:10082/websocket/endpoint?X-Websocket-Header-ID=%s`, uuid.NewString()),
-		fmt.Sprintf(`ws://106.75.230.4:10082/websocket/endpoint?X-Websocket-Header-ID=%s`, uuid.NewString()),
+		fmt.Sprintf(`ws://127.0.0.1:10082/websocket/endpoint?X-Websocket-Header-ID=%s`, uuid.NewString()),
+		// fmt.Sprintf(`ws://106.75.230.4:10082/websocket/endpoint?X-Websocket-Header-ID=%s`, uuid.NewString()),
 		nil)
 	if err != nil {
 		log.Println(err)
 	}
+	defer conn.Close()
 
 	message := Message{Type: MessageTypePingPong, RequestId: strconv.Itoa(int(time.Now().UnixMilli())), Content: "ping"}
 	conn.SetWriteDeadline(time.Now().Add(time.Second * 60))

+ 1 - 0
websocket.go

@@ -60,6 +60,7 @@ func newApp() *gin.Engine {
 		Hub:          hub,
 		Repositories: repositories.NewRepositories(mysqlx.ConnectMysql()),
 	}
+
 	app.GET("/websocket/endpoint", func(ctx *gin.Context) { server.WebsocketHandler(ctx, srv) })
 	app.GET("/index", func(ctx *gin.Context) {
 		clients := hub.GetClients()