优化持久化代码,提高持久化性能
This commit is contained in:
@@ -26,6 +26,7 @@ var (
|
||||
Name = "game_act"
|
||||
ActList = gset.New(true)
|
||||
RunTimeMax *gtime.Time
|
||||
TaskMax int64 = 5
|
||||
)
|
||||
|
||||
type sGameAct struct {
|
||||
@@ -77,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
||||
_, err = g.Redis().Set(ctx, keyCache, data)
|
||||
|
||||
var CacheKey = fmt.Sprintf("act:update:%d", uid)
|
||||
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3)
|
||||
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3+time.Hour*12)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -311,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) {
|
||||
go func() {
|
||||
scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error {
|
||||
if gtime.Now().After(RunTimeMax) {
|
||||
return errors.New("Redis扫描超时")
|
||||
return errors.New("redis扫描超时")
|
||||
}
|
||||
for _, key := range keys {
|
||||
if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil {
|
||||
@@ -421,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC
|
||||
// @param update []*entity.GameAct: 更新数据
|
||||
// @return err error: 返回错误信息
|
||||
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) {
|
||||
tx, err := g.DB().Begin(ctx)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
//批量写入数据库
|
||||
updateCount := 0
|
||||
var updateCount int64
|
||||
if len(update) > 0 {
|
||||
for _, v := range update {
|
||||
v.UpdatedAt = gtime.Now()
|
||||
updateRes, err2 := g.Model(Name).Where(do.GameAct{
|
||||
updateRes, err2 := tx.Model(Name).Where(do.GameAct{
|
||||
Uid: v.Uid,
|
||||
ActId: v.ActId,
|
||||
}).Data(v).Update()
|
||||
@@ -438,10 +445,22 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
||||
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
|
||||
updateCount++
|
||||
if updateCount > TaskMax {
|
||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
|
||||
return
|
||||
}
|
||||
updateCount = 0
|
||||
tx, err = g.DB().Begin(ctx)
|
||||
}
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
updateCount++
|
||||
}
|
||||
//循环结束了,最后写入一波
|
||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
||||
update = (update)[:0]
|
||||
}
|
||||
@@ -449,7 +468,7 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
||||
var addCount int64
|
||||
if len(add) > 0 {
|
||||
for _, v := range add {
|
||||
addRes, err2 := g.Model(Name).Data(v).Insert()
|
||||
addRes, err2 := tx.Model(Name).Data(v).Insert()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
continue
|
||||
@@ -459,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
||||
continue
|
||||
}
|
||||
addCount++
|
||||
if addCount > TaskMax {
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||
return
|
||||
}
|
||||
addCount = 0
|
||||
tx, err = g.DB().Begin(ctx)
|
||||
}
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
}
|
||||
|
||||
//循环结束了,最后写入一波
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
add = (add)[:0]
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -478,8 +511,14 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
||||
addClosed := false
|
||||
updateClosed := false
|
||||
|
||||
tx, err := g.DB().Begin(ctx)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 使用链式安全模式
|
||||
var db = g.Model(Name).Safe()
|
||||
//var db = tx.Model(Name).Safe()
|
||||
|
||||
for {
|
||||
//检查是否两个通道都已关闭且为空
|
||||
@@ -493,7 +532,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
||||
addClosed = true // 仅标记关闭,不立即日志
|
||||
continue
|
||||
}
|
||||
addRes, err2 := db.Data(v).Insert()
|
||||
addRes, err2 := tx.Model(Name).Data(v).Insert()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
continue
|
||||
@@ -504,6 +543,18 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
||||
}
|
||||
row, _ := addRes.RowsAffected()
|
||||
addCount += row
|
||||
|
||||
if addCount > TaskMax {
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||
return
|
||||
}
|
||||
addCount = 0
|
||||
tx, err = g.DB().Begin(ctx)
|
||||
}
|
||||
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
|
||||
@@ -513,7 +564,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
||||
continue
|
||||
}
|
||||
v.UpdatedAt = gtime.Now()
|
||||
updateRes, err2 := db.Where(do.GameAct{
|
||||
updateRes, err2 := tx.Model(Name).Where(do.GameAct{
|
||||
Uid: v.Uid,
|
||||
ActId: v.ActId,
|
||||
}).Data(v).Update()
|
||||
@@ -525,9 +576,21 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
||||
//g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
updateCount++
|
||||
|
||||
if updateCount > TaskMax {
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||
return
|
||||
}
|
||||
updateCount = 0
|
||||
tx, err = g.DB().Begin(ctx)
|
||||
}
|
||||
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
updateCount++
|
||||
|
||||
case <-ctx.Done():
|
||||
g.Log().Debug(ctx, "act协程被上下文取消")
|
||||
@@ -535,6 +598,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
||||
|
||||
Reference in New Issue
Block a user