@@ -306,7 +306,8 @@ func (s *sGameAct) SavesV2() (err error) {
go func ( ) {
defer wg . Done ( ) // Cache2SqlChan协程完成后减1
s . Cache2SqlChan ( ctx , addChan , updateChan )
s . Cache2SqlChanOptimized ( ctx , addChan , updateChan )
// s.Cache2SqlChan(ctx, addChan, updateChan)
} ( )
go func ( ) {
@@ -314,7 +315,12 @@ func (s *sGameAct) SavesV2() (err error) {
if gtime . Now ( ) . After ( RunTimeMax ) {
return errors . New ( "redis扫描超时" )
}
if keyErr := s . SaveV2Batch ( ctx , keys ) ; keyErr != nil {
// for _, v := range keys {
// if keyErr := s.SaveV2(ctx, v, addChan, updateChan); keyErr != nil {
// g.Log().Errorf(ctx, "处理key %v 失败: %v", v, keyErr)
// }
// }
if keyErr := s . SaveV2Batch ( ctx , keys , addChan , updateChan ) ; keyErr != nil {
g . Log ( ) . Errorf ( ctx , "批量处理keys失败: %v" , keyErr )
}
return nil
@@ -338,6 +344,58 @@ func (s *sGameAct) SavesV2() (err error) {
return
}
// SavesV3 保存游戏活动数据
//
// @Description: 保存游戏活动数据, 里面的超时时间时50分钟
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
// SavesV3 保存游戏活动数据
func ( s * sGameAct ) SavesV3 ( ) ( err error ) {
var ctx = gctx . New ( )
g . Log ( ) . Debug ( ctx , "开始执行游戏act数据保存了" )
RunTimeMax = gtime . Now ( ) . Add ( time . Minute * 50 )
dataChan := make ( chan * entity . GameAct , 1000 )
errChan := make ( chan error , 1 )
var wg sync . WaitGroup
wg . Add ( 1 ) // 仅需添加1次( 对应Cache2SqlChan协程)
// wg.Add(1) // 移除多余的Add调用, 避免计数不平衡
go func ( ) {
defer wg . Done ( ) // Cache2SqlChan协程完成后减1
s . Cache2SqlChanAll ( ctx , dataChan )
// s.Cache2SqlChan(ctx, addChan, updateChan)
} ( )
go func ( ) {
scanErr := tools . Redis . RedisScanV2 ( "act:*" , func ( keys [ ] string ) error {
if gtime . Now ( ) . After ( RunTimeMax ) {
return errors . New ( "redis扫描超时" )
}
if keyErr := s . SaveV3Batch ( ctx , keys , dataChan ) ; keyErr != nil {
g . Log ( ) . Errorf ( ctx , "批量处理keys失败: %v" , keyErr )
}
return nil
} )
close ( dataChan )
errChan <- scanErr
} ( )
// 等待扫描和处理完成,同时监听上下文取消
select {
case scanErr := <- errChan :
wg . Wait ( ) // 等待Cache2SqlChan处理完剩余数据
if scanErr != nil {
return gerror . New ( fmt . Sprintf ( "Redis扫描失败: %v" , scanErr ) )
}
case <- ctx . Done ( ) :
wg . Wait ( )
return ctx . Err ( ) // 返回上下文取消原因
}
return
}
// SaveV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
@@ -617,7 +675,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
// @param ctx context.Context: 上下文对象
// @param cacheKeys []string: 缓存键列表
// @return err error: 返回错误信息
func ( s * sGameAct ) SaveV2Batch ( ctx context . Context , cacheKeys [ ] string ) ( err error ) {
func ( s * sGameAct ) SaveV2Batch ( ctx context . Context , cacheKeys [ ] string , addChan , updateChan chan * entity . GameAct ) ( err error ) {
if len ( cacheKeys ) == 0 {
return nil
}
@@ -631,6 +689,7 @@ func (s *sGameAct) SaveV2Batch(ctx context.Context, cacheKeys []string) (err err
var keyInfos [ ] keyInfo
activeUids := make ( map [ int64 ] bool )
//提取出不活跃的信息
for _ , cacheKey := range cacheKeys {
result := strings . Split ( cacheKey , ":" )
if len ( result ) < 3 {
@@ -680,7 +739,9 @@ func (s *sGameAct) SaveV2Batch(ctx context.Context, cacheKeys []string) (err err
var uids [ ] int64
for _ , ki := range validKeyInfos {
uids = append ( uids , ki . uid )
if ! tools . InArray [ int64 ] ( uids , ki . uid ) {
uids = append ( uids , ki . uid )
}
}
var existingData [ ] do . GameAct
@@ -690,14 +751,11 @@ func (s *sGameAct) SaveV2Batch(ctx context.Context, cacheKeys []string) (err err
return err
}
existMap := make ( map [ int64 ] do . GameAct )
existMap := make ( map [ string ] do . GameAct )
for _ , data := range existingData {
existMap [ gconv . Int64 ( data . Uid ) ] = data
existMap [ fmt . Sprintf ( "act:%v:%v" , data . ActId , data . Uid ) ] = data
}
var addItems [ ] * entity . GameAct
var updateItems [ ] * entity . GameAct
for _ , ki := range validKeyInfos {
val , ok := redisValues [ ki . cacheKey ]
if ! ok {
@@ -707,63 +765,164 @@ func (s *sGameAct) SaveV2Batch(ctx context.Context, cacheKeys []string) (err err
if actionData == "" {
continue
}
if _ , ok := existMap [ ki . uid ] ; ok {
updateItems = append ( updateItems , & entity . GameAct {
if _ , ok := existMap [ fmt . Sprintf ( "act:%d:%d" , ki . actId , ki. uid ) ]; ok {
updateChan <- & entity . GameAct {
ActId : ki . actId ,
Uid : ki . uid ,
Action : actionData ,
} )
}
} else {
addItems = append ( addItems , & entity . GameAct {
addChan <- & entity . GameAct {
ActId : ki . actId ,
Uid : ki . uid ,
Action : actionData ,
} )
}
}
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Error ( ctx , err )
return err
}
if len ( addItems ) > 0 {
_ , err = tx . Model ( Name ) . Data ( addItems ) . Batch ( 100 ) . Insert ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "批量新增失败: %v" , err )
tx . Rollback ( )
return err
}
}
if len ( updateItems ) > 0 {
for _ , item := range updateItems {
item . UpdatedAt = gtime . Now ( )
_ , err = tx . Model ( Name ) . Where ( do . GameAct {
Uid : item . Uid ,
ActId : item . ActId ,
} ) . Data ( item ) . Update ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "批量更新失败: %v" , err )
}
}
}
err = tx . Commit ( )
// tx, err := g.DB().Begin(ctx )
// if err != nil {
// g.Log().Error(ctx, err)
// return err
// }
// if len(addItems) > 0 {
// _, err = tx.Model(Name).Data(addItems).Batch(100).Insert()
// if err != nil {
// g.Log().Errorf(ctx, "批量新增失败: %v", err)
// tx.Rollback()
// return err
// }
// }
// if len(updateItems) > 0 {
// for _, item := range updateItems {
// item.UpdatedAt = gtime.Now()
// _, err = tx.Model(Name).Where(do.GameAct{
// Uid: item.Uid,
// ActId: item.ActId,
// }).Data(item).Update()
// if err != nil {
// g.Log().Errorf(ctx, "批量更新失败: %v", err)
// }
// }
// }
// err = tx.Commit()
// if err != nil {
// g.Log().Errorf(ctx, "提交事务失败: %v", err)
// return err
// }
// for _, item := range addItems {
// s.DelCacheKey(ctx, item.ActId, item.Uid)
// }
// for _, item := range updateItems {
// s.DelCacheKey(ctx, item.ActId, item.Uid)
// }
// g.Log().Debugf(ctx, "SaveV2Batch完成: 新增%d, 更新%d", len(addItems), len(updateItems))
return nil
}
// SaveV2Batch 批量保存游戏活动数据 (优化版)
//
// @Description: 使用批量Redis MGET和批量数据库操作提升性能
// @param ctx context.Context: 上下文对象
// @param cacheKeys []string: 缓存键列表
// @return err error: 返回错误信息
func ( s * sGameAct ) SaveV3Batch ( ctx context . Context , cacheKeys [ ] string , dataChan chan * entity . GameAct ) ( err error ) {
if len ( cacheKeys ) == 0 {
return nil
}
type keyInfo struct {
cacheKey string
actId int
uid int64
}
var keyInfos [ ] keyInfo
activeUids := make ( map [ int64 ] bool )
//提取出不活跃的信息
for _ , cacheKey := range cacheKeys {
result := strings . Split ( cacheKey , ":" )
if len ( result ) < 3 {
continue
}
actId := gconv . Int ( result [ 1 ] )
if actId == 0 {
continue
}
uid := gconv . Int64 ( result [ 2 ] )
if uid == 0 {
continue
}
if getBool , _ := pkg . Cache ( "redis" ) . Contains ( ctx , fmt . Sprintf ( "act:update:%d" , uid ) ) ; getBool {
activeUids [ uid ] = true
continue
}
keyInfos = append ( keyInfos , keyInfo {
cacheKey : cacheKey ,
actId : actId ,
uid : uid ,
} )
}
if len ( keyInfos ) == 0 {
return nil
}
redisValues , err := g . Redis ( ) . MGet ( ctx , cacheKeys ... )
if err != nil {
g . Log ( ) . Errorf ( ctx , "提交事务 失败: %v" , err )
g . Log ( ) . Errorf ( ctx , "批量获取Redis 失败: %v" , err )
return err
}
fo r _ , item := range addItems {
s . DelCacheKey ( ctx , item . ActId , item . Uid )
}
for _ , item := range updateItems {
s . DelCacheKey ( ctx , item . ActId , item . Uid )
va r validKeyInfos [ ] keyInfo
for i , keyInfo := range keyInfos {
if val , ok := redisValues [ keyInfo . cacheKey ] ; ok && gconv . String ( val ) != "" {
validKeyIn fos = append ( validKeyInfos , keyInfos [ i ] )
}
}
g . Log ( ) . Debugf ( ctx , "SaveV2Batch完成: 新增%d, 更新%d" , len ( addItems ) , len ( updateItems ) )
if len ( validKeyInfos ) == 0 {
return nil
}
var uids [ ] int64
for _ , ki := range validKeyInfos {
if ! tools . InArray [ int64 ] ( uids , ki . uid ) {
uids = append ( uids , ki . uid )
}
}
var existingData [ ] do . GameAct
err = g . Model ( Name ) . Where ( "uid IN (?)" , uids ) . Fields ( "uid,act_id" ) . Scan ( & existingData )
if err != nil {
g . Log ( ) . Errorf ( ctx , "批量查询数据库失败: %v" , err )
return err
}
for _ , ki := range validKeyInfos {
val , ok := redisValues [ ki . cacheKey ]
if ! ok {
continue
}
actionData := gconv . String ( val )
if actionData == "" {
continue
}
dataChan <- & entity . GameAct {
ActId : ki . actId ,
Uid : ki . uid ,
Action : actionData ,
}
}
return nil
}
@@ -783,7 +942,6 @@ func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
g . Log ( ) . Error ( ctx , err )
}
}
}
// 清空GetRedDot缓存
func ( s * sGameAct ) RefreshGetRedDotCache ( uid int64 ) {
@@ -814,3 +972,261 @@ func (s *sGameAct) Del(uid int64, actId int) {
} ) . Delete ( )
}
func ( s * sGameAct ) Cache2SqlChanOptimized ( ctx context . Context , addChan , updateChan chan * entity . GameAct ) {
const batchSize = 200
const commitThreshold = 1000 // 每 5000 条提交一次事务
var addBatch [ ] * entity . GameAct
// var addDelBatch []*entity.GameAct
// var updateDelBatch []*entity.GameAct
var addAllCount , updateAllCount int64
var txAddCount , txUpdateCount int64 // 当前事务累计计数
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Errorf ( ctx , "开启事务失败:%v" , err )
return
}
// 刷新新增数据
flushAdd := func ( ) {
if len ( addBatch ) == 0 {
return
}
addRes , err := tx . Model ( Name ) . Data ( addBatch ) . Batch ( 50 ) . Insert ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "批量新增失败:%v" , err )
return
} else {
row , _ := addRes . RowsAffected ( )
if row != int64 ( len ( addBatch ) ) {
//g.Log().Error(ctx, "本次未能全部新增,新增数据失败: %v", v)
return
}
txAddCount += row
}
for _ , v := range addBatch {
s . DelCacheKey ( ctx , v . ActId , v . Uid )
// addDelBatch = append(addDelBatch, addBatch...)
}
// for _, v := range addBatch {
// v.UpdatedAt = gtime.Now()
// addRes, err := tx.Model(Name).Data(v).Insert()
// if err != nil {
// g.Log().Errorf(ctx, "新增失败:%v", err)
// continue
// } else {
// row, _ := addRes.RowsAffected()
// if row == 0 {
// //g.Log().Error(ctx, "本次修改,数据失败: %v", v)
// continue
// }
// s.DelCacheKey(ctx, v.ActId, v.Uid)
// txAddCount += row
// addDelBatch = append(addDelBatch, v)
// }
// }
// 达到阈值提交事务
if txAddCount + txUpdateCount >= commitThreshold {
err := tx . Commit ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "提交事务失败:%v" , err )
return
}
addAllCount += txAddCount
updateAllCount += txUpdateCount
txUpdateCount = 0
txAddCount = 0
tx , _ = g . DB ( ) . Begin ( ctx )
}
addBatch = addBatch [ : 0 ]
}
// 刷新更新数据
flushUpdate := func ( data * entity . GameAct ) {
data . UpdatedAt = gtime . Now ( )
updateRes , err := tx . Model ( Name ) . Where ( do . GameAct {
Uid : data . Uid ,
ActId : data . ActId ,
} ) . Data ( data ) . Update ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "更新失败:%v" , err )
return
} else {
row , _ := updateRes . RowsAffected ( )
if row == 0 {
//g.Log().Error(ctx, "本次修改,数据失败: %v", v)
return
}
s . DelCacheKey ( ctx , data . ActId , data . Uid )
txUpdateCount += row
// updateDelBatch = append(updateDelBatch, data)
}
// 达到阈值提交事务
if txUpdateCount + txAddCount >= commitThreshold {
err := tx . Commit ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "提交事务失败:%v" , err )
return
}
addAllCount += txAddCount
updateAllCount += txUpdateCount
txUpdateCount = 0
txAddCount = 0
tx , _ = g . DB ( ) . Begin ( ctx )
}
}
addClosed := false
updateClosed := false
for {
if addClosed && updateClosed {
break
}
select {
case v , ok := <- addChan :
if ! ok {
addClosed = true
continue
}
addBatch = append ( addBatch , v )
if len ( addBatch ) >= batchSize {
flushAdd ( )
}
case v , ok := <- updateChan :
if ! ok {
updateClosed = true
continue
}
flushUpdate ( v )
case <- ctx . Done ( ) :
g . Log ( ) . Debug ( ctx , "act协程被上下文取消" )
return
}
}
// 处理剩余数据
flushAdd ( )
// 提交最后的事务
// if txAddCount+txUpdateCount > 0 {
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "提交事务失败:%v" , err )
return
} else {
// for _, v := range addDelBatch {
// s.DelCacheKey(ctx, v.ActId, v.Uid)
// }
// for _, v := range updateDelBatch {
// s.DelCacheKey(ctx, v.ActId, v.Uid)
// }
addAllCount += txAddCount
updateAllCount += txUpdateCount
}
g . Log ( ) . Debugf ( ctx , "运行结束act当前写入数据库: %v 条" , addAllCount )
g . Log ( ) . Debugf ( ctx , "运行结束act当前更新数据库: %v 条" , updateAllCount )
}
func ( s * sGameAct ) Cache2SqlChanAll ( ctx context . Context , dataChan chan * entity . GameAct ) {
const commitThreshold = 5000 // 每 5000 条提交一次事务
var addAllCount , updateAllCount int64
var txAddCount , txUpdateCount int64 // 当前事务累计计数
tx , err := g . DB ( ) . Begin ( ctx )
if err != nil {
g . Log ( ) . Errorf ( ctx , "开启事务失败:%v" , err )
return
}
// 刷新新增数据
flush := func ( data * entity . GameAct ) {
dataRes , err := tx . Model ( Name ) . Data ( data ) . Save ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "保存失败:%v" , err )
return
} else {
row , _ := dataRes . RowsAffected ( )
switch row {
case 0 :
g . Log ( ) . Error ( ctx , "本次未能保存,保存数据失败" )
return
case 1 :
//新增
txAddCount ++
case 2 :
//更新
txUpdateCount ++
default :
g . Log ( ) . Errorf ( ctx , "执行行数不对: %v" , row )
}
s . DelCacheKey ( ctx , data . ActId , data . Uid )
}
// 达到阈值提交事务
if txAddCount + txUpdateCount >= commitThreshold {
err := tx . Commit ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "提交事务失败:%v" , err )
return
}
addAllCount += txAddCount
updateAllCount += txUpdateCount
txAddCount = 0
txUpdateCount = 0
tx , _ = g . DB ( ) . Begin ( ctx )
}
}
dataClosed := false
for {
if dataClosed {
break
}
select {
case v , ok := <- dataChan :
if ! ok {
dataClosed = true
continue
}
flush ( v )
case <- ctx . Done ( ) :
g . Log ( ) . Debug ( ctx , "act协程被上下文取消" )
return
}
}
// 提交最后的事务
// if txAddCount+txUpdateCount > 0 {
err = tx . Commit ( )
if err != nil {
g . Log ( ) . Errorf ( ctx , "提交事务失败:%v" , err )
return
} else {
// for _, v := range addDelBatch {
// s.DelCacheKey(ctx, v.ActId, v.Uid)
// }
// for _, v := range updateDelBatch {
// s.DelCacheKey(ctx, v.ActId, v.Uid)
// }
addAllCount += txAddCount
updateAllCount += txUpdateCount
}
g . Log ( ) . Debugf ( ctx , "运行结束act当前写入数据库: %v 条" , addAllCount )
g . Log ( ) . Debugf ( ctx , "运行结束act当前更新数据库: %v 条" , updateAllCount )
}