@@ -26,6 +26,7 @@ var (
Name = "game_act"
ActList = gset . New ( true )
RunTimeMax * gtime . Time
TaskMax int64 = 5
)
type sGameAct struct {
@@ -77,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
_ , err = g . Redis ( ) . Set ( ctx , keyCache , data )
var CacheKey = fmt . Sprintf ( "act:update:%d" , uid )
pkg . Cache ( "redis" ) . Set ( ctx , CacheKey , uid , time . Hour * 24 * 1 )
pkg . Cache ( "redis" ) . Set ( ctx , CacheKey , uid , time . Hour * 24 * 3 + time . Hour * 12 )
return
}
@@ -236,7 +237,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
}
//删除缓存
go s . DelCacheKey ( ctx , v . ActId , v . Uid )
//s.DelCacheKey(ctx, v.ActId, v. Uid)
updateCount ++
update = make ( [ ] * entity . GameAct , 0 )
@@ -264,10 +265,10 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
return
}
for _ , v2 := range add {
//删除缓存
go s . DelCacheKey ( ctx , v2 . ActId , v2 . Uid )
}
//for _, v2 := range add {
// //删除缓存
// s.DelCacheKey(ctx, v2.ActId, v2. Uid)
// }
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
add = make ( [ ] * entity . GameAct , 0 )
@@ -311,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) {
go func ( ) {
scanErr := tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) error {
if gtime . Now ( ) . After ( RunTimeMax ) {
return errors . New ( "R edis扫描超时" )
return errors . New ( "r edis扫描超时" )
}
for _ , key := range keys {
if keyErr := s . SaveV2 ( ctx , key , addChan , updateChan ) ; keyErr != nil {
@@ -421,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func ( s * sGameAct ) Cache2Sql ( ctx context . Context , add , update [ ] * entity . GameAct ) {
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Error ( ctx , err )
return
}
//批量写入数据库
updateCount := 0
var updateCount int64
if len ( update ) > 0 {
for _ , v := range update {
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
updateRes , err2 := tx . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
@@ -438,10 +445,22 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
if updateCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前更新数据库失败:%v" , err )
return
}
updateCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
//s.DelCacheKey(ctx, v.ActId, v.Uid)
}
//循环结束了,最后写入一波
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
update = ( update ) [ : 0 ]
}
@@ -449,7 +468,7 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
var addCount int64
if len ( add ) > 0 {
for _ , v := range add {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
addRes , err2 := tx . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
@@ -459,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
continue
}
addCount ++
if addCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库失败:%v" , err )
return
}
addCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
s . DelCacheKey( ctx , v . ActId , v . Uid )
//s. DelCacheKey(ctx, v.ActId, v. Uid)
}
//循环结束了,最后写入一波
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
add = ( add ) [ : 0 ]
}
err = tx . Commit ( )
return
}
@@ -478,8 +511,14 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed := false
updateClosed := false
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Error ( ctx , err )
return
}
// 使用链式安全模式
var db = g . Model ( Name ) . Safe ( )
//var db = tx.Model(Name).Safe( )
for {
//检查是否两个通道都已关闭且为空
@@ -493,7 +532,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed = true // 仅标记关闭,不立即日志
continue
}
addRes , err2 := db . Data ( v ) . Insert ( )
addRes , err2 := tx . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
@@ -504,8 +543,20 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
}
row , _ := addRes . RowsAffected ( )
addCount += row
if addCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库失败:%v" , err )
return
}
addCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
s . DelCacheKey( ctx , v . ActId , v . Uid )
//s. DelCacheKey(ctx, v.ActId, v. Uid)
case v , ok := <- updateChan :
if ! ok {
@@ -513,7 +564,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
continue
}
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := db . Where ( do . GameAct {
updateRes , err2 := tx . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
@@ -525,16 +576,29 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
//g.Log().Error(ctx, "本次更新为0, 更新数据失败: %v", v)
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
if updateCount > TaskMax {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Debugf ( ctx , "act当前写入数据库失败:%v" , err )
return
}
updateCount = 0
tx , err = g . DB ( ) . Begin ( ctx )
}
//删除缓存
//s.DelCacheKey(ctx, v.ActId, v.Uid)
case <- ctx . Done ( ) :
g . Log ( ) . Debug ( ctx , "act协程被上下文取消" )
return
}
}
err = tx . Commit ( )
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
@@ -543,17 +607,19 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
// 删除缓存key
func ( s * sGameAct ) DelCacheKey ( ctx context . Context , aid int , uid int64 ) {
//如果有活跃,跳过删除
if getBool , _ := pkg . Cache ( "redis" ) .
Contains ( ctx , fmt . Sprintf ( "act:update:%d" , uid ) ) ; getBool {
return
}
go func ( ) {
//如果有活跃,跳过删除
if getBool , _ := pkg . Cache ( "redis" ) .
Contains ( ctx , fmt . Sprintf ( "act:update:%d" , uid ) ) ; getBool {
return
}
cacheKey := fmt . Sprintf ( "act:%v:%v" , aid , uid )
_ , err := g . Redis ( ) . Del ( ctx , cacheKey )
if err != nil {
g . Log ( ) . Error ( ctx , err )
}
cacheKey := fmt . Sprintf ( "act:%v:%v" , aid , uid )
_ , err := g . Redis ( ) . Del ( ctx , cacheKey )
if err != nil {
g . Log ( ) . Error ( ctx , err )
}
} ( )
}
// 清空GetRedDot缓存