luoyangwei hai 1 ano
pai
achega
16d4e0ae53

+ 0 - 3
go.mod

@@ -10,7 +10,6 @@ require (
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/redis/go-redis/v9 v9.4.0
 	github.com/rotisserie/eris v0.5.4
-	github.com/segmentio/kafka-go v0.4.47
 	github.com/spf13/viper v1.18.2
 	go.uber.org/zap v1.26.0
 	gorm.io/driver/mysql v1.5.2
@@ -35,7 +34,6 @@ require (
 	github.com/jinzhu/inflection v1.0.0 // indirect
 	github.com/jinzhu/now v1.1.5 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
-	github.com/klauspost/compress v1.17.0 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
 	github.com/leodido/go-urn v1.2.4 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
@@ -43,7 +41,6 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/pelletier/go-toml/v2 v2.1.1 // indirect
-	github.com/pierrec/lz4/v4 v4.1.15 // indirect
 	github.com/sagikazarmark/locafero v0.4.0 // indirect
 	github.com/sagikazarmark/slog-shim v0.1.0 // indirect
 	github.com/sourcegraph/conc v0.3.0 // indirect

+ 0 - 52
go.sum

@@ -60,9 +60,6 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
-github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
-github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
-github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
 github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
@@ -86,8 +83,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
 github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
 github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
-github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
-github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -101,8 +96,6 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
 github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
 github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
 github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
-github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
-github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
 github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
 github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
 github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
@@ -130,13 +123,6 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
 github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
 github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
 github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
-github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
-github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
-github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
-github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
-github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
-github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
-github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
 go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
 go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
@@ -146,56 +132,18 @@ go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
 golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
 golang.org/x/arch v0.7.0 h1:pskyeJh/3AmoQ8CPE95vxHLqp1G1GfGNXTmcl9NEKTc=
 golang.org/x/arch v0.7.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
-golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
 golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
 golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
 golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
-golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
-golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
-golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
-golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
 golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
 golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
-golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
-golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
-golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
-golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
-golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
-golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
 golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
-golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
-golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

+ 18 - 20
server/client.go

@@ -7,10 +7,8 @@ import (
 	"github.com/gin-gonic/gin"
 	"github.com/google/uuid"
 	"github.com/gorilla/websocket"
-	"github.com/segmentio/kafka-go"
 	"sikey.com/websocket/models"
 	"sikey.com/websocket/repositories"
-	"sikey.com/websocket/stackexchange"
 	"sikey.com/websocket/utils/zlog"
 )
 
@@ -51,9 +49,9 @@ func (c *Client) reader() {
 		if err != nil {
 			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
 				zlog.Errorf("error: %v", err)
-				if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
-					zlog.Error(err)
-				}
+				// if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
+				// 	zlog.Error(err)
+				// }
 			}
 			break
 		}
@@ -119,9 +117,9 @@ func (c *Client) writer() {
 	defer func() {
 		ticker.Stop()
 		c.UnderlyingConn.Close()
-		if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
-			zlog.Error(err)
-		}
+		// if err := c.hub.exchange.OfflineNotify(c.UserId); err != nil {
+		// 	zlog.Error(err)
+		// }
 	}()
 
 	for {
@@ -140,18 +138,18 @@ func (c *Client) writer() {
 				return
 			}
 
-		case message, ok := <-c.RemotelyMessage:
-			// Sending remotely message to client
-			if !ok {
-			}
-
-			err := c.hub.exchange.SendMessage(c.ctx, kafka.Message{
-				Key:   []byte(stackexchange.StackExchangeMessaging),
-				Value: serializationMessage(message),
-			})
-			if err != nil {
-				return
-			}
+		// case message, ok := <-c.RemotelyMessage:
+		// 	// Sending remotely message to client
+		// 	if !ok {
+		// 	}
+
+		// err := c.hub.exchange.SendMessage(c.ctx, kafka.Message{
+		// 	Key:   []byte(stackexchange.StackExchangeMessaging),
+		// 	Value: serializationMessage(message),
+		// })
+		// if err != nil {
+		// 	return
+		// }
 		case <-ticker.C:
 			// 到时间发送 ping 信号
 			c.UnderlyingConn.SetWriteDeadline(time.Now().Add(c.writeWait))

+ 27 - 28
server/hub.go

@@ -1,10 +1,9 @@
 package server
 
 import (
-	"bytes"
 	"sync"
 
-	"sikey.com/websocket/stackexchange"
+	"sikey.com/websocket/stackexchange/redis"
 	"sikey.com/websocket/utils/zlog"
 )
 
@@ -13,7 +12,7 @@ type HubConfig struct {
 	DisconnectSize int
 	MessageSize    int
 
-	StackExchange *stackexchange.StackExchange
+	// StackExchange *stackexchange.RedisStackExchange
 }
 
 type Hub struct {
@@ -27,7 +26,7 @@ type Hub struct {
 
 	Message chan *Message
 
-	exchange *stackexchange.StackExchange
+	StackExchange *redis.StackExchange
 }
 
 func NewHub(cfg HubConfig) *Hub {
@@ -40,10 +39,10 @@ func NewHub(cfg HubConfig) *Hub {
 		Connect:    make(chan *Client, cfg.ConnectSize),
 		Disconnect: make(chan *Client, cfg.DisconnectSize),
 		Message:    make(chan *Message, cfg.MessageSize),
-
-		exchange: cfg.StackExchange,
 	}
 
+	hub.StackExchange = redis.NewStackExchange()
+
 	go hub.run()
 	go hub.remotely() // 远程消息
 	return hub
@@ -82,27 +81,27 @@ func (h *Hub) remotely() {
 	for {
 
 		zlog.Info("Remotely client actions monitoring.")
-		km, err := h.exchange.ReadMessage()
-		if err != nil {
-			zlog.Error(err)
-		}
-
-		// Create new a remote client
-		remotelyClient := &Client{
-			isRemotely: true, // mark remotely client
-			UserId:     string(km.Value),
-			Send:       make(chan *Message, 256),
-			hub:        h,
-		}
-		switch {
-		case bytes.Equal(km.Key, stackexchange.StackExchangeOnline):
-			h.Connect <- remotelyClient
-			zlog.Info("Remotely client online: ", remotelyClient.UserId)
-		case bytes.Equal(km.Key, stackexchange.StackExchangeOffline):
-			h.Disconnect <- remotelyClient
-			zlog.Info("Remotely client offline: ", remotelyClient.UserId)
-		case bytes.Equal(km.Key, stackexchange.StackExchangeMessaging):
-			h.Message <- deserializeMessage(km.Value)
-		}
+		// km, err := h.exchange.ReadMessage()
+		// if err != nil {
+		// 	zlog.Error(err)
+		// }
+
+		// // Create new a remote client
+		// remotelyClient := &Client{
+		// 	isRemotely: true, // mark remotely client
+		// 	UserId:     string(km.Value),
+		// 	Send:       make(chan *Message, 256),
+		// 	hub:        h,
+		// }
+		// switch {
+		// case bytes.Equal(km.Key, stackexchange.StackExchangeOnline):
+		// 	h.Connect <- remotelyClient
+		// 	zlog.Info("Remotely client online: ", remotelyClient.UserId)
+		// case bytes.Equal(km.Key, stackexchange.StackExchangeOffline):
+		// 	h.Disconnect <- remotelyClient
+		// 	zlog.Info("Remotely client offline: ", remotelyClient.UserId)
+		// case bytes.Equal(km.Key, stackexchange.StackExchangeMessaging):
+		// 	h.Message <- deserializeMessage(km.Value)
+		// }
 	}
 }

+ 72 - 83
server/hub_test.go

@@ -1,98 +1,87 @@
 package server
 
-import (
-	"context"
-	"fmt"
-	"log"
-	"testing"
-	"time"
-
-	"github.com/segmentio/kafka-go"
-	"sikey.com/websocket/stackexchange"
-)
-
 var userId = "d6faa0af-b863-48bb-b658-d961a9381585"
 
-func TestHub_ConnectMessage(t *testing.T) {
-	writer := &kafka.Writer{
-		Addr:                   kafka.TCP("106.75.230.4:9092"),
-		Topic:                  "messaging-channel",
-		AllowAutoTopicCreation: true,
-	}
+// func TestHub_ConnectMessage(t *testing.T) {
+// 	writer := &kafka.Writer{
+// 		Addr:                   kafka.TCP("106.75.230.4:9092"),
+// 		Topic:                  "messaging-channel",
+// 		AllowAutoTopicCreation: true,
+// 	}
 
-	// 发送上线消息
-	message := kafka.Message{
-		Key:   stackexchange.StackExchangeOnline,
-		Value: []byte(userId),
-	}
+// 	// 发送上线消息
+// 	message := kafka.Message{
+// 		Key:   stackexchange.StackExchangeOnline,
+// 		Value: []byte(userId),
+// 	}
 
-	if err := writer.WriteMessages(context.TODO(), message); err != nil {
-		log.Fatalln(err)
-	}
-}
+// 	if err := writer.WriteMessages(context.TODO(), message); err != nil {
+// 		log.Fatalln(err)
+// 	}
+// }
 
-func TestHub_DisconnectMessage(t *testing.T) {
-	writer := &kafka.Writer{
-		Addr:                   kafka.TCP("106.75.230.4:9092"),
-		Topic:                  "messaging-channel",
-		AllowAutoTopicCreation: true,
-	}
+// func TestHub_DisconnectMessage(t *testing.T) {
+// 	writer := &kafka.Writer{
+// 		Addr:                   kafka.TCP("106.75.230.4:9092"),
+// 		Topic:                  "messaging-channel",
+// 		AllowAutoTopicCreation: true,
+// 	}
 
-	// 发送上线消息
-	message := kafka.Message{
-		Key:   stackexchange.StackExchangeOffline,
-		Value: []byte(userId),
-	}
+// 	// 发送上线消息
+// 	message := kafka.Message{
+// 		Key:   stackexchange.StackExchangeOffline,
+// 		Value: []byte(userId),
+// 	}
 
-	if err := writer.WriteMessages(context.TODO(), message); err != nil {
-		log.Fatalln(err)
-	}
-}
+// 	if err := writer.WriteMessages(context.TODO(), message); err != nil {
+// 		log.Fatalln(err)
+// 	}
+// }
 
-func TestHub_ReaderMessage(t *testing.T) {
-	reader := kafka.NewReader(kafka.ReaderConfig{
-		Brokers:     []string{"106.75.230.4:9092"},
-		Topic:       "messaging-channel",
-		StartOffset: kafka.FirstOffset,
-	})
+// func TestHub_ReaderMessage(t *testing.T) {
+// 	reader := kafka.NewReader(kafka.ReaderConfig{
+// 		Brokers:     []string{"106.75.230.4:9092"},
+// 		Topic:       "messaging-channel",
+// 		StartOffset: kafka.FirstOffset,
+// 	})
 
-	ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cannel()
+// 	ctx, cannel := context.WithTimeout(context.Background(), 10*time.Second)
+// 	defer cannel()
 
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			var err error
-			var msg kafka.Message
-			if msg, err = reader.ReadMessage(ctx); err != nil {
-				fmt.Println(err)
-				break
-			}
-			fmt.Println(msg)
-		}
-	}
-}
+// 	for {
+// 		select {
+// 		case <-ctx.Done():
+// 			return
+// 		default:
+// 			var err error
+// 			var msg kafka.Message
+// 			if msg, err = reader.ReadMessage(ctx); err != nil {
+// 				fmt.Println(err)
+// 				break
+// 			}
+// 			fmt.Println(msg)
+// 		}
+// 	}
+// }
 
-func TestHub_TopicList(t *testing.T) {
-	conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
-	if err != nil {
-		panic(err.Error())
-	}
-	defer conn.Close()
+// func TestHub_TopicList(t *testing.T) {
+// 	conn, err := kafka.Dial("tcp", "106.75.230.4:9092")
+// 	if err != nil {
+// 		panic(err.Error())
+// 	}
+// 	defer conn.Close()
 
-	partitions, err := conn.ReadPartitions()
-	if err != nil {
-		panic(err.Error())
-	}
+// 	partitions, err := conn.ReadPartitions()
+// 	if err != nil {
+// 		panic(err.Error())
+// 	}
 
-	m := map[string]struct{}{}
-	// 遍历所有分区取topic
-	for _, p := range partitions {
-		m[p.Topic] = struct{}{}
-	}
-	for k := range m {
-		fmt.Println(k)
-	}
-}
+// 	m := map[string]struct{}{}
+// 	// 遍历所有分区取topic
+// 	for _, p := range partitions {
+// 		m[p.Topic] = struct{}{}
+// 	}
+// 	for k := range m {
+// 		fmt.Println(k)
+// 	}
+// }

+ 1 - 1
server/server.go

@@ -64,7 +64,7 @@ func WebsocketHandler(ctx *gin.Context, srv *Server) {
 		repos: srv.Repositories,
 	}
 	client.hub.Connect <- client
-	client.hub.exchange.OnlineNotify(client.UserId)
+	// client.hub.exchange.OnlineNotify(client.UserId)
 	zlog.Info("client online: ", client.UserId)
 
 	go client.reader()

+ 10 - 0
stackexchange/redis/redis_stackexchange.go

@@ -0,0 +1,10 @@
+package redis
+
+type StackExchange struct {
+	channel string
+}
+
+func NewStackExchange() *StackExchange {
+	exc := &StackExchange{}
+	return exc
+}

+ 0 - 6
stackexchange/redis_stackexchange.go

@@ -1,6 +0,0 @@
-package stackexchange
-
-type RedisStackExchange struct {
-
-	
-}

+ 90 - 98
stackexchange/stackexchange.go

@@ -1,100 +1,92 @@
 package stackexchange
 
-import (
-	"context"
-	"time"
-
-	"github.com/segmentio/kafka-go"
-	"sikey.com/websocket/config"
-)
-
-var (
-	StackExchangeOnline    = []byte("exchange_online")    // StackExchangeOnline 上线
-	StackExchangeOffline   = []byte("exchange_offline")   // StackExchangeOffline 下线
-	StackExchangeMessaging = []byte("exchange_messaging") // StackExchangeMessaging 消息
-)
-
-type StackExchange struct {
-	Brokers   []string
-	Topic     string
-	Partition int
-	MaxBytes  int
-
-	offset int64
-	reader *kafka.Reader
-	writer *kafka.Writer
-}
-
-func NewStackExchange(brokers []string, topic string, partition int, maxBytes int) *StackExchange {
-	// dialer
-	dialer := &kafka.Dialer{
-		Timeout:   config.Kafka.Timeout * time.Second,
-		DualStack: true,
-	}
-
-	reader := kafka.NewReader(kafka.ReaderConfig{
-		Brokers:     brokers,
-		Topic:       topic,
-		StartOffset: kafka.FirstOffset,
-		Dialer:      dialer,
-	})
-
-	// exchange
-	reader.SetOffsetAt(context.TODO(), time.Now())
-	return &StackExchange{
-		Brokers:   brokers,
-		Topic:     topic,
-		Partition: partition,
-		MaxBytes:  maxBytes,
-
-		offset: 0,
-		reader: reader,
-		writer: &kafka.Writer{
-			Addr:                   kafka.TCP(brokers...),
-			Topic:                  topic,
-			AllowAutoTopicCreation: true,
-		},
-	}
-}
-
-func (exc *StackExchange) OnlineNotify(userId string) error {
-	return exc.writer.WriteMessages(context.Background(), kafka.Message{
-		Key:   StackExchangeOnline,
-		Value: []byte(userId),
-	})
-}
-
-func (exc *StackExchange) OfflineNotify(userId string) error {
-	return exc.writer.WriteMessages(context.Background(), kafka.Message{
-		Key:   StackExchangeOffline,
-		Value: []byte(userId),
-	})
-}
-
-func (exc *StackExchange) SendMessage(ctx context.Context, message kafka.Message) error {
-	return exc.writer.WriteMessages(ctx, message)
-}
-
-func (exc *StackExchange) ReadMessage() (kafka.Message, error) {
-	msg, err := exc.reader.ReadMessage(context.Background())
-	if err != nil {
-		return kafka.Message{}, err
-	}
-
-	exc.offset = exc.offset + 1
-	return msg, nil
-}
-
-func (exc *StackExchange) writeMessage(ctx context.Context, msgs ...kafka.Message) error {
-	return exc.writer.WriteMessages(ctx, msgs...)
-}
-
-func (exec *StackExchange) Offset() int64 {
-	return exec.reader.Offset()
-}
-
-func (exec *StackExchange) SetOffsetAt() error {
-	ctx, cannel := context.WithTimeout(context.Background(), 3*time.Second)
-	defer cannel()
-	return exec.reader.SetOffsetAt(ctx, time.Now())
-}
+// var (
+// 	StackExchangeOnline    = []byte("exchange_online")    // StackExchangeOnline 上线
+// 	StackExchangeOffline   = []byte("exchange_offline")   // StackExchangeOffline 下线
+// 	StackExchangeMessaging = []byte("exchange_messaging") // StackExchangeMessaging 消息
+// )
+
+// type StackExchange struct {
+// 	Brokers   []string
+// 	Topic     string
+// 	Partition int
+// 	MaxBytes  int
+
+// 	offset int64
+// 	reader *kafka.Reader
+// 	writer *kafka.Writer
+// }
+
+// func NewStackExchange(brokers []string, topic string, partition int, maxBytes int) *StackExchange {
+// 	// dialer
+// 	dialer := &kafka.Dialer{
+// 		Timeout:   config.Kafka.Timeout * time.Second,
+// 		DualStack: true,
+// 	}
+
+// 	reader := kafka.NewReader(kafka.ReaderConfig{
+// 		Brokers:     brokers,
+// 		Topic:       topic,
+// 		StartOffset: kafka.FirstOffset,
+// 		Dialer:      dialer,
+// 	})
+
+// 	// exchange
+// 	reader.SetOffsetAt(context.TODO(), time.Now())
+// 	return &StackExchange{
+// 		Brokers:   brokers,
+// 		Topic:     topic,
+// 		Partition: partition,
+// 		MaxBytes:  maxBytes,
+
+// 		offset: 0,
+// 		reader: reader,
+// 		writer: &kafka.Writer{
+// 			Addr:                   kafka.TCP(brokers...),
+// 			Topic:                  topic,
+// 			AllowAutoTopicCreation: true,
+// 		},
+// 	}
+// }
+
+// func (exc *StackExchange) OnlineNotify(userId string) error {
+// 	return exc.writer.WriteMessages(context.Background(), kafka.Message{
+// 		Key:   StackExchangeOnline,
+// 		Value: []byte(userId),
+// 	})
+// }
+
+// func (exc *StackExchange) OfflineNotify(userId string) error {
+// 	return exc.writer.WriteMessages(context.Background(), kafka.Message{
+// 		Key:   StackExchangeOffline,
+// 		Value: []byte(userId),
+// 	})
+// }
+
+// func (exc *StackExchange) SendMessage(ctx context.Context, message kafka.Message) error {
+// 	return exc.writer.WriteMessages(ctx, message)
+// }
+
+// func (exc *StackExchange) ReadMessage() (kafka.Message, error) {
+// 	msg, err := exc.reader.ReadMessage(context.Background())
+// 	if err != nil {
+// 		return kafka.Message{}, err
+// 	}
+
+// 	exc.offset = exc.offset + 1
+// 	return msg, nil
+// }
+
+// func (exc *StackExchange) writeMessage(ctx context.Context, msgs ...kafka.Message) error {
+// 	return exc.writer.WriteMessages(ctx, msgs...)
+// }
+
+// func (exec *StackExchange) Offset() int64 {
+// 	return exec.reader.Offset()
+// }
+
+// func (exec *StackExchange) SetOffsetAt() error {
+// 	ctx, cannel := context.WithTimeout(context.Background(), 3*time.Second)
+// 	defer cannel()
+// 	return exec.reader.SetOffsetAt(ctx, time.Now())
+// }

+ 6 - 8
websocket.go

@@ -11,7 +11,6 @@ import (
 	"sikey.com/websocket/config"
 	"sikey.com/websocket/repositories"
 	"sikey.com/websocket/server"
-	"sikey.com/websocket/stackexchange"
 	"sikey.com/websocket/utils/mysqlx"
 	"sikey.com/websocket/utils/zlog"
 )
@@ -33,12 +32,12 @@ func newApp() *gin.Engine {
 	app := gin.Default()
 
 	// 创建 Websocket 横向拓展应用
-	exc := stackexchange.NewStackExchange(
-		config.Kafka.Brokers,
-		config.Kafka.Topic,
-		config.Kafka.Partition,
-		config.Kafka.MaxBytes,
-	)
+	// exc := stackexchange.NewStackExchange(
+	// 	config.Kafka.Brokers,
+	// 	config.Kafka.Topic,
+	// 	config.Kafka.Partition,
+	// 	config.Kafka.MaxBytes,
+	// )
 
 	srv := &server.Server{
 		Upgrader: websocket.Upgrader{
@@ -55,7 +54,6 @@ func newApp() *gin.Engine {
 			ConnectSize:    1024,
 			DisconnectSize: 1024,
 			MessageSize:    125,
-			StackExchange:  exc,
 		}),
 		Repositories: repositories.NewRepositories(mysqlx.ConnectMysql()),
 	}