Browse Source

替换日志

luoyangwei 1 year ago
parent
commit
0f75b6fde6

+ 1 - 1
config/config.go

@@ -15,7 +15,7 @@ func loadWebsocketConfig() error {
 
 // MustLoadConfig 加载配置
 func MustLoadConfig(file string) {
-	confx.SetEnvConfig("NODE_ID", "ENVIRONMENT")
+	confx.SetEnvConfig("ENVIRONMENT")
 	if err := confx.LoadConfig(file, loadWebsocketConfig); err != nil {
 		log.Fatalln(err)
 	}

+ 0 - 1
etc/websocket.release.toml

@@ -1,4 +1,3 @@
-environment = "release"
 name = "websocket"
 port = 10082
 

+ 0 - 1
etc/websocket.test.toml

@@ -1,4 +1,3 @@
-environment = "test"
 name = "websocket"
 port = 10082
 

+ 0 - 1
etc/websocket.toml

@@ -1,4 +1,3 @@
-environment = "dev"
 name = "websocket"
 port = 10082
 

+ 1 - 1
go.mod

@@ -17,7 +17,7 @@ require (
 	github.com/rotisserie/eris v0.5.4
 	github.com/spf13/viper v1.18.2
 	gorm.io/gorm v1.25.7
-	x.sikey.com.cn/serverx v1.2.4
+	x.sikey.com.cn/serverx v1.2.6
 )
 
 require (

+ 4 - 0
go.sum

@@ -703,3 +703,7 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
 x.sikey.com.cn/serverx v1.2.4 h1:zoybAadI1SocSgvBn/RIicQkFjDwgunKcjh0rLqHC2E=
 x.sikey.com.cn/serverx v1.2.4/go.mod h1:BgkpsIbivTnhap0bDn7pxJxk7hkO5XUV78NixrNGZsw=
+x.sikey.com.cn/serverx v1.2.5 h1:t99BFhUuBl4s2+veS213sLVZpn+EyObiNIaOoW8Ncgw=
+x.sikey.com.cn/serverx v1.2.5/go.mod h1:BgkpsIbivTnhap0bDn7pxJxk7hkO5XUV78NixrNGZsw=
+x.sikey.com.cn/serverx v1.2.6 h1:Wib1PZtTPBS0kFdLZLFM9aedGHsDKPv3UZOob9DFY5o=
+x.sikey.com.cn/serverx v1.2.6/go.mod h1:BgkpsIbivTnhap0bDn7pxJxk7hkO5XUV78NixrNGZsw=

+ 3 - 3
repositories/online_repostiroy_test.go

@@ -4,10 +4,10 @@ import (
 	"context"
 	"testing"
 
+	"go.uber.org/zap"
 	"sikey.com/websocket/config"
 	"x.sikey.com.cn/serverx/dbx"
 	"x.sikey.com.cn/serverx/rdbx"
-	"x.sikey.com.cn/serverx/zlog"
 )
 
 func TestOnlineRepository_Is(t *testing.T) {
@@ -16,7 +16,7 @@ func TestOnlineRepository_Is(t *testing.T) {
 	var err error
 	var online bool
 	if online, err = repos.OnlineRepository.Is(context.TODO(), "2d2e78e8-eb61-47c5-8ebc-8e4d7313f577"); err != nil {
-		zlog.Error(err)
+		zap.S().Error(err)
 	}
-	zlog.Info("online: ", online)
+	zap.S().Infof("online: %s", online)
 }

+ 19 - 26
server/client.go

@@ -6,15 +6,13 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
-	"github.com/google/martian/v3/log"
 	"github.com/gorilla/websocket"
 	"github.com/mitchellh/mapstructure"
 	"github.com/rotisserie/eris"
+	"go.uber.org/zap"
 	"sikey.com/websocket/models"
 	"sikey.com/websocket/repositories"
-	"sikey.com/websocket/utils/keys"
 	"x.sikey.com.cn/serverx/gid"
-	"x.sikey.com.cn/serverx/zlog"
 )
 
 type Client struct {
@@ -42,31 +40,31 @@ type Client struct {
 
 func (c *Client) reader() {
 	defer func() {
+		zap.L().Info("client Offline", zap.String("user_id", c.UserId))
 		c.hub.Disconnect <- c
 		c.Close()
-		zlog.With(keys.WebsocketClient, c.UserId).Info("client Offline")
 	}()
 
 	c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
 	for {
 		_, bytes, err := c.UnderlyingConn.ReadMessage()
 		if err != nil {
-			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
-				zlog.With(keys.WebsocketClient, c.UserId).Errorf("error: %v", eris.Wrap(err, c.UserId))
-			} else {
-				zlog.With(keys.WebsocketClient, c.UserId).Errorf("error: %v", eris.Wrap(err, c.UserId))
-			}
+			// if websocket.IsUnexpectedCloseError(err,
+			// websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+			// 	log.Errorf("error: %v", eris.Wrap(err, c.UserId))
+			// }
+			zap.L().Error("read message error", zap.Error(err))
 			// Close connect
-			// _ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
+			_ = c.repos.OnlineRepository.Offline(c.ctx, c.online)
 			return
 		}
 
 		var message = deserializeMessage(bytes)
-		var log = zlog.With(keys.HeaderRequestId, message.RequestId).With(keys.WebsocketClient, c.UserId)
-
 		switch message.Type {
 		case MessageTypePingPong:
-			log.Debugf("receive ping message from %s", c.UserId)
+			zap.L().Debug("receive ping message",
+				zap.String("user_id", c.UserId),
+				zap.String("request_id", message.RequestId))
 			_ = c.repos.OnlineRepository.Heartbeat(c.ctx, c.online)
 		case MessageTypeUpChating, MessageTypeDownChating:
 			// Chat dialogue messages
@@ -83,20 +81,22 @@ func (c *Client) reader() {
 			}
 
 			// Receiver ID format determines whether the receiver is an account or a session
-			log.Infof("message receiver: %s", chatingContent.Receiver)
 			users := c.getReceiverUserIds(chatingContent.Receiver)
-			log.Infof("message to users: %v", users)
+			zap.L().Info("received message",
+				zap.String("user_id", c.UserId),
+				zap.String("receiver", chatingContent.Receiver),
+				zap.String("request_id", message.RequestId),
+				zap.Strings("users", users))
 			for _, id := range users {
 				var messaging = *message
 				messaging.Receiver = id
 				messaging.Content = chatingContent
-				log.Infof("Send message %s to %s", c.UserId, id)
 
 				// Check if the user is online
 				if c.firebaseToken != "" {
 					online, err := c.repos.OnlineRepository.Is(c.ctx, id)
 					if err != nil {
-						log.Error(eris.Wrap(err, "unable to find online user"))
+						zap.L().Error("unable to find online user", zap.Error(err))
 						continue
 					}
 					if !online {
@@ -140,11 +140,6 @@ func (c *Client) writer() {
 	for {
 		select {
 		case message, ok := <-c.Send:
-			// var log = c.logger
-			if message != nil {
-				// log = log.WithField(keys.HeaderRequestId, message.RequestId)
-			}
-
 			c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
 			if !ok {
 				// The hub closed the channel.
@@ -170,12 +165,10 @@ func (c *Client) writer() {
 						if eris.Is(err, models.ErrRecordNotFound) {
 							break
 						}
-						// log.Error(err)
+						zap.L().Error("unable to find message", zap.Error(err))
 					}
 				}
 			}
-
-			log.Infof("write message %s", message)
 		case <-ticker.C:
 			// 到时间发送 ping 信号
 			c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
@@ -237,7 +230,7 @@ func (c *Client) getReceiverUserIds(receiver string) []string {
 	members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
 		c.ctx, receiver, c.UserId)
 	if err != nil {
-		zlog.Error(eris.Wrap(err, "unable to get session members"))
+		zap.L().Error("unable to get session members", zap.Error(err))
 		return []string{}
 	}
 

+ 3 - 3
server/fcm.go

@@ -7,7 +7,7 @@ import (
 	firebase "firebase.google.com/go"
 	"firebase.google.com/go/messaging"
 	"github.com/rotisserie/eris"
-	"x.sikey.com.cn/serverx/zlog"
+	"go.uber.org/zap"
 )
 
 type FirebaseMessageServer struct {
@@ -42,10 +42,10 @@ func (f *FirebaseMessageServer) Send(ctx context.Context, token string, message
 	}
 	resultName, err := client.Send(ctx, messaging)
 	if err != nil {
-		zlog.Error("unable to send fcm message: ", err)
+		zap.L().Error("unable to send fcm message", zap.Error(err))
 		return eris.Wrap(err, "unable to send fcm message")
 	}
 
-	zlog.Infof("Successfully sent message: %s", resultName)
+	zap.L().Info("Successfully sent message: ", zap.String("resultName", resultName))
 	return nil
 }

+ 3 - 7
server/hub.go

@@ -4,8 +4,8 @@ import (
 	"context"
 	"sync"
 
+	"go.uber.org/zap"
 	"sikey.com/websocket/config"
-	"x.sikey.com.cn/serverx/zlog"
 )
 
 type Hub struct {
@@ -46,10 +46,6 @@ func (h *Hub) run() {
 		select {
 		case client := <-h.Connect:
 			h.mutex.Lock()
-			// if c, ok := h.clients[client.UserId]; ok {
-			// 	h.Disconnect <- c
-			// }
-
 			h.clients[client.UserId] = client
 			h.mutex.Unlock()
 			if h.Nats != nil {
@@ -65,7 +61,7 @@ func (h *Hub) run() {
 			}
 		case message := <-h.Message:
 			h.mutex.RLock()
-			zlog.Info("message: ", message)
+			zap.L().Info("message: ", zap.Any("message", message))
 			if client, ok := h.clients[message.Receiver]; ok {
 				client.Send <- message
 			} else {
@@ -74,7 +70,7 @@ func (h *Hub) run() {
 			h.mutex.RUnlock()
 		case firebaseMessage := <-h.FirebaseMessage:
 			if err := h.FCM.Send(context.Background(), firebaseMessage.token, firebaseMessage.message); err != nil {
-				zlog.Error("unable to send fcm message: ", err)
+				zap.L().Error("unable to send fcm message", zap.Error(err))
 			}
 		}
 

+ 6 - 6
server/nats.go

@@ -5,8 +5,7 @@ import (
 	"sync"
 
 	"github.com/nats-io/nats.go"
-	"github.com/rotisserie/eris"
-	"x.sikey.com.cn/serverx/zlog"
+	"go.uber.org/zap"
 )
 
 const (
@@ -54,7 +53,7 @@ func NewNats(addr string) *Nats {
 
 	_, err = nc.ChanSubscribe(subject, n.ch)
 	if err != nil {
-		zlog.Error(eris.Wrap(err, "unable to start: "))
+		zap.L().Error("unable to start", zap.Error(err))
 	}
 
 	go n.run()
@@ -71,19 +70,20 @@ func (n *Nats) run() {
 				Data:    bytes,
 				Header:  nats.Header{headerUserId: []string{nMsg.userId}},
 			}); err != nil {
-				zlog.Error(eris.Wrapf(err, "unable to message send: %s", nMsg.userId))
+				zap.L().Error("unable to message send", zap.Error(err), zap.String("user_id", nMsg.userId))
 			}
 		case msg := <-n.ch:
 			userIds := msg.Header.Values(headerUserId)
-			zlog.Debugf("received nats ids: %v", userIds)
+			zap.L().Debug("received nats message", zap.Strings("ids", userIds))
 			if len(userIds) < 1 {
 				continue
 			}
+
 			n.mutex.RLock()
 			for _, uid := range userIds {
 				if s, ok := n.Subscribers[uid]; ok {
 					message := deserializeMessage(msg.Data)
-					zlog.Debugf("[%s] message: %v", uid, string(msg.Data))
+					zap.L().Info("received nats message", zap.String("user_id", uid), zap.ByteString("message", msg.Data))
 					s.client.Send <- message
 				}
 			}

+ 5 - 7
server/server.go

@@ -10,11 +10,11 @@ import (
 	"github.com/gorilla/websocket"
 	"github.com/rotisserie/eris"
 	"github.com/spf13/viper"
+	"go.uber.org/zap"
 	"sikey.com/websocket/config"
 	"sikey.com/websocket/models"
 	"sikey.com/websocket/repositories"
 	"sikey.com/websocket/utils/keys"
-	"x.sikey.com.cn/serverx/zlog"
 )
 
 type Server struct {
@@ -35,16 +35,15 @@ func (srv *Server) WebsocketHandler(ctx *gin.Context) {
 	// Validate token
 	id, ok, err := jwtParse(headers)
 	if !ok {
-		zlog.Error(err)
+		zap.L().Error("invalid token", zap.Error(err))
 		ctx.AbortWithError(http.StatusUnauthorized, err)
 		return
 	}
 
-	log := zlog.With("ClientId", id)
-	log.Info("client online")
+	zap.L().Info("client online", zap.String("user_id", id))
 	conn, err := srv.Upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
 	if err != nil {
-		zlog.Error(err)
+		zap.L().Error("upgrade error", zap.Error(err))
 		ctx.AbortWithError(http.StatusInternalServerError, err)
 		return
 	}
@@ -54,7 +53,7 @@ func (srv *Server) WebsocketHandler(ctx *gin.Context) {
 	if client = srv.Hub.getClientByUserId(id); client != nil {
 		_ = client.UnderlyingConn.Close()
 		client.UnderlyingConn = conn
-		log.Info("client reconnection")
+		zap.L().Info("client reconnection", zap.String("user_id", id))
 		return
 	}
 
@@ -130,7 +129,6 @@ func jwtParse(headers Headers) (string, bool, error) {
 	if ok {
 		return userClaims.UID, true, nil
 	} else {
-		zlog.Errorf("invalid token: %v", err)
 		return "", false, eris.Wrap(err, "token parse error")
 	}
 	// mapClaims := token.Claims.(jwt.MapClaims)