@@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/ayflying/utility_go/internal/model/do"
@@ -14,6 +15,7 @@ import (
service2 "github.com/ayflying/utility_go/service"
"github.com/ayflying/utility_go/tools"
"github.com/gogf/gf/v2/container/gset"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
@@ -24,8 +26,6 @@ var (
Name = "game_act"
ActList = gset . New ( true )
RunTimeMax * gtime . Time
addChan chan * entity . GameAct
updateChan chan * entity . GameAct
)
type sGameAct struct {
@@ -288,35 +288,54 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
// SavesV2 保存游戏活动数据
func ( s * sGameAct ) SavesV2 ( ) ( err error ) {
var ctx = gctx . New ( )
g . Log ( ) . Debug ( ctx , "开始执行游戏act数据保存了" )
//如果没有执行过,设置时间戳
// 最大允许执行时间
RunTimeMax = gtime . Now ( ) . Add ( time . Minute * 30 )
//cacheKey := fmt.Sprintf("act:%v:*", actId)
// 使用局部通道替代包级通道,避免并发冲突
addChan := make ( chan * entity . GameAct , 1000 )
updateChan := make ( chan * entity . GameAct , 1000 )
add Chan = make ( chan * entity . GameAct , 1000 )
updateChan = make ( chan * entity . GameAct , 1000 )
err Chan : = make ( chan error , 1 )
var wg sync . WaitGroup
wg . Add ( 1 ) // 仅需添加1次( 对应Cache2SqlChan协程)
// wg.Add(1) // 移除多余的Add调用, 避免计数不平衡
go func ( ) {
//循环获取缓存数据
err = tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) ( err error ) {
for _ , key := range keys {
//格式化数据
err = s . SaveV2 ( ctx , key )
defer wg . Done ( ) // Cache2SqlChan协程完成后减1
s . Cache2SqlChan ( ctx , addChan , updateChan )
} ( )
go func ( ) {
scanErr := tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) error {
if gtime . Now ( ) . After ( RunTimeMax ) {
return errors . New ( "Redis扫描超时" )
}
return err
for _ , key := range keys {
if keyErr := s . SaveV2 ( ctx , key , addChan , updateChan ) ; keyErr != nil {
g . Log ( ) . Errorf ( ctx , "处理key %s失败: %v" , key , keyErr )
}
}
return nil
} )
//关闭通道
close ( addChan )
close ( updateChan )
errChan <- scanErr
} ( )
// 启动缓存数据到数据库通道
s . Cache2SqlChan ( ctx )
// 等待扫描和处理完成,同时监听上下文取消
select {
case scanErr := <- errChan :
wg . Wait ( ) // 等待Cache2SqlChan处理完剩余数据
if scanErr != nil {
return gerror . New ( fmt . Sprintf ( "Redis扫描失败: %v" , scanErr ) )
}
case <- ctx . Done ( ) :
wg . Wait ( )
return ctx . Err ( ) // 返回上下文取消原因
}
return
}
@@ -329,7 +348,7 @@ func (s *sGameAct) SavesV2() (err error) {
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func ( s * sGameAct ) SaveV2 ( ctx context . Context , cacheKey string ) ( err error ) {
func ( s * sGameAct ) SaveV2 ( ctx context . Context , cacheKey string , addChan , updateChan chan * entity . GameAct ) ( err error ) {
result := strings . Split ( cacheKey , ":" )
actId := gconv . Int ( result [ 1 ] )
@@ -449,50 +468,76 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
return
}
// Cache2Sql Chan 缓存持久化到 数据库
// @Description: 缓存持久化到数据库
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
func ( s * sGameAct ) Cache2SqlChan ( ctx context . Context ) {
//批量写入数据库
updateCount := 0
for v := range updateChan {
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := g . Model ( Name ) . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次更新为0, 更新数据失败: %v" , v )
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
}
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
// Cache2Add Chan 批量添加 数据库
func ( s * sGameAct ) Cache2SqlChan ( ctx context . Context , addChan , updateChan chan * entity . GameAct ) {
//批量写入数据库计数
var addCount int64
for v := range addChan {
addRes , err2 := g . Model ( Name ) . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continu e
}
if row , _ := addRes . RowsAffected ( ) ; row == 0 {
g . Log ( ) . Error ( ctx , "本次新增为0, 新增数据失败: %v" , v )
continue
}
addCount ++
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
}
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
//批量更新数据库计数
var updateCount int64
//通道关闭标志
addClosed := false
updateClosed := fals e
// 使用链式安全模式
var db = g . Model ( Name ) . Safe ( )
for {
//检查是否两个通道都已关闭且为空
if addClosed && updateClosed {
break
}
select {
case v , ok := <- addChan :
if ! ok {
addClosed = true // 仅标记关闭,不立即日志
continue
}
addRes , err2 := db . Data ( v ) . Insert ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
//if row, _ := addRes.RowsAffected(); row == 0 {
// g.Log().Error(ctx, "本次新增为0, 新增数据失败: %v", v)
// continue
//}
row , _ := addRes . RowsAffected ( )
addCount += row
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
case v , ok := <- updateChan :
if ! ok {
updateClosed = true // 仅标记关闭,不立即日志
continue
}
v . UpdatedAt = gtime . Now ( )
updateRes , err2 := db . Where ( do . GameAct {
Uid : v . Uid ,
ActId : v . ActId ,
} ) . Data ( v ) . Update ( )
if err2 != nil {
g . Log ( ) . Error ( ctx , err2 )
continue
}
if row , _ := updateRes . RowsAffected ( ) ; row == 0 {
//g.Log().Error(ctx, "本次更新为0, 更新数据失败: %v", v)
continue
}
//删除缓存
s . DelCacheKey ( ctx , v . ActId , v . Uid )
updateCount ++
case <- ctx . Done ( ) :
g . Log ( ) . Debug ( ctx , "act协程被上下文取消" )
return
}
}
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
g . Log ( ) . Debugf ( ctx , "act当前写入数据库: %v 条" , addCount )
g . Log ( ) . Debugf ( ctx , "act当前更新数据库: %v 条" , updateCount )
return
}