diff --git a/internal/logic/gameAct/gameAct.go b/internal/logic/gameAct/gameAct.go index 3aa88a4..da3759d 100644 --- a/internal/logic/gameAct/gameAct.go +++ b/internal/logic/gameAct/gameAct.go @@ -26,6 +26,7 @@ var ( Name = "game_act" ActList = gset.New(true) RunTimeMax *gtime.Time + TaskMax int64 = 5 ) type sGameAct struct { @@ -77,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) { _, err = g.Redis().Set(ctx, keyCache, data) var CacheKey = fmt.Sprintf("act:update:%d", uid) - pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3) + pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3+time.Hour*12) return } @@ -311,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) { go func() { scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error { if gtime.Now().After(RunTimeMax) { - return errors.New("Redis扫描超时") + return errors.New("redis扫描超时") } for _, key := range keys { if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil { @@ -421,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC // @param update []*entity.GameAct: 更新数据 // @return err error: 返回错误信息 func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) { + tx, err := g.DB().Begin(ctx) + if err != nil { + g.Log().Error(ctx, err) + return + } + //批量写入数据库 - updateCount := 0 + var updateCount int64 if len(update) > 0 { for _, v := range update { v.UpdatedAt = gtime.Now() - updateRes, err2 := g.Model(Name).Where(do.GameAct{ + updateRes, err2 := tx.Model(Name).Where(do.GameAct{ Uid: v.Uid, ActId: v.ActId, }).Data(v).Update() @@ -438,10 +445,22 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v) continue } + + updateCount++ + if updateCount > TaskMax { + g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount) + err = tx.Commit() + if err != nil { + g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err) + return + } + updateCount = 0 + tx, err = g.DB().Begin(ctx) + } //删除缓存 s.DelCacheKey(ctx, v.ActId, v.Uid) - updateCount++ } + //循环结束了,最后写入一波 g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount) update = (update)[:0] } @@ -449,7 +468,7 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) var addCount int64 if len(add) > 0 { for _, v := range add { - addRes, err2 := g.Model(Name).Data(v).Insert() + addRes, err2 := tx.Model(Name).Data(v).Insert() if err2 != nil { g.Log().Error(ctx, err2) continue @@ -459,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) continue } addCount++ + if addCount > TaskMax { + g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) + err = tx.Commit() + if err != nil { + g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err) + return + } + addCount = 0 + tx, err = g.DB().Begin(ctx) + } //删除缓存 s.DelCacheKey(ctx, v.ActId, v.Uid) } + + //循环结束了,最后写入一波 g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) add = (add)[:0] } + + err = tx.Commit() return } @@ -478,8 +511,14 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan * addClosed := false updateClosed := false + tx, err := g.DB().Begin(ctx) + if err != nil { + g.Log().Error(ctx, err) + return + } + // 使用链式安全模式 - var db = g.Model(Name).Safe() + //var db = tx.Model(Name).Safe() for { //检查是否两个通道都已关闭且为空 @@ -493,7 +532,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan * addClosed = true // 仅标记关闭,不立即日志 continue } - addRes, err2 := db.Data(v).Insert() + addRes, err2 := tx.Model(Name).Data(v).Insert() if err2 != nil { g.Log().Error(ctx, err2) continue @@ -504,6 +543,18 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan * } row, _ := addRes.RowsAffected() addCount += row + + if addCount > TaskMax { + g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) + err = tx.Commit() + if err != nil { + g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err) + return + } + addCount = 0 + tx, err = g.DB().Begin(ctx) + } + //删除缓存 s.DelCacheKey(ctx, v.ActId, v.Uid) @@ -513,7 +564,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan * continue } v.UpdatedAt = gtime.Now() - updateRes, err2 := db.Where(do.GameAct{ + updateRes, err2 := tx.Model(Name).Where(do.GameAct{ Uid: v.Uid, ActId: v.ActId, }).Data(v).Update() @@ -525,9 +576,21 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan * //g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v) continue } + updateCount++ + + if updateCount > TaskMax { + g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) + err = tx.Commit() + if err != nil { + g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err) + return + } + updateCount = 0 + tx, err = g.DB().Begin(ctx) + } + //删除缓存 s.DelCacheKey(ctx, v.ActId, v.Uid) - updateCount++ case <-ctx.Done(): g.Log().Debug(ctx, "act协程被上下文取消") @@ -535,6 +598,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan * } } + err = tx.Commit() // 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志) g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)