@@ -24,6 +24,8 @@ var (
Name = "game_act"
Name = "game_act"
ActList = gset . New ( true )
ActList = gset . New ( true )
RunTimeMax * gtime . Time
RunTimeMax * gtime . Time
addChan chan * entity . GameAct
updateChan chan * entity . GameAct
)
)
type sGameAct struct {
type sGameAct struct {
@@ -294,19 +296,26 @@ func (s *sGameAct) SavesV2() (err error) {
RunTimeMax = gtime . Now ( ) . Add ( time . Minute * 30 )
RunTimeMax = gtime . Now ( ) . Add ( time . Minute * 30 )
//cacheKey := fmt.Sprintf("act:%v:*", actId)
//cacheKey := fmt.Sprintf("act:%v:*", actId)
var add = make ( [ ] * entity . GameAct , 0 )
var update = make ( [ ] * entity . GameAct , 0 )
//循环获取缓存数据
addChan = make ( chan * entity . GameAct , 1000 )
err = tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) ( err error ) {
updateChan = make ( chan * entity . GameAct , 1000 )
for _ , key := range keys {
//格式化数据
go func ( ) {
err = s . SaveV2 ( ctx , key , add , update )
//循环获取缓存数据
//持久化数据
err = tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) ( err error ) {
er r = s . Cache2Sql ( ctx , add , update )
fo r _ , key := range keys {
}
//格式化数据
return err
err = s . SaveV2 ( ctx , key )
} )
}
return err
} )
//关闭通道
close ( addChan )
close ( updateChan )
} ( )
// 启动缓存数据到数据库通道
s . Cache2SqlChan ( ctx )
return
return
}
}
@@ -320,7 +329,7 @@ func (s *sGameAct) SavesV2() (err error) {
// @param add []*entity.GameAct: 添加数据
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
// @return err error: 返回错误信息
func ( s * sGameAct ) SaveV2 ( ctx context . Context , cacheKey string , add , update [ ] * entity . GameAct ) ( err error ) {
func ( s * sGameAct ) SaveV2 ( ctx context . Context , cacheKey string ) ( err error ) {
result := strings . Split ( cacheKey , ":" )
result := strings . Split ( cacheKey , ":" )
actId := gconv . Int ( result [ 1 ] )
actId := gconv . Int ( result [ 1 ] )
@@ -359,20 +368,27 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, add, update []*e
g . Log ( ) . Errorf ( ctx , "当前数据错误: %v" , cacheKey )
g . Log ( ) . Errorf ( ctx , "当前数据错误: %v" , cacheKey )
return
return
}
}
//如果没有数据,添加
//如果没有数据,添加
actionData := cacheGet . String ( )
actionData := cacheGet . String ( )
if data == nil {
if data == nil {
add = append ( add , & entity. GameAct {
//add = append(add, & entity. GameAct{
// ActId: actId,
// Uid: uid,
// Action: actionData,
//})
addChan <- & entity . GameAct {
ActId : actId ,
ActId : actId ,
Uid : uid ,
Uid : uid ,
Action : actionData ,
Action : actionData ,
} )
}
} else {
} else {
//覆盖数据
//覆盖数据
data . ActId = actId
data . ActId = actId
data . Uid = uid
data . Uid = uid
data . Action = actionData
data . Action = actionData
update = append( update, data )
// update = append( update, data)
updateChan <- data
}
}
return
return
@@ -385,10 +401,10 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, add, update []*e
// @param add []*entity.GameAct: 添加数据
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
// @return err error: 返回错误信息
func ( s * sGameAct ) Cache2Sql ( ctx context . Context , add , update [ ] * entity . GameAct ) ( err error ) {
func ( s * sGameAct ) Cache2Sql ( ctx context . Context , add , update [ ] * entity . GameAct ) {
//批量写入数据库
//批量写入数据库
updateCount := 0
updateCount := 0
if len ( update ) > 10 0 {
if len ( update ) > 0 {
for _ , v := range update {
for _ , v := range update {
v . UpdatedAt = gtime . Now ( )
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
@@ -399,25 +415,20 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
g . Log ( ) . Error ( ctx , err2 )
g . Log ( ) . Error ( ctx , err2 )
continue
continue
}
}
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
continue
}
}
//删除缓存
//删除缓存
go s . DelCacheKey ( ctx , v . ActId , v . Uid )
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
updateCount ++
update = make ( [ ] * entity . GameAct , 0 )
}
}
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
update = make ( [ ] * entity . GameAct , 0 )
update = ( update ) [ : 0 ]
}
}
var addCount int64
var addCount int64
if len ( add ) > 10 0 {
if len ( add ) > 0 {
for _ , v := range add {
for _ , v := range add {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
if err2 != nil {
@@ -430,14 +441,61 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
}
}
addCount ++
addCount ++
//删除缓存
//删除缓存
go s . DelCacheKey ( ctx , v . ActId , v . Uid )
s . DelCacheKey ( ctx , v . ActId , v . Uid )
}
}
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
add = make ( [ ] * entity . GameAct , 0 )
add = ( add ) [ : 0 ]
}
}
return
return
}
}
// Cache2SqlChan 缓存持久化到数据库
// @Description: 缓存持久化到数据库
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
func ( s * sGameAct ) Cache2SqlChan ( ctx context . Context ) {
//批量写入数据库
updateCount := 0
for v := range updateChan {
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
}
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
var addCount int64
for v := range addChan {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := addRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次新增为0, 新增数据失败: %v" , v )
continue
}
addCount ++
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
}
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
return
}
// 删除缓存key
// 删除缓存key
func ( s * sGameAct ) DelCacheKey ( ctx context . Context , aid int , uid int64 ) {
func ( s * sGameAct ) DelCacheKey ( ctx context . Context , aid int , uid int64 ) {
cacheKey := fmt . Sprintf ( "act:%v:%v" , aid , uid )
cacheKey := fmt . Sprintf ( "act:%v:%v" , aid , uid )