diff --git a/pkg/websocket/registerer.go b/pkg/websocket/registerer.go index a59b558..b5c9fe5 100644 --- a/pkg/websocket/registerer.go +++ b/pkg/websocket/registerer.go @@ -8,7 +8,7 @@ type Handler2 func(conn *WebsocketData) type HandlerMessage func(conn *WebsocketData, req any) -type PbType func(code int32, data []byte) proto.Message +type PbType func(cmd int32, data []byte, code int32, msg string) proto.Message type PbType2 func(data []byte) (int, []byte) // 路由器的处理映射 diff --git a/pkg/websocket/socket_new.go b/pkg/websocket/socket_new.go index 2186a23..b374e0d 100644 --- a/pkg/websocket/socket_new.go +++ b/pkg/websocket/socket_new.go @@ -4,14 +4,15 @@ import ( "context" "fmt" "github.com/gogf/gf/v2/container/gmap" + "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/guid" "github.com/google/uuid" "github.com/gorilla/websocket" "google.golang.org/protobuf/proto" - "strconv" "sync" ) @@ -28,10 +29,11 @@ var ( ) type WebsocketData struct { - Ws *websocket.Conn - Uuid string - Uid int64 - Ctx context.Context + Ws *websocket.Conn + Uuid string + Uid int64 + Ctx context.Context + RoomId int } func NewV1() *SocketV1 { @@ -41,11 +43,12 @@ func NewV1() *SocketV1 { } type SocketInterface interface { - OnConnect(*websocket.Conn) - OnMessage(*WebsocketData, []byte, int) - Send(uuid.UUID, []byte) (err error) - SendAll(data []byte) - OnClose(conn *websocket.Conn) + Load(serv *ghttp.Server, prefix string) + OnConnect(ctx context.Context, conn *websocket.Conn) + OnMessage(conn *WebsocketData, req []byte, msgType int) + Send(cmd int32, uid int64, req proto.Message) + SendAll(cmd int32, req proto.Message) + OnClose(conn *WebsocketData) } func (s *SocketV1) Load(serv *ghttp.Server, prefix string) { @@ -92,7 +95,8 @@ func (s *SocketV1) OnConnect(ctx context.Context, conn *websocket.Conn) { //defer delete(Conn, id) - //to := fmt.Sprintf("创建连接:%v,ip=%v", id, ip) + to := fmt.Sprintf("创建连接:%v,ip=%v", id, ip) + g.Log().Debugf(ctx, to) //s.Send(id, []byte(to)) //用户登录钩子执行 @@ -124,13 +128,13 @@ func (s *SocketV1) OnConnect(ctx context.Context, conn *websocket.Conn) { // @param msgType func (s *SocketV1) OnMessage(conn *WebsocketData, req []byte, msgType int) { s.Type = 2 - var cmd int var msg []byte //uid := conn.Uid for _, v := range Pb2Bytes { cmd, msg = v(req) } + g.Log("cmd").Debugf(gctx.New(), fmt.Sprintf("from|%d|%d|%v", cmd, conn.Uid, gjson.MustEncodeString(req))) //msgStr := string(req) //cmd = gconv.Int(msgStr[:8]) @@ -145,7 +149,7 @@ func (s *SocketV1) OnMessage(conn *WebsocketData, req []byte, msgType int) { } } else { //fmt.Println("未注册的路由器ID:", cmd) - s.Send(20000000, conn.Uid, []byte("未注册的协议号:"+strconv.Itoa(cmd))) + //s.Send(20000000, conn.Uid, []byte("未注册的协议号:"+strconv.Itoa(cmd))) s.OnClose(conn) return } @@ -201,17 +205,22 @@ func (s *SocketV1) Uid2Uuid(uid int64) (uuid string) { // @receiver s // @param uid // @param data -func (s *SocketV1) SendUuid(code int32, id uuid.UUID, data []byte) { +func (s *SocketV1) SendUuid(cmd int32, id uuid.UUID, req proto.Message) { if !m.Contains(id) { return } + var data, err = proto.Marshal(req) + if err != nil { + g.Log().Error(gctx.New(), err) + return + } conn := m.Get(id).(*WebsocketData) //前置方法 for _, v := range Byte2Pb { - temp := v(code, data) + temp := v(cmd, data, 0, "") data, _ = proto.Marshal(temp) } @@ -226,36 +235,51 @@ func (s *SocketV1) SendUuid(code int32, id uuid.UUID, data []byte) { // @receiver s // @param uid // @param data -func (s *SocketV1) Send(code int32, uid int64, data []byte) { +func (s *SocketV1) Send(cmd int32, uid int64, req proto.Message) { + g.Log("cmd").Debugf(gctx.New(), fmt.Sprintf("to|%d|%d|%v", cmd, uid, gjson.MustEncodeString(req))) + uuid := s.Uid2Uuid(uid) if uuid == "" { return } - if !m.Contains(uuid) { return } + //格式化数据 + var data, err = proto.Marshal(req) + if err != nil { + g.Log().Error(gctx.New(), err) + return + } + conn := m.Get(uuid).(*WebsocketData) //前置方法 for _, v := range Byte2Pb { - temp := v(code, data) + temp := v(cmd, data, 0, "") data, _ = proto.Marshal(temp) } conn.Ws.WriteMessage(s.Type, data) - return } // 批量发送 -func (s *SocketV1) SendAll(code int32, data []byte) { - for _, v := range Byte2Pb { - temp := v(code, data) - data, _ = proto.Marshal(temp) +func (s *SocketV1) SendAll(cmd int32, req proto.Message) { + g.Log("cmd").Debugf(gctx.New(), fmt.Sprintf("all:%d|-1|%v", cmd, gjson.MustEncodeString(req))) + + //格式化数据 + var data, err = proto.Marshal(req) + if err != nil { + g.Log().Error(gctx.New(), err) + return } + for _, v := range Byte2Pb { + temp := v(cmd, data, 0, "") + data, _ = proto.Marshal(temp) + } m.Iterator(func(k interface{}, v interface{}) bool { conn := v.(*WebsocketData) conn.Ws.WriteMessage(s.Type, data) @@ -277,13 +301,23 @@ func (s *SocketV1) OnClose(conn *WebsocketData) { connect(conn) } uid := conn.Uid + if uid > 0 { + s.UnBindUid(uid) + } + // 可能的后续操作: // 1. 更新连接状态或从连接池移除 // 2. 发送通知或清理关联资源 // 3. 执行特定于业务的断开处理 m.Remove(conn.Uuid) - if uid > 0 { - s.UnBindUid(uid) - } conn.Ws.Close() } + +// 是否在线 +func (s *SocketV1) IsOnline(uid int64) bool { + uuid := s.Uid2Uuid(uid) + if m.Contains(uuid) { + return true + } + return false +}