@@ -26,6 +26,7 @@ var (
Name = "game_act"
ActList = gset . New ( true )
RunTimeMax * gtime . Time
TaskMax int64 = 5
)
type sGameAct struct {
@@ -77,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
_ , err = g . Redis ( ) . Set ( ctx , keyCache , data )
var CacheKey = fmt . Sprintf ( "act:update:%d" , uid )
pkg . Cache ( "redis" ) . Set ( ctx , CacheKey , uid , time . Hour * 24 * 3 )
pkg . Cache ( "redis" ) . Set ( ctx , CacheKey , uid , time . Hour * 24 * 3 + time . Hour * 12 )
return
}
@@ -311,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) {
go func ( ) {
scanErr := tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) error {
if gtime . Now ( ) . After ( RunTimeMax ) {
return errors . New ( "R edis扫描超时" )
return errors . New ( "r edis扫描超时" )
}
for _ , key := range keys {
if keyErr := s . SaveV2 ( ctx , key , addChan , updateChan ) ; keyErr != nil {
@@ -421,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func ( s * sGameAct ) Cache2Sql ( ctx context . Context , add , update [ ] * entity . GameAct ) {
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Error ( ctx , err )
return
}
//批量写入数据库
updateCount := 0
var updateCount int64
if len ( update ) > 0 {
for _ , v := range update {
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
updateRes , err2 := tx . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
@@ -438,10 +445,22 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
}
updateCount ++
if updateCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前更新数据库失败:%v" , err )
return
}
updateCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
}
//循环结束了,最后写入一波
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
update = ( update ) [ : 0 ]
}
@@ -449,7 +468,7 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
var addCount int64
if len ( add ) > 0 {
for _ , v := range add {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
addRes , err2 := tx . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
@@ -459,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
continue
}
addCount ++
if addCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库失败:%v" , err )
return
}
addCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
}
//循环结束了,最后写入一波
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
add = ( add ) [ : 0 ]
}
err = tx . Commit ( )
return
}
@@ -478,8 +511,14 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed := false
updateClosed := false
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Error ( ctx , err )
return
}
// 使用链式安全模式
var db = g . Model ( Name ) . Safe ( )
//var db = tx.Model(Name).Safe( )
for {
//检查是否两个通道都已关闭且为空
@@ -493,7 +532,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed = true // 仅标记关闭,不立即日志
continue
}
addRes , err2 := db . Data ( v ) . Insert ( )
addRes , err2 := tx . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
@@ -504,6 +543,18 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
}
row , _ := addRes . RowsAffected ( )
addCount += row
if addCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库失败:%v" , err )
return
}
addCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
@@ -513,7 +564,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
continue
}
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := db . Where ( do . GameAct {
updateRes , err2 := tx . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
@@ -525,9 +576,21 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
//g.Log().Error(ctx, "本次更新为0, 更新数据失败: %v", v)
continue
}
updateCount ++
if updateCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库失败:%v" , err )
return
}
updateCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
case <- ctx . Done ( ) :
g . Log ( ) . Debug ( ctx , "act协程被上下文取消" )
@@ -535,6 +598,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
}
}
err = tx . Commit ( )
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )