From 8d640be4723212f743a9d81dba5ed081c8ed9eb7 Mon Sep 17 00:00:00 2001 From: ayflying Date: Mon, 24 Mar 2025 17:39:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0websocket=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aycache/aycache.go | 39 +++++++++++++++++++++++++++++++++++++ pkg/websocket/registerer.go | 4 +++- pkg/websocket/socket_new.go | 16 ++++++++------- 3 files changed, 51 insertions(+), 8 deletions(-) create mode 100644 aycache/aycache.go diff --git a/aycache/aycache.go b/aycache/aycache.go new file mode 100644 index 0000000..aa5512f --- /dev/null +++ b/aycache/aycache.go @@ -0,0 +1,39 @@ +package aycache + +import ( + "github.com/ayflying/utility_go/pkg/aycache/drive" + drive2 "github.com/ayflying/utility_go/pkg/aycache/drive" + "github.com/gogf/gf/v2/os/gcache" +) + +type Mod struct { + client *gcache.Cache +} + +//func NewV1(_name ...string) *cache.Mod { +// return pgk.Cache +//} + +// Deprecated:弃用,改用 pgk.Cache() +func New(_name ...string) gcache.Adapter { + + var cacheAdapterObj gcache.Adapter + var name = "cache" + if len(_name) > 0 { + name = _name[0] + } + switch name { + case "cache": + cacheAdapterObj = drive2.NewAdapterMemory() + case "redis": + cacheAdapterObj = drive2.NewAdapterRedis() + case "file": + cacheAdapterObj = drive2.NewAdapterFile("runtime/cache") + case "es": + cacheAdapterObj = drive.NewAdapterElasticsearch([]string{"http://127.0.0.1:9200"}) + } + + //var client = gcache.New() + //client.SetAdapter(cacheAdapterObj) + return cacheAdapterObj +} diff --git a/pkg/websocket/registerer.go b/pkg/websocket/registerer.go index 595a9e7..4516da8 100644 --- a/pkg/websocket/registerer.go +++ b/pkg/websocket/registerer.go @@ -1,7 +1,9 @@ package websocket +import "context" + // 定义一个处理方法的类型 -type Handler func(uid int64, data []byte) +type Handler func(ctx context.Context, req any) (err error) // 路由器的处理映射 var ( diff --git a/pkg/websocket/socket_new.go b/pkg/websocket/socket_new.go index 066db5a..f736c86 100644 --- a/pkg/websocket/socket_new.go +++ b/pkg/websocket/socket_new.go @@ -1,6 +1,7 @@ package websocket import ( + "context" "github.com/gogf/gf/v2/container/gmap" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" @@ -15,7 +16,7 @@ import ( type SocketV1 struct{} var ( - ctx = gctx.New() + //ctx = gctx.New() //Conn map[uuid.UUID]*WebsocketData lock sync.Mutex @@ -26,7 +27,7 @@ type WebsocketData struct { Ws *websocket.Conn Uuid uuid.UUID Uid int64 - Data g.Var + Ctx context.Context } func NewV1() *SocketV1 { @@ -51,6 +52,7 @@ func (s *SocketV1) Load(serv *ghttp.Server, prefix string) { } group.Bind( func(r *ghttp.Request) { + ctx := r.Context() ws, err := websocketCfg.Upgrade(r.Response.Writer, r.Request, nil) if err != nil { glog.Error(ctx, err) @@ -58,7 +60,7 @@ func (s *SocketV1) Load(serv *ghttp.Server, prefix string) { } //ws联机触发器 - NewV1().OnConnect(ws) + NewV1().OnConnect(ctx, ws) }, ) @@ -70,7 +72,7 @@ func (s *SocketV1) Load(serv *ghttp.Server, prefix string) { // @Description: // @receiver s // @param conn -func (s *SocketV1) OnConnect(conn *websocket.Conn) { +func (s *SocketV1) OnConnect(ctx context.Context, conn *websocket.Conn) { //lock.Lock() //defer lock.Unlock() @@ -81,7 +83,7 @@ func (s *SocketV1) OnConnect(conn *websocket.Conn) { data := &WebsocketData{ Uuid: id, Ws: conn, - Data: g.Var{}, + Ctx: ctx, } m.Set(id, data) @@ -121,7 +123,7 @@ func (s *SocketV1) OnMessage(conn *WebsocketData, req []byte, msgType int) { handler, exist := handlers[cmd] if exist { //匹配上路由器 - handler(conn.Uid, []byte(msg)) + handler(conn.Ctx, msg) } else { //fmt.Println("未注册的路由器ID:", cmd) s.Send(conn.Uuid, []byte("未注册的协议号:"+msgStr[:8])) @@ -166,7 +168,7 @@ func (s *SocketV1) SendAll(data []byte) { // @param conn func (s *SocketV1) OnClose(id uuid.UUID, conn *websocket.Conn) { // 在此处编写断开连接后的处理逻辑 - g.Log().Debugf(ctx, "WebSocket connection from %s has been closed.", conn.RemoteAddr()) + g.Log().Debugf(gctx.New(), "WebSocket connection from %s has been closed.", conn.RemoteAddr()) // 可能的后续操作: // 1. 更新连接状态或从连接池移除