luoyangwei 1 年間 前
コミット
46d1b56224

+ 22 - 0
.gitignore

@@ -0,0 +1,22 @@
+.fleet
+.idea
+.tmp
+.vscode
+tmp
+
+sikey-backend-healthy.iml
+
+.DS_Store
+*.txt
+
+./logs
+cache
+logs
+
+.run
+
+__debug__bin*
+__debug_bin*
+
+go.work
+go.work.sum

+ 50 - 0
config/config.go

@@ -0,0 +1,50 @@
+package config
+
+import (
+	"log"
+
+	"github.com/rotisserie/eris"
+	"github.com/spf13/viper"
+	"sikey.com/websocket/utils/configx"
+	"sikey.com/websocket/utils/zlog"
+)
+
+var (
+	Config configx.Config
+	Kafka  kafka
+)
+
+func loadRestfulConfig() error {
+	return viper.Unmarshal(&Config)
+}
+
+func loadKafkaConfig() error {
+	return viper.UnmarshalKey("kafka", &Kafka)
+}
+
+// MustLoadConfig 加载配置
+func MustLoadConfig(file string) {
+	err := configx.LoadConfig(file,
+		loadRestfulConfig,
+		loadKafkaConfig,
+	)
+	if err != nil {
+		log.Fatalln(eris.Wrap(err, "无法映射配置"))
+	}
+	log.Printf("Load configs [ toml ] from path [%s] \n", file)
+}
+
+// MustLoadLogger 初始化日志
+func MustLoadLogger() zlog.Writer {
+	var (
+		name = Config.Name
+		env  = Config.Environment
+		path = viper.GetString("log.path")
+	)
+
+	writer, err := zlog.NewZeroWriter(name, path, env)
+	if err != nil {
+		panic(err)
+	}
+	return writer
+}

+ 13 - 0
config/kafka.go

@@ -0,0 +1,13 @@
+package config
+
+import "time"
+
+type kafka struct {
+	Brokers   []string // Brokers kafka 集群
+	Topic     string   // Topic 主题
+	Partition int      // Partition po
+	MaxBytes  int      // MaxBytes 最大字节数
+
+	Timeout   time.Duration // Timeout 超时时间
+	DualStack bool          // DualStack 双栈
+}

+ 29 - 0
etc/websocket.test.toml

@@ -0,0 +1,29 @@
+environment = "test"
+name = "socket"
+port = 10082
+
+[mysql]
+dsn = "root:9RKdJsEQKnjrni9R@tcp(10.23.148.10:3306)/sikey?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai"
+# Ignore ErrRecordNotFound error for logger
+skipDefaultTransaction = true
+# Slow SQL threshold
+slowThreshold = 600
+# Ignore ErrRecordNotFound error for logger
+ignoreRecordNotFoundError = true
+# 设置空闲连接池中连接的最大数量
+maxIdleConns = 10
+# 设置连接的有效时长 当 <= 0 时,连接永久保存,默认值时 0 。如果设置了 maxLifetime 会开启连接自动清理,
+# 清理的代码在 connectionCleaner 中, 它开启一个定时器,定时检查空闲连接池中的连接,超期的关闭连接。
+maxLifetime = -1
+# 设置打开数据库连接的最大数量。
+maxOpenConns = 100
+
+[redis]
+addr = "106.75.230.4:6379"
+channel = "message"
+connectKey = "connects"
+db = 0
+password = "sikey!Q@W#E456"
+
+[log]
+path = "/var/logs/sikey/socket"

+ 40 - 0
etc/websocket.toml

@@ -0,0 +1,40 @@
+environment = "dev"
+name = "websocket"
+port = 10082
+
+[mysql]
+dsn = "root:qq123123@tcp(127.0.0.1:3306)/sikey?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai"
+# Ignore ErrRecordNotFound error for logger
+skipDefaultTransaction = true
+# Slow SQL threshold
+slowThreshold = 600
+# Ignore ErrRecordNotFound error for logger
+ignoreRecordNotFoundError = true
+# 设置空闲连接池中连接的最大数量
+maxIdleConns = 10
+# 设置连接的有效时长 当 <= 0 时,连接永久保存,默认值时 0 。如果设置了 maxLifetime 会开启连接自动清理,
+# 清理的代码在 connectionCleaner 中, 它开启一个定时器,定时检查空闲连接池中的连接,超期的关闭连接。
+maxLifetime = -1
+# 设置打开数据库连接的最大数量。
+maxOpenConns = 100
+
+[redis]
+addr = "106.75.230.4:6379"
+channel = "message"
+connectKey = "connects"
+db = 0
+password = "sikey!Q@W#E456"
+
+[kafka]
+brokers = ["106.75.230.4:9092"]
+dualStack = true
+maxBytes = 10e6
+partition = 0
+timeout = 10
+topic = "messaging-channel"
+
+[log]
+path = "/Users/luoyangwei/logs/sikey/websocket"
+
+[auth]
+secret = "992443c835c347d6a8b7d046d0261671"

+ 65 - 0
go.mod

@@ -0,0 +1,65 @@
+module sikey.com/websocket
+
+go 1.21.4
+
+require (
+	github.com/gin-gonic/gin v1.9.1
+	github.com/golang-jwt/jwt/v5 v5.2.0
+	github.com/gorilla/websocket v1.5.1
+	github.com/mitchellh/mapstructure v1.5.0
+	github.com/redis/go-redis/v9 v9.4.0
+	github.com/rotisserie/eris v0.5.4
+	github.com/segmentio/kafka-go v0.4.47
+	github.com/spf13/viper v1.18.2
+	go.uber.org/zap v1.26.0
+	gorm.io/driver/mysql v1.5.2
+	gorm.io/gorm v1.25.5
+)
+
+require (
+	github.com/bytedance/sonic v1.10.2 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
+	github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
+	github.com/chenzhuoyu/iasm v0.9.1 // indirect
+	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+	github.com/fsnotify/fsnotify v1.7.0 // indirect
+	github.com/gabriel-vasile/mimetype v1.4.3 // indirect
+	github.com/gin-contrib/sse v0.1.0 // indirect
+	github.com/go-playground/locales v0.14.1 // indirect
+	github.com/go-playground/universal-translator v0.18.1 // indirect
+	github.com/go-playground/validator/v10 v10.17.0 // indirect
+	github.com/go-sql-driver/mysql v1.7.0 // indirect
+	github.com/goccy/go-json v0.10.2 // indirect
+	github.com/hashicorp/hcl v1.0.0 // indirect
+	github.com/jinzhu/inflection v1.0.0 // indirect
+	github.com/jinzhu/now v1.1.5 // indirect
+	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/klauspost/compress v1.17.0 // indirect
+	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
+	github.com/leodido/go-urn v1.2.4 // indirect
+	github.com/magiconair/properties v1.8.7 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/pelletier/go-toml/v2 v2.1.1 // indirect
+	github.com/pierrec/lz4/v4 v4.1.15 // indirect
+	github.com/sagikazarmark/locafero v0.4.0 // indirect
+	github.com/sagikazarmark/slog-shim v0.1.0 // indirect
+	github.com/sourcegraph/conc v0.3.0 // indirect
+	github.com/spf13/afero v1.11.0 // indirect
+	github.com/spf13/cast v1.6.0 // indirect
+	github.com/spf13/pflag v1.0.5 // indirect
+	github.com/subosito/gotenv v1.6.0 // indirect
+	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+	github.com/ugorji/go/codec v1.2.12 // indirect
+	go.uber.org/multierr v1.10.0 // indirect
+	golang.org/x/arch v0.7.0 // indirect
+	golang.org/x/crypto v0.18.0 // indirect
+	golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
+	golang.org/x/net v0.20.0 // indirect
+	golang.org/x/sys v0.16.0 // indirect
+	golang.org/x/text v0.14.0 // indirect
+	google.golang.org/protobuf v1.32.0 // indirect
+	gopkg.in/ini.v1 v1.67.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 213 - 0
go.sum

@@ -0,0 +1,213 @@
+github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
+github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
+github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
+github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE=
+github.com/bytedance/sonic v1.10.2/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
+github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
+github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d h1:77cEq6EriyTZ0g/qfRdp61a3Uu/AWrgIq2s0ClJV1g0=
+github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d/go.mod h1:8EPpVsBuRksnlj1mLy4AWzRNQYxauNi62uWcE3to6eA=
+github.com/chenzhuoyu/iasm v0.9.0/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog=
+github.com/chenzhuoyu/iasm v0.9.1 h1:tUHQJXo3NhBqw6s33wkGn9SP3bvrWLdlVIJ3hQBL7P0=
+github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLIrkAmYog=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
+github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
+github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
+github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
+github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
+github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
+github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
+github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
+github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
+github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
+github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
+github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
+github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
+github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
+github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
+github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
+github.com/go-playground/validator/v10 v10.17.0 h1:SmVVlfAOtlZncTxRuinDPomC2DkXJ4E5T9gDA0AIH74=
+github.com/go-playground/validator/v10 v10.17.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
+github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
+github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
+github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
+github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw=
+github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
+github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
+github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
+github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
+github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
+github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
+github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
+github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
+github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
+github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
+github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/rotisserie/eris v0.5.4 h1:Il6IvLdAapsMhvuOahHWiBnl1G++Q0/L5UIkI5mARSk=
+github.com/rotisserie/eris v0.5.4/go.mod h1:Z/kgYTJiJtocxCbFfvRmO+QejApzG6zpyky9G1A4g9s=
+github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
+github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
+github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
+github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
+github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
+github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
+github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
+github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
+github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
+github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
+github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
+github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ=
+github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
+github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
+github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
+github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
+github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
+github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
+go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
+go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
+go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
+go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
+go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
+golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
+golang.org/x/arch v0.7.0 h1:pskyeJh/3AmoQ8CPE95vxHLqp1G1GfGNXTmcl9NEKTc=
+golang.org/x/arch v0.7.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
+golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
+golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
+golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
+golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
+golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
+google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
+gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs=
+gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8=
+gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
+gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
+gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
+nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
+rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

+ 37 - 0
models/bool.go

@@ -0,0 +1,37 @@
+package models
+
+import (
+	"database/sql"
+	"database/sql/driver"
+
+	"github.com/rotisserie/eris"
+)
+
+func NewBool(b bool) sql.NullBool {
+	return sql.NullBool{Bool: b, Valid: true}
+}
+
+// BitBool is an implementation of a bool for the MySQL type BIT(1).
+// This type allows you to avoid wasting an entire byte for MySQL's boolean type TINYINT.
+type BitBool bool
+
+// Value implements the driver.Valuer interface,
+// and turns the BitBool into a bitfield (BIT(1)) for MySQL storage.
+func (b BitBool) Value() (driver.Value, error) {
+	if b {
+		return []byte{1}, nil
+	} else {
+		return []byte{0}, nil
+	}
+}
+
+// Scan implements the sql.Scanner interface,
+// and turns the bitfield incoming from MySQL into a BitBool
+func (b *BitBool) Scan(src interface{}) error {
+	v, ok := src.([]byte)
+	if !ok {
+		return eris.New("bad []byte type assertion")
+	}
+	*b = v[0] == 1
+	return nil
+}

+ 58 - 0
models/session.go

@@ -0,0 +1,58 @@
+package models
+
+import (
+	"time"
+
+	"gorm.io/gorm"
+)
+
+type Session struct {
+	ID        string `gorm:"primarykey"`
+	CreatedAt time.Time
+	UpdatedAt time.Time
+	DeletedAt gorm.DeletedAt `gorm:"index"`
+
+	Name         string
+	MembersCount int
+	Type         string
+	CreatedBy    string
+}
+
+type SessionMember struct {
+	ID        string `gorm:"primarykey"`
+	CreatedAt time.Time
+	UpdatedAt time.Time
+	DeletedAt gorm.DeletedAt `gorm:"index"`
+
+	SessionId       string
+	AccountId       string
+	AccountIdentity string
+	JoinTime        time.Time
+}
+
+type SessionMessage struct {
+	ID        string `gorm:"primarykey"`
+	CreatedAt time.Time
+	UpdatedAt time.Time
+	DeletedAt gorm.DeletedAt `gorm:"index"`
+
+	SessionId   string
+	Receiver    string
+	Sender      string
+	Type        uint8
+	ContentType uint8
+	Content     string
+	IsRead      BitBool
+}
+
+func (*Session) TableName() string {
+	return "tb_session"
+}
+
+func (*SessionMember) TableName() string {
+	return "tb_session_member"
+}
+
+func (*SessionMessage) TableName() string {
+	return "tb_session_message"
+}

+ 19 - 0
repositories/repositories.go

@@ -0,0 +1,19 @@
+package repositories
+
+import (
+	"context"
+
+	"gorm.io/gorm"
+)
+
+type TransactionFun func(ctx context.Context, repos *Repositories) error
+
+type Repositories struct {
+	SessionRepository SessionRepository
+}
+
+func NewRepositories(source *gorm.DB) *Repositories {
+	return &Repositories{
+		SessionRepository: NewSessionRepository(source),
+	}
+}

+ 99 - 0
repositories/session_repositroy.go

@@ -0,0 +1,99 @@
+package repositories
+
+import (
+	"context"
+
+	"github.com/rotisserie/eris"
+	"gorm.io/gorm"
+	"sikey.com/websocket/models"
+)
+
+var _ SessionRepository = (*sessionRepository)(nil)
+
+type SessionRepository interface {
+	sessionMemberRepository
+	sessionMessageRepository
+
+	// GetJoinedSessions 获取加入的 session
+	GetJoinedSessions(ctx context.Context, accountId string) ([]models.Session, error)
+}
+
+type sessionMemberRepository interface {
+
+	// GetSessionMembers 获取 session 下的成员信息
+	GetSessionMembers(ctx context.Context, sessionId string) ([]models.SessionMember, error)
+
+	// GetSessionMembersRemoveOneself 获取 session 下的成员信息,排除自己
+	GetSessionMembersRemoveOneself(ctx context.Context, sessionId, accountId string) ([]models.SessionMember, error)
+}
+
+type sessionMessageRepository interface {
+
+	// CreateMessage 存消息
+	CreateMessage(ctx context.Context, message *models.SessionMessage) error
+}
+
+type sessionRepository struct {
+	source *gorm.DB
+}
+
+// CreateMessage implements SessionRepository.
+func (repo *sessionRepository) CreateMessage(ctx context.Context, message *models.SessionMessage) error {
+	return repo.source.WithContext(ctx).Create(message).Error
+}
+
+// GetSessionMembersRemoveOneself implements SessionRepository.
+func (repo *sessionRepository) GetSessionMembersRemoveOneself(ctx context.Context, sessionId string, accountId string) ([]models.SessionMember, error) {
+	var err error
+	var members []models.SessionMember
+	err = repo.source.WithContext(ctx).
+		Where(&models.SessionMember{SessionId: sessionId}).
+		Not(&models.SessionMember{AccountId: accountId}).
+		Find(&members).Error
+	switch {
+	case err == nil:
+		return members, nil
+	case eris.Is(err, gorm.ErrRecordNotFound):
+		return make([]models.SessionMember, 0), nil
+	default:
+		return nil, err
+	}
+}
+
+// GetSessionMembers implements SessionRepository.
+func (repo *sessionRepository) GetSessionMembers(ctx context.Context, sessionId string) ([]models.SessionMember, error) {
+	var err error
+	var members []models.SessionMember
+	err = repo.source.WithContext(ctx).Where(&models.SessionMember{SessionId: sessionId}).Find(&members).Error
+	switch {
+	case err == nil:
+		return members, nil
+	case eris.Is(err, gorm.ErrRecordNotFound):
+		return make([]models.SessionMember, 0), nil
+	default:
+		return nil, err
+	}
+}
+
+// GetJoinedSessions implements SessionRepository.
+func (repo *sessionRepository) GetJoinedSessions(ctx context.Context, accountId string) ([]models.Session, error) {
+	var err error
+	var sessions []models.Session
+
+	err = repo.source.WithContext(ctx).Where("id in (?)", repo.source.
+		WithContext(ctx).
+		Select("session_id").
+		Where(&models.SessionMember{AccountId: accountId}).Table("tb_session_member")).Find(&sessions).Error
+	switch {
+	case err == nil:
+		return sessions, nil
+	case eris.Is(err, gorm.ErrRecordNotFound):
+		return make([]models.Session, 0), nil
+	default:
+		return nil, err
+	}
+}
+
+func NewSessionRepository(source *gorm.DB) SessionRepository {
+	return &sessionRepository{source: source}
+}

+ 44 - 0
repositories/session_repositroy_test.go

@@ -0,0 +1,44 @@
+package repositories
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"testing"
+
+	"sikey.com/websocket/config"
+	"sikey.com/websocket/utils/mysqlx"
+)
+
+func getSessionRepository() SessionRepository {
+	config.MustLoadConfig("../etc/websocket.toml")
+	source := mysqlx.ConnectMysql()
+	return NewSessionRepository(source)
+}
+
+func TestSessionRepository_GetJoinedSessions(t *testing.T) {
+	repo := getSessionRepository()
+	sessions, err := repo.GetJoinedSessions(context.Background(), "2d2e78e8-eb61-47c5-8ebc-8e4d7313f577")
+	if err != nil {
+		log.Fatalln(err)
+	}
+	fmt.Println(sessions)
+}
+
+func TestSessionRepository_GetSessionMembers(t *testing.T) {
+	repo := getSessionRepository()
+	members, err := repo.GetSessionMembers(context.Background(), "S_1742730237273182208")
+	if err != nil {
+		log.Fatalln(err)
+	}
+	fmt.Println(members)
+}
+
+func TestSessionRepository_GetSessionMembersRemoveOneself(t *testing.T) {
+	repo := getSessionRepository()
+	members, err := repo.GetSessionMembersRemoveOneself(context.Background(), "S_1742730237273182208", "2d2e78e8-eb61-47c5-8ebc-8e4d7313f577")
+	if err != nil {
+		log.Fatalln(err)
+	}
+	fmt.Println(members)
+}

+ 157 - 0
server/client.go

@@ -0,0 +1,157 @@
+package server
+
+import (
+	"encoding/json"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/gorilla/websocket"
+	"sikey.com/websocket/models"
+	"sikey.com/websocket/repositories"
+	"sikey.com/websocket/utils/zlog"
+)
+
+type Client struct {
+	ctx            *gin.Context
+	UserId         string
+	hub            *Hub
+	UnderlyingConn *websocket.Conn
+
+	isSimpleMsg  bool   // isSimpleMsg 是否是简单消息
+	localization string // localization 国际码
+
+	// Send message channel 发送消息
+	// 当用户在线时会通过 Send channel 发送在线消息 但如果用户不在线,
+	// 则会通过 SendOffline channel 发送离线消息
+	Send        chan *Message
+	SendOffline chan *Message // SendOffline 发送离线消息
+
+	readWait  time.Duration // readWait 读超时
+	writeWait time.Duration // writeWait 写超时
+
+	pingWait time.Duration
+	pongWait time.Duration
+
+	repos *repositories.Repositories
+}
+
+func (c *Client) reader() {
+	defer func() {
+		c.hub.Disconnect <- c
+		c.UnderlyingConn.Close()
+	}()
+
+	c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.readWait))
+
+	for {
+
+		_, bytes, err := c.UnderlyingConn.ReadMessage()
+		if err != nil {
+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+				zlog.Errorf("error: %v", err)
+			}
+			break
+		}
+
+		message := deserializeMessage(bytes)
+		switch message.Type {
+		case MessageTypePing:
+
+			// Reset read dead line, prevent Reader from shutting down
+			c.UnderlyingConn.SetReadDeadline(time.Now().Add(c.pongWait))
+
+			// Reply pong message
+			c.Send <- pongMessage()
+
+		case MessageTypeChating:
+
+			// Chat dialogue messages
+			chatingContent := message.Content.(ChatingContent)
+
+			// Save message to database
+			payload, _ := json.Marshal(chatingContent.Payload)
+			err := c.repos.SessionRepository.CreateMessage(c.ctx, &models.SessionMessage{
+				ID:          message.MessageId,
+				SessionId:   chatingContent.Receiver,
+				Receiver:    chatingContent.Receiver,
+				Sender:      c.UserId,
+				Type:        uint8(message.Type),
+				ContentType: uint8(chatingContent.PayloadType),
+				Content:     string(payload),
+				IsRead:      false,
+			})
+
+			if err != nil {
+				c.writeError(message.MessageId, err)
+				continue
+			}
+
+			members, err := c.repos.SessionRepository.GetSessionMembersRemoveOneself(
+				c.ctx, chatingContent.Receiver, c.UserId)
+			if err != nil {
+				c.writeError(message.MessageId, err)
+				continue
+			}
+
+			for _, member := range members {
+				var messaging = *message
+				messaging.receiver = member.AccountId
+				messaging.Content = chatingContent
+				c.hub.Message <- &messaging
+			}
+		}
+
+		// Reply message
+		if message.IsNeedReply() {
+			c.Send <- newReplyMessage(message.MessageId)
+		}
+	}
+}
+
+func (c *Client) writer() {
+	ticker := time.NewTicker(c.pingWait)
+	defer func() {
+		ticker.Stop()
+		c.UnderlyingConn.Close()
+	}()
+
+	for {
+		select {
+		case message, ok := <-c.Send:
+			c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
+			if !ok {
+				// The hub closed the channel.
+				c.UnderlyingConn.WriteMessage(websocket.CloseMessage, []byte{})
+				return
+			}
+
+			var err error
+			if message.Type == MessageTypePong {
+				err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, []byte("pong"))
+				if err != nil {
+					return
+				}
+				break
+			}
+
+			err = c.UnderlyingConn.WriteMessage(websocket.TextMessage, serializationMessage(message))
+			if err != nil {
+				return
+			}
+		case <-ticker.C:
+			// 到时间发送 ping 信号
+			c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))
+			if err := c.UnderlyingConn.WriteMessage(websocket.PingMessage, nil); err != nil {
+				return
+			}
+		}
+	}
+}
+
+func (c *Client) writeError(messageId string, err error) {
+	c.Send <- &Message{
+		MessageId: messageId,
+		Type:      MessageTypeError,
+		Content:   ContentError{Err: err.Error()},
+	}
+}

+ 7 - 0
server/client_test.go

@@ -0,0 +1,7 @@
+package server
+
+import "testing"
+
+func TestClient_Connection(t *testing.T) {
+	
+}

+ 12 - 0
server/conn.go

@@ -0,0 +1,12 @@
+package server
+
+import "github.com/gorilla/websocket"
+
+type Conn struct {
+	UnderlyingConn *websocket.Conn
+}
+
+func (c *Conn) Close() error {
+	return c.UnderlyingConn.Close()
+}
+

+ 60 - 0
server/hub.go

@@ -0,0 +1,60 @@
+package server
+
+import (
+	"sync"
+
+	"sikey.com/websocket/stackexchange"
+)
+
+type HubConfig struct {
+	ConnectSize    int
+	DisconnectSize int
+	MessageSize    int
+
+	StackExchange *stackexchange.StackExchange
+}
+
+type Hub struct {
+	clients map[string]*Client
+	mutex   sync.RWMutex
+
+	Connect    chan *Client
+	Disconnect chan *Client
+	Message    chan *Message
+}
+
+func NewHub(cfg HubConfig) *Hub {
+	hub := &Hub{
+		clients: make(map[string]*Client),
+		mutex:   sync.RWMutex{},
+
+		Connect:    make(chan *Client, cfg.ConnectSize),
+		Disconnect: make(chan *Client, cfg.DisconnectSize),
+		Message:    make(chan *Message, cfg.MessageSize),
+	}
+
+	go hub.run()
+	go hub.remote() // 远程消息
+	return hub
+}
+
+func (h *Hub) run() {
+	for {
+		select {
+		case client := <-h.Connect:
+			h.clients[client.UserId] = client
+		case client := <-h.Disconnect:
+			close(client.Send)
+			delete(h.clients, client.UserId)
+		case message := <-h.Message:
+			if client, ok := h.clients[message.receiver]; ok {
+				client.Send <- message
+			} else {
+				// 不在同一台服务器, 这里将消息发送至拓展应用
+			}
+		}
+	}
+}
+
+func (h *Hub) remote() {
+}

+ 115 - 0
server/message.go

@@ -0,0 +1,115 @@
+package server
+
+import (
+	"encoding/json"
+
+	"github.com/mitchellh/mapstructure"
+)
+
+type MessageType = uint8
+type ChatingContentType = uint8
+
+const (
+	MessageTypeChating MessageType = 1 + iota // MessageTypeDialogue 语聊消息
+	MessageTypeReply                          // MessageTypeReply 回复消息
+	MessageTypeError                          // MessageTypeError 错误消息,当服务器出现错误时,会发送此消息
+	MessageTypePing                           // MessageTypePing ping
+	MessageTypePong                           // MessageTypePong pong
+	MessageTypeEmpty                          //  MessageTypeEmpty 空消息
+)
+
+const (
+	ChatingContentTypeText     ChatingContentType = 1 + iota // ChatingContentTypeText 语聊文字消息
+	ChatingContentTypeMetadata                               // ChatingContentTypeMetadata 媒体文件消息
+)
+
+const (
+	PingPayload byte = 0x70
+)
+
+type Message struct {
+	Type      MessageType `json:"type"`
+	MessageId string      `json:"messageId"` // MessageId 当前消息的Id
+	Content   any         `json:"content"`   // Content 内容
+
+	receiver string `json:"-"` // Receiver 消息接受者
+}
+
+// ChatingContent to client dialogue message
+type ChatingContent struct {
+	Receiver    string             `json:"receiver"`
+	PayloadType ChatingContentType `json:"payloadType"`
+	Payload     any                `json:"payload"`
+}
+
+// ContentText received a text message from the client
+type ContentText struct {
+	Raw string `json:"raw"`
+}
+
+// ContentMetadata received media file message from client
+type ContentMetadata struct {
+	Url      string `json:"url"`      // Url 文件地址
+	FileType string `json:"fileType"` // FileType 文件类型
+	Duration uint   `json:"duration"` // Duration 视频/语音时长
+}
+
+// COntentReply to client after receiving the message
+type ContentReply struct {
+	MessageId string `json:"messageId"` // MessageId 消息ID
+	ReplyId   string `json:"replyId"`   // ReplyId 回复ID
+}
+
+type ContentError struct {
+	Err string `json:"err"`
+}
+
+func (m *Message) IsNeedReply() bool {
+	return m.MessageId != ""
+}
+
+func deserializeMessage(bytes []byte) *Message {
+	if len(bytes) == 0 {
+		return emptyMessage()
+	}
+
+	if bytes[0] == PingPayload {
+		return pingMessage()
+	}
+
+	var message Message
+	_ = json.Unmarshal(bytes, &message)
+
+	switch message.Type {
+
+	// Is it a chating message
+	case MessageTypeChating:
+		var chatingContent ChatingContent
+		mapstructure.Decode(message.Content, &chatingContent)
+		message.Content = chatingContent
+	}
+
+	return &message
+}
+
+func serializationMessage(message *Message) []byte {
+	bytes, _ := json.Marshal(message)
+	return bytes
+}
+
+// newReplyMessage new a to client reply message
+func newReplyMessage(messageId string) *Message {
+	return &Message{MessageId: messageId, Type: MessageTypeReply}
+}
+
+func pingMessage() *Message {
+	return &Message{Type: MessageTypePing}
+}
+
+func pongMessage() *Message {
+	return &Message{Type: MessageTypePong}
+}
+
+func emptyMessage() *Message {
+	return &Message{Type: MessageTypeEmpty}
+}

+ 109 - 0
server/server.go

@@ -0,0 +1,109 @@
+package server
+
+import (
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/golang-jwt/jwt/v5"
+	"github.com/gorilla/websocket"
+	"github.com/rotisserie/eris"
+	"github.com/spf13/viper"
+	"sikey.com/websocket/repositories"
+	"sikey.com/websocket/utils/keys"
+	"sikey.com/websocket/utils/zlog"
+)
+
+type Server struct {
+	Ctx          *gin.Context
+	Repositories *repositories.Repositories
+
+	Upgrader websocket.Upgrader
+	Hub      *Hub
+
+	ReadWait  time.Duration
+	WriteWait time.Duration
+	PingWait  time.Duration
+	PongWait  time.Duration
+}
+
+func WebsocketHandler(ctx *gin.Context, srv *Server) {
+	srv.Ctx = ctx
+
+	conn, err := srv.Upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
+	if err != nil {
+		ctx.AbortWithError(http.StatusInternalServerError, err)
+		return
+	}
+
+	// Builder headers
+	headers := headerBuilder(ctx)
+
+	// Validate token
+	id, ok, err := jwtParse(headers)
+	if !ok {
+		ctx.AbortWithError(http.StatusUnauthorized, err)
+		return
+	}
+
+	// Create client
+	client := &Client{
+		ctx:            ctx.Copy(),
+		UserId:         id,
+		hub:            srv.Hub,
+		UnderlyingConn: conn,
+		Send:           make(chan *Message, 256),
+		writeWait:      srv.WriteWait,
+		readWait:       srv.ReadWait,
+		pingWait:       srv.PingWait,
+		pongWait:       srv.PongWait,
+
+		isSimpleMsg:  headers[keys.SimpleHeader].(bool),
+		localization: headers[keys.LocalizationHeader].(string),
+
+		repos: srv.Repositories,
+	}
+	client.hub.Connect <- client
+
+	go client.reader()
+	go client.writer()
+}
+
+type Headers = map[string]interface{}
+
+func headerBuilder(ctx *gin.Context) Headers {
+	headers := make(Headers)
+	request := ctx.Request
+	accessToken := request.URL.Query().Get(keys.AccessTokenHeader)
+	simple := request.URL.Query().Get(keys.SimpleHeader)
+	localization := request.URL.Query().Get(keys.LocalizationHeader)
+	headers[keys.AccessTokenHeader] = accessToken
+	headers[keys.SimpleHeader] = simple == "1"
+	headers[keys.LocalizationHeader] = localization
+	return headers
+}
+
+func jwtParse(headers Headers) (string, bool, error) {
+	accessToken := headers[keys.AccessTokenHeader].(string)
+	if len(accessToken) == 0 {
+		return "", false, eris.New("token is empty")
+	}
+	accessToken = strings.Trim(accessToken, " ")
+
+	token, err := jwt.Parse(accessToken, func(token *jwt.Token) (interface{}, error) {
+		return []byte(viper.GetString("auth.secret")), nil
+	})
+	if err != nil {
+		zlog.Error(err.Error())
+		return "", false, eris.Wrap(err, "token parse error")
+	}
+
+	mapClaims := token.Claims.(jwt.MapClaims)
+	// exp := mapClaims["exp"].(float64)
+	// if exp != 0 && exp < float64(time.Now().Unix()) {
+	// 	return "", false, eris.New("token is expired")
+	// }
+	uid := mapClaims["Uid"].(string)
+	return uid, true, nil
+}

+ 61 - 0
stackexchange/stackexchange.go

@@ -0,0 +1,61 @@
+package stackexchange
+
+import (
+	"context"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+	"sikey.com/websocket/config"
+)
+
+type StackExchange struct {
+	Brokers   []string
+	Topic     string
+	Partition int
+	MaxBytes  int
+
+	reader *kafka.Reader
+	writer *kafka.Writer
+}
+
+func NewStackExchange(brokers []string, topic string, partition int, maxBytes int) *StackExchange {
+	// dialer
+	dialer := &kafka.Dialer{
+		Timeout:   config.Kafka.Timeout * time.Second,
+		DualStack: true,
+	}
+
+	exc := &StackExchange{
+		Brokers:   brokers,
+		Topic:     topic,
+		Partition: partition,
+		MaxBytes:  maxBytes,
+		reader: kafka.NewReader(kafka.ReaderConfig{
+			Brokers: brokers,
+			Topic:   topic,
+			Dialer:  dialer,
+		}),
+		writer: kafka.NewWriter(kafka.WriterConfig{
+			Brokers:  brokers,
+			Topic:    topic,
+			Balancer: &kafka.Hash{},
+			Dialer:   dialer,
+		}),
+	}
+	return exc
+}
+
+func (exc *StackExchange) ReadMessage() (kafka.Message, error) {
+	return exc.reader.ReadMessage(context.Background())
+}
+
+// func (exc *StackExchange) run() {
+// 	for {
+// 		message, err := exc.reader.ReadMessage(context.Background())
+// 		if err != nil {
+// 			zlog.Error(err)
+// 		}
+
+// 		fmt.Println(message)
+// 	}
+// }

+ 30 - 0
utils/configx/configx.go

@@ -0,0 +1,30 @@
+package configx
+
+import (
+	"github.com/rotisserie/eris"
+	"github.com/spf13/viper"
+)
+
+type Config struct {
+	Name        string `toml:"name"`
+	Port        int    `toml:"port"`
+	Environment string `toml:"environment"`
+}
+
+type LoadOption func() error
+
+func LoadConfig(file string, opts ...LoadOption) error {
+	viper.SetConfigFile(file)
+	viper.SetConfigType("toml")
+	if err := viper.ReadInConfig(); err != nil {
+		panic(eris.Wrap(err, "无法加载配置"))
+	}
+
+	var err error
+	for _, opt := range opts {
+		if err = opt(); err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 15 - 0
utils/format/date.go

@@ -0,0 +1,15 @@
+package format
+
+const (
+	WordStartDate = "1983-01-01"
+)
+
+const (
+	DateParseAllFormat                  = "2006-01-02 15:04:05"
+	DateParseAllUnixMilliFormat         = "2006-01-02 15:04:05.000000"
+	UnsignedDateParseAllUnixMilliFormat = "20060102150405.000000"
+	DateParseLeftFormat                 = "2006-01-02"
+	DateParseRightFormat                = "15:04:05"
+
+	DateParseRightMinuteFormat = "15:04"
+)

+ 7 - 0
utils/format/mac.go

@@ -0,0 +1,7 @@
+package format
+
+import "strings"
+
+func SubMac(mac string) string {
+	return strings.Replace(mac, ":", "", -1)
+}

+ 12 - 0
utils/format/number.go

@@ -0,0 +1,12 @@
+package format
+
+import (
+	"fmt"
+	"strconv"
+)
+
+// ToFixedWithTwoDigits 保留两位小数
+func ToFixedWithTwoDigits(value float64) float64 {
+	value, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", value), 64)
+	return value
+}

+ 7 - 0
utils/keys/websocket.go

@@ -0,0 +1,7 @@
+package keys
+
+var (
+	AccessTokenHeader  = "X-Websocket-Header-X-Token"      // AccessTokenHeader 登录 Access Token
+	SimpleHeader       = "X-Websocket-Header-Simple"       // SimpleHeader 简单消息,bool类型,0:false 1:true
+	LocalizationHeader = "X-Websocket-Header-Localization" // LocalizationHeader 本地化,更多国际码查看附页
+)

+ 73 - 0
utils/mysqlx/mysql.go

@@ -0,0 +1,73 @@
+package mysqlx
+
+import (
+	"log"
+	"os"
+	"time"
+
+	"github.com/spf13/viper"
+	"gorm.io/driver/mysql"
+	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
+)
+
+type mysqlConfig struct {
+	Dsn                       string        // Dsn 数据源地址
+	SkipDefaultTransaction    bool          // SkipDefaultTransaction 跳过默认事务
+	SlowThreshold             time.Duration // SlowThreshold 慢 SQL 阈值
+	IgnoreRecordNotFoundError bool          // IgnoreRecordNotFoundError 忽略记录未找到的错误
+	MaxLifetime               time.Duration // MaxLifetime 连接的有效时长
+	MaxOpenConns              int           // MaxOpenConns 打开数据库连接的最大数量。
+	MaxIdleConns              int           // MaxIdleConns 空闲连接池中连接的最大数量
+}
+
+// ConnectMysql 初始化 mysql 连接
+func ConnectMysql() *gorm.DB {
+	cfg := readMysqlConfig()
+	conn, err := gorm.Open(mysql.New(mysql.Config{
+		DSN:                       cfg.Dsn,
+		DefaultStringSize:         255,
+		SkipInitializeWithVersion: false,
+	}), getGormConfig(cfg))
+	if err != nil {
+		log.Panicln(err)
+	}
+
+	sqlDB, _ := conn.DB()
+	// SetMaxIdleConns 设置空闲连接池中连接的最大数量
+	sqlDB.SetMaxIdleConns(cfg.MaxIdleConns)
+	// SetMaxOpenConns 设置打开数据库连接的最大数量。
+	sqlDB.SetMaxOpenConns(cfg.MaxOpenConns)
+	// SetConnMaxLifetime 设置了连接可复用的最大时间。
+	sqlDB.SetConnMaxLifetime(cfg.MaxLifetime)
+
+	log.Printf("Mysql connected to %s \n", cfg.Dsn)
+	return conn
+}
+
+// readMysqlConfig 加载配置
+func readMysqlConfig() mysqlConfig {
+	var cfg mysqlConfig
+	if err := viper.UnmarshalKey("mysql", &cfg); err != nil {
+		log.Fatalln(err)
+	}
+	return cfg
+}
+
+// getGormConfig 获取 gorm 配置
+func getGormConfig(cfg mysqlConfig) *gorm.Config {
+	return &gorm.Config{
+		DisableForeignKeyConstraintWhenMigrating: true,
+		SkipDefaultTransaction:                   cfg.SkipDefaultTransaction,
+		Logger:                                   defaultLogger(cfg),
+	}
+}
+
+// defaultLogger 默认的日志打印
+func defaultLogger(cfg mysqlConfig) logger.Interface {
+	return logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{
+		SlowThreshold:             cfg.SlowThreshold * time.Millisecond, // Slow SQL threshold
+		LogLevel:                  logger.Silent,                        // Log level
+		IgnoreRecordNotFoundError: cfg.IgnoreRecordNotFoundError,        // Ignore ErrRecordNotFound error for logger
+	})
+}

+ 48 - 0
utils/redisx/redis.go

@@ -0,0 +1,48 @@
+package redisx
+
+import (
+	"context"
+	"log"
+
+	"github.com/redis/go-redis/v9"
+	"github.com/spf13/viper"
+)
+
+type redisConfig struct {
+	Addr     string // Addr 链接地址
+	DB       int    // DB 数据库, 一般默认是0
+	Password string // Password 密码
+
+	// Channel 来控制订阅频道, 连接发送消息会发送到频道
+	Channel string // Channel 订阅频道
+	// ConnectKey 连接 key, websocket connect 创建时会在 redis 里保存信息,
+	// 这里的 ConnectKey 相当于前缀保存在 redis 里的 key
+	ConnectKey string
+}
+
+func RedisConnect() *redis.Client {
+	cfg := readRedisConfig()
+	addr := cfg.Addr
+	rdb := redis.NewClient(&redis.Options{
+		Addr:     addr,
+		Password: cfg.Password,
+		DB:       cfg.DB,
+	})
+	if err := rdb.Ping(context.Background()).Err(); err != nil {
+		log.Fatalln(err)
+	}
+	log.Printf("Redis connected %s \n", addr)
+	return rdb
+}
+
+func GetRedisConfig() redisConfig {
+	return readRedisConfig()
+}
+
+func readRedisConfig() redisConfig {
+	var cfg redisConfig
+	if err := viper.UnmarshalKey("redis", &cfg); err != nil {
+		log.Fatalln(err)
+	}
+	return cfg
+}

+ 67 - 0
utils/zlog/write.go

@@ -0,0 +1,67 @@
+package zlog
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"time"
+
+	"sikey.com/websocket/utils/format"
+)
+
+const (
+	// 控制环境不写入到 es 数据库里
+	notPushEnvironment = "env"
+)
+
+type Writer interface {
+	io.Writer
+
+	GetEnv() string
+}
+
+type ZeroWriter struct {
+	name string
+	path string
+
+	env string
+}
+
+func NewZeroWriter(name, path, env string) (*ZeroWriter, error) {
+	return &ZeroWriter{name: name, path: path, env: env}, nil
+}
+
+func (w *ZeroWriter) GetEnv() string {
+	return w.env
+}
+
+func (w *ZeroWriter) Write(p []byte) (int, error) {
+
+	// date
+	date := time.Now().Format(format.DateParseLeftFormat)
+	filename := fmt.Sprintf("%s/%s-%s.log", w.path, w.name, date)
+
+	var err error
+	var file *os.File
+	if fileStat(filename) {
+		file, err = os.OpenFile(filename, os.O_RDWR|os.O_APPEND, os.ModeAppend)
+	} else {
+		file, err = os.Create(filename)
+	}
+	if err != nil {
+		log.Println("open file error: ", err)
+	}
+
+	return file.Write(p)
+}
+
+// fileStat 检查文件是否存在
+func fileStat(path string) bool {
+	_, err := os.Stat(path)
+	if err == nil {
+		return true
+	}
+
+	return os.IsExist(err)
+}

+ 149 - 0
utils/zlog/zlog.go

@@ -0,0 +1,149 @@
+package zlog
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/rotisserie/eris"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+var (
+	l    *Logger
+	once = sync.Once{}
+)
+
+type Logger struct {
+	log *zap.Logger
+	ctx context.Context
+}
+
+func NewLogger(writer Writer) *Logger {
+	// 限制日志输出级别, >= DebugLevel 会打印所有级别的日志
+	// 生产环境中一般使用 >= ErrorLevel
+	lowPriority := zap.LevelEnablerFunc(func(lv zapcore.Level) bool {
+		return lv >= zapcore.DebugLevel
+	})
+
+	// 控制台展示方便调试,使用 TEXT 的方式
+	consoleEncoderConfig := zap.NewDevelopmentEncoderConfig()
+	consoleEncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
+	consoleEncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
+	consoleEncoder := zapcore.NewConsoleEncoder(consoleEncoderConfig)
+	stdCore := zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), lowPriority)
+
+	// 日志格式化
+	productionEncoderConfig := zap.NewProductionEncoderConfig()
+	productionEncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
+	jsonEnc := zapcore.NewJSONEncoder(productionEncoderConfig)
+
+	// 使用 JSON 格式日志
+	var core zapcore.Core
+	if writer.GetEnv() != notPushEnvironment {
+		syncer := zapcore.AddSync(writer)
+		esCore := zapcore.NewCore(jsonEnc, syncer, lowPriority).With([]zap.Field{zap.String("env", writer.GetEnv())})
+		core = zapcore.NewTee(stdCore, esCore)
+	} else {
+		core = stdCore
+	}
+
+	// logger 输出到 console 且标识调用代码行
+	l := zap.New(core).WithOptions(zap.AddCallerSkip(1), zap.AddCaller())
+	return &Logger{log: l}
+}
+
+func NewLoggerWithZapCode(l *zap.Logger) *Logger {
+	return &Logger{log: l}
+}
+
+func WithZapLogger(log *Logger) {
+	once.Do(func() {
+		l = log
+	})
+}
+
+// Debug logs a message at info level.
+func Debug(args ...any) {
+	l.log.Debug(fmt.Sprint(args...))
+}
+
+// Debugf logs a message at info level.
+func Debugf(msg string, args ...any) {
+	l.log.Debug(fmt.Sprintf(msg, args...))
+}
+
+// Debugv logs a message at info level.
+func Debugv(arg any) {
+	l.log.Debug(fmt.Sprint(arg))
+}
+
+// Debugw logs a message at info level.
+func Debugw(msg string, fields ...zap.Field) {
+	l.log.Debug(fmt.Sprintf(msg, toValues(fields)...))
+}
+
+// Error logs a message at error level.
+func Error(args ...any) {
+	l.log.Error(fmt.Sprint(args...))
+}
+
+func ErrorStack(err error) {
+	format := eris.NewDefaultStringFormat(eris.FormatOptions{
+		InvertOutput: true,
+		WithTrace:    true,
+		InvertTrace:  true,
+	})
+
+	formattedStr := eris.ToCustomString(err, format)
+	l.log.Error(formattedStr)
+}
+
+// Errorf logs a message at error level.
+func Errorf(msg string, args ...any) {
+	l.log.Error(fmt.Sprintf(msg, args...))
+}
+
+// Errorv logs a message at error level.
+func Errorv(arg any) {
+	l.log.Error(fmt.Sprint(arg))
+}
+
+// Errorw logs a message at error level.
+func Errorw(msg string, fields ...zap.Field) {
+	l.log.Error(fmt.Sprintf(msg, toValues(fields)...))
+}
+
+// Info logs a message at info level.
+func Info(args ...any) {
+	l.log.Info(fmt.Sprint(args...))
+}
+
+// Infof logs a message at info level.
+func Infof(msg string, args ...any) {
+	l.log.Info(fmt.Sprintf(msg, args...))
+}
+
+// Infov logs a message at info level.
+func Infov(arg any) {
+	l.log.Info(fmt.Sprint(arg))
+}
+
+// Infow logs a message at info level.
+func Infow(msg string, fields ...zap.Field) {
+	l.log.Info(fmt.Sprintf(msg, toValues(fields)...))
+}
+
+func addLineBreakSymbol(msg string) string {
+	return fmt.Sprintln(msg)
+}
+
+func toValues(fields []zap.Field) []any {
+	var str []any = make([]any, len(fields))
+	for i, field := range fields {
+		str[i] = field.String
+	}
+	return str
+}

+ 74 - 0
utils/zlog/zlog_grpc.go

@@ -0,0 +1,74 @@
+package zlog
+
+type GrpcLogger struct {
+	logger *Logger
+}
+
+func NewGrpcLogger(logger *Logger) *GrpcLogger {
+	return &GrpcLogger{logger: logger}
+}
+
+// Info returns
+func (zl *GrpcLogger) Info(args ...interface{}) {
+	Info(args...)
+}
+
+// Infoln returns
+func (zl *GrpcLogger) Infoln(args ...interface{}) {
+	Info(args...)
+}
+
+// Infof returns
+func (zl *GrpcLogger) Infof(format string, args ...interface{}) {
+	Infof(format, args...)
+}
+
+// Warning returns
+func (zl *GrpcLogger) Warning(args ...interface{}) {
+	Debug(args...)
+}
+
+// Warningln returns
+func (zl *GrpcLogger) Warningln(args ...interface{}) {
+	Debug(args...)
+}
+
+// Warningf returns
+func (zl *GrpcLogger) Warningf(format string, args ...interface{}) {
+	Debugf(format, args...)
+}
+
+// Error returns
+func (zl *GrpcLogger) Error(args ...interface{}) {
+	Error(args...)
+}
+
+// Errorln returns
+func (zl *GrpcLogger) Errorln(args ...interface{}) {
+	Error(args...)
+}
+
+// Errorf returns
+func (zl *GrpcLogger) Errorf(format string, args ...interface{}) {
+	Errorf(format, args...)
+}
+
+// Fatal returns
+func (zl *GrpcLogger) Fatal(args ...interface{}) {
+	Error(args...)
+}
+
+// Fatalln returns
+func (zl *GrpcLogger) Fatalln(args ...interface{}) {
+	Error(args...)
+}
+
+// Fatalf logs to fatal level
+func (zl *GrpcLogger) Fatalf(format string, args ...interface{}) {
+	Errorf(format, args...)
+}
+
+// V reports whether verbosity level l is at least the requested verbose level.
+func (zl *GrpcLogger) V(v int) bool {
+	return false
+}

+ 67 - 0
websocket.go

@@ -0,0 +1,67 @@
+package main
+
+import (
+	"flag"
+	"net/http"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/gorilla/websocket"
+	"sikey.com/websocket/config"
+	"sikey.com/websocket/repositories"
+	"sikey.com/websocket/server"
+	"sikey.com/websocket/stackexchange"
+	"sikey.com/websocket/utils/mysqlx"
+	"sikey.com/websocket/utils/zlog"
+)
+
+var configFile = flag.String("f", "./etc/websocket.toml", "the config file")
+
+func main() {
+	flag.Parse()
+	config.MustLoadConfig(*configFile)
+
+	// Zaplog init
+	zlog.WithZapLogger(zlog.NewLogger(config.MustLoadLogger()))
+
+	app := newApp()
+	app.Run()
+}
+
+func newApp() *gin.Engine {
+	app := gin.Default()
+
+	// 创建 Websocket 横向拓展应用
+	exc := stackexchange.NewStackExchange(
+		config.Kafka.Brokers,
+		config.Kafka.Topic,
+		config.Kafka.Partition,
+		config.Kafka.MaxBytes,
+	)
+
+	srv := &server.Server{
+		Upgrader: websocket.Upgrader{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			CheckOrigin: func(r *http.Request) bool {
+				return true
+			},
+		},
+		WriteWait: 10 * time.Second,
+		ReadWait:  10 * time.Second,
+		PongWait:  20 * time.Second,
+		// 写 ping 帧周期(必须小于 pongWait), 54s
+		// PingWait:  (600 * time.Second) * 9 / 10,
+		// (pongWait * 9) / 10
+		PingWait: 13 * time.Second,
+		Hub: server.NewHub(server.HubConfig{
+			ConnectSize:    1024,
+			DisconnectSize: 1024,
+			MessageSize:    125,
+			StackExchange:  exc,
+		}),
+		Repositories: repositories.NewRepositories(mysqlx.ConnectMysql()),
+	}
+	app.GET("/websocket/endpoint", func(ctx *gin.Context) { server.WebsocketHandler(ctx, srv) })
+	return app
+}