@@ -24,6 +24,8 @@ var (
Name = "game_act"
ActList = gset . New ( true )
RunTimeMax * gtime . Time
addChan chan * entity . GameAct
updateChan chan * entity . GameAct
)
type sGameAct struct {
@@ -110,6 +112,12 @@ func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
return
}
// Saves 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
// Deprecated: 该方法已被弃用, 建议使用SavesV2方法
func ( s * sGameAct ) Saves ( ) ( err error ) {
var ctx = gctx . New ( )
g . Log ( ) . Debug ( ctx , "开始执行游戏act数据保存了" )
@@ -130,12 +138,19 @@ func (s *sGameAct) Saves() (err error) {
return
}
// Save 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param actId int: 活动ID
// @return err error: 返回错误信息
// deprecated: 该方法已被弃用, 建议使用SaveV2方法
func ( s * sGameAct ) Save ( ctx context . Context , actId int ) ( err error ) {
cacheKey := fmt . Sprintf ( "act:%v:*" , actId )
var add = make ( [ ] * entity . GameAct , 0 )
var update = make ( [ ] * entity . GameAct , 0 )
//循环获取缓存数据
err = tools . Redis . RedisScanV2 ( cacheKey , func ( keys [ ] string ) ( err error ) {
//判断是否超时
@@ -203,7 +218,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
//批量写入数据库
updateCount := 0
g . Log ( ) . Debugf ( ctx , "当前 %v 要更新的数据: %v 条" , actId , len ( update) )
//g.Log().Debugf(ctx, "当前 %v 要更新的数据: %v 条", actId, len( update) )
if len ( update ) > 100 {
for _ , v := range update {
v . UpdatedAt = gtime . Now ( )
@@ -231,7 +246,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
}
var count int64
g . Log ( ) . Debugf ( ctx , "当前 %v 要添加的数据: %v 条" , actId , len ( add ) )
//g.Log().Debugf(ctx, "当前 %v 要添加的数据: %v 条", actId, len(add) )
if len ( add ) > 100 {
dbRes , err2 := g . Model ( Name ) . Data ( add ) . Save ( )
@@ -268,6 +283,219 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
return
}
// SavesV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
func ( s * sGameAct ) SavesV2 ( ) ( err error ) {
var ctx = gctx . New ( )
g . Log ( ) . Debug ( ctx , "开始执行游戏act数据保存了" )
//如果没有执行过,设置时间戳
// 最大允许执行时间
RunTimeMax = gtime . Now ( ) . Add ( time . Minute * 30 )
//cacheKey := fmt.Sprintf("act:%v:*", actId)
addChan = make ( chan * entity . GameAct , 1000 )
updateChan = make ( chan * entity . GameAct , 1000 )
go func ( ) {
//循环获取缓存数据
err = tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) ( err error ) {
for _ , key := range keys {
//格式化数据
err = s . SaveV2 ( ctx , key )
}
return err
} )
//关闭通道
close ( addChan )
close ( updateChan )
} ( )
// 启动缓存数据到数据库通道
s . Cache2SqlChan ( ctx )
return
}
// SaveV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param cacheKey string: 缓存键
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func ( s * sGameAct ) SaveV2 ( ctx context . Context , cacheKey string ) ( err error ) {
result := strings . Split ( cacheKey , ":" )
actId := gconv . Int ( result [ 1 ] )
if actId == 0 {
return
}
var uid int64
uid = gconv . Int64 ( result [ 2 ] )
if uid == 0 {
//跳过为空的用户缓存
return
}
//获取缓存数据
cacheGet , _ := g . Redis ( ) . Get ( ctx , cacheKey )
if cacheGet . IsEmpty ( ) {
//空数据也不保存
return
}
//如果有活跃,跳过持久化
if getBool , _ := pkg . Cache ( "redis" ) .
Contains ( ctx , fmt . Sprintf ( "act:update:%d" , uid ) ) ; getBool {
return
}
//获取数据库数据
var data * entity . GameAct
// 从数据库中查询活动信息
err = g . Model ( Name ) . Where ( do . GameAct {
Uid : uid ,
ActId : actId ,
} ) . Fields ( "uid,act_id" ) . Scan ( & data )
if err != nil {
g . Log ( ) . Errorf ( ctx , "当前数据错误: %v" , cacheKey )
return
}
//如果没有数据,添加
actionData := cacheGet . String ( )
if data == nil {
//add = append(add, &entity.GameAct{
// ActId: actId,
// Uid: uid,
// Action: actionData,
//})
addChan <- & entity . GameAct {
ActId : actId ,
Uid : uid ,
Action : actionData ,
}
} else {
//覆盖数据
data . ActId = actId
data . Uid = uid
data . Action = actionData
//update = append(update, data)
updateChan <- data
}
return
}
// Cache2Sql 缓存持久化到数据库
// @Description: 缓存持久化到数据库
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func ( s * sGameAct ) Cache2Sql ( ctx context . Context , add , update [ ] * entity . GameAct ) {
//批量写入数据库
updateCount := 0
if len ( update ) > 0 {
for _ , v := range update {
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
}
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
update = ( update ) [ : 0 ]
}
var addCount int64
if len ( add ) > 0 {
for _ , v := range add {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := addRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次新增为0, 新增数据失败: %v" , v )
continue
}
addCount ++
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
}
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
add = ( add ) [ : 0 ]
}
return
}
// Cache2SqlChan 缓存持久化到数据库
// @Description: 缓存持久化到数据库
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
func ( s * sGameAct ) Cache2SqlChan ( ctx context . Context ) {
//批量写入数据库
updateCount := 0
for v := range updateChan {
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
}
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
var addCount int64
for v := range addChan {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := addRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次新增为0, 新增数据失败: %v" , v )
continue
}
addCount ++
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
}
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
return
}
// 删除缓存key
func ( s * sGameAct ) DelCacheKey ( ctx context . Context , aid int , uid int64 ) {
cacheKey := fmt . Sprintf ( "act:%v:%v" , aid , uid )