From e1f1bea0e76f3318c93c45bbeedbf75bba15b66b Mon Sep 17 00:00:00 2001 From: ayflying Date: Tue, 2 Sep 2025 18:16:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=9C=89=E4=BC=98=E9=9B=85?= =?UTF-8?q?=E7=9A=84=E6=8C=81=E4=B9=85=E5=8C=96=E7=AE=A1=E9=81=93=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E9=AB=98=E4=B8=8A=E4=BC=A0=E9=80=9F=E5=BA=A6=E4=B8=8E?= =?UTF-8?q?=E6=8A=97=E6=89=93=E6=96=AD=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/logic/gameAct/gameAct.go | 114 ++++++++++++++++++++++-------- service/game_act.go | 4 +- 2 files changed, 88 insertions(+), 30 deletions(-) diff --git a/internal/logic/gameAct/gameAct.go b/internal/logic/gameAct/gameAct.go index f4257d7..02cdc62 100644 --- a/internal/logic/gameAct/gameAct.go +++ b/internal/logic/gameAct/gameAct.go @@ -24,6 +24,8 @@ var ( Name = "game_act" ActList = gset.New(true) RunTimeMax *gtime.Time + addChan chan *entity.GameAct + updateChan chan *entity.GameAct ) type sGameAct struct { @@ -294,19 +296,26 @@ func (s *sGameAct) SavesV2() (err error) { RunTimeMax = gtime.Now().Add(time.Minute * 30) //cacheKey := fmt.Sprintf("act:%v:*", actId) - var add = make([]*entity.GameAct, 0) - var update = make([]*entity.GameAct, 0) - //循环获取缓存数据 - err = tools.Redis.RedisScanV2("act:*", func(keys []string) (err error) { - for _, key := range keys { - //格式化数据 - err = s.SaveV2(ctx, key, add, update) - //持久化数据 - err = s.Cache2Sql(ctx, add, update) - } - return err - }) + addChan = make(chan *entity.GameAct, 1000) + updateChan = make(chan *entity.GameAct, 1000) + + go func() { + //循环获取缓存数据 + err = tools.Redis.RedisScanV2("act:*", func(keys []string) (err error) { + for _, key := range keys { + //格式化数据 + err = s.SaveV2(ctx, key) + + } + return err + }) + //关闭通道 + close(addChan) + close(updateChan) + }() + // 启动缓存数据到数据库通道 + s.Cache2SqlChan(ctx) return } @@ -320,7 +329,7 @@ func (s *sGameAct) SavesV2() (err error) { // @param add []*entity.GameAct: 添加数据 // @param update []*entity.GameAct: 更新数据 // @return err error: 返回错误信息 -func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, add, update []*entity.GameAct) (err error) { +func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string) (err error) { result := strings.Split(cacheKey, ":") actId := gconv.Int(result[1]) @@ -359,20 +368,27 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, add, update []*e g.Log().Errorf(ctx, "当前数据错误: %v", cacheKey) return } + //如果没有数据,添加 actionData := cacheGet.String() if data == nil { - add = append(add, &entity.GameAct{ + //add = append(add, &entity.GameAct{ + // ActId: actId, + // Uid: uid, + // Action: actionData, + //}) + addChan <- &entity.GameAct{ ActId: actId, Uid: uid, Action: actionData, - }) + } } else { //覆盖数据 data.ActId = actId data.Uid = uid data.Action = actionData - update = append(update, data) + //update = append(update, data) + updateChan <- data } return @@ -385,10 +401,10 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, add, update []*e // @param add []*entity.GameAct: 添加数据 // @param update []*entity.GameAct: 更新数据 // @return err error: 返回错误信息 -func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) (err error) { +func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) { //批量写入数据库 updateCount := 0 - if len(update) > 100 { + if len(update) > 0 { for _, v := range update { v.UpdatedAt = gtime.Now() updateRes, err2 := g.Model(Name).Where(do.GameAct{ @@ -399,25 +415,20 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) g.Log().Error(ctx, err2) continue } - if row, _ := updateRes.RowsAffected(); row == 0 { g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v) continue } - //删除缓存 - go s.DelCacheKey(ctx, v.ActId, v.Uid) - + s.DelCacheKey(ctx, v.ActId, v.Uid) updateCount++ - update = make([]*entity.GameAct, 0) } - g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount) - update = make([]*entity.GameAct, 0) + update = (update)[:0] } var addCount int64 - if len(add) > 100 { + if len(add) > 0 { for _, v := range add { addRes, err2 := g.Model(Name).Data(v).Insert() if err2 != nil { @@ -430,14 +441,61 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) } addCount++ //删除缓存 - go s.DelCacheKey(ctx, v.ActId, v.Uid) + s.DelCacheKey(ctx, v.ActId, v.Uid) } g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) - add = make([]*entity.GameAct, 0) + add = (add)[:0] } return } +// Cache2SqlChan 缓存持久化到数据库 +// @Description: 缓存持久化到数据库 +// @receiver s *sGameAct: 游戏活动服务结构体指针 +// @param ctx context.Context: 上下文对象 +func (s *sGameAct) Cache2SqlChan(ctx context.Context) { + //批量写入数据库 + updateCount := 0 + for v := range updateChan { + v.UpdatedAt = gtime.Now() + updateRes, err2 := g.Model(Name).Where(do.GameAct{ + Uid: v.Uid, + ActId: v.ActId, + }).Data(v).Update() + if err2 != nil { + g.Log().Error(ctx, err2) + continue + } + if row, _ := updateRes.RowsAffected(); row == 0 { + g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v) + continue + } + //删除缓存 + s.DelCacheKey(ctx, v.ActId, v.Uid) + updateCount++ + } + g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount) + + var addCount int64 + for v := range addChan { + addRes, err2 := g.Model(Name).Data(v).Insert() + if err2 != nil { + g.Log().Error(ctx, err2) + continue + } + if row, _ := addRes.RowsAffected(); row == 0 { + g.Log().Error(ctx, "本次新增为0,新增数据失败: %v", v) + continue + } + addCount++ + //删除缓存 + s.DelCacheKey(ctx, v.ActId, v.Uid) + } + g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) + + return +} + // 删除缓存key func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) { cacheKey := fmt.Sprintf("act:%v:%v", aid, uid) diff --git a/service/game_act.go b/service/game_act.go index 3a4ebcf..164c8a8 100644 --- a/service/game_act.go +++ b/service/game_act.go @@ -63,7 +63,7 @@ type ( // @param add []*entity.GameAct: 添加数据 // @param update []*entity.GameAct: 更新数据 // @return err error: 返回错误信息 - SaveV2(ctx context.Context, cacheKey string, add []*entity.GameAct, update []*entity.GameAct) (err error) + SaveV2(ctx context.Context, cacheKey string) (err error) // Cache2Sql 缓存持久化到数据库 // @Description: 缓存持久化到数据库 // @receiver s *sGameAct: 游戏活动服务结构体指针 @@ -71,7 +71,7 @@ type ( // @param add []*entity.GameAct: 添加数据 // @param update []*entity.GameAct: 更新数据 // @return err error: 返回错误信息 - Cache2Sql(ctx context.Context, add []*entity.GameAct, update []*entity.GameAct) (err error) + Cache2Sql(ctx context.Context, add []*entity.GameAct, update []*entity.GameAct) // 删除缓存key DelCacheKey(ctx context.Context, aid int, uid int64) // 清空GetRedDot缓存