优化游戏数据批量持久化性能

- gameAct: 新增SaveV2Batch批量处理方法
  - 使用Redis MGET批量获取替代逐个GET
  - 使用WHERE uid IN批量查询替代逐个查询
  - 使用Batch批量插入替代逐条SQL
  - 修复: 只有数据库写入成功后才删除Redis key

- gameKv: 新增SavesV2Batch批量处理方法
  - 同样的批量优化策略
  - 修复: 只有数据库写入成功后才删除Redis key

- service: 更新接口定义添加新方法

性能提升: 1000条数据从1000次网络请求减少到2-3次
This commit is contained in:
ayflying
2026-02-26 15:44:14 +08:00
parent 8ed8152f79
commit 713a63356e
5 changed files with 397 additions and 80 deletions

View File

@@ -314,10 +314,8 @@ func (s *sGameAct) SavesV2() (err error) {
if gtime.Now().After(RunTimeMax) {
return errors.New("redis扫描超时")
}
for _, key := range keys {
if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil {
g.Log().Errorf(ctx, "处理key %s失败: %v", key, keyErr)
}
if keyErr := s.SaveV2Batch(ctx, keys); keyErr != nil {
g.Log().Errorf(ctx, "批量处理keys失败: %v", keyErr)
}
return nil
})
@@ -613,21 +611,178 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
return
}
// 删除缓存key
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
go func() {
//如果有活跃,跳过删除
if getBool, _ := pkg.Cache("redis").
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
return
// SaveV2Batch 批量保存游戏活动数据 (优化版)
//
// @Description: 使用批量Redis MGET和批量数据库操作提升性能
// @param ctx context.Context: 上下文对象
// @param cacheKeys []string: 缓存键列表
// @return err error: 返回错误信息
func (s *sGameAct) SaveV2Batch(ctx context.Context, cacheKeys []string) (err error) {
if len(cacheKeys) == 0 {
return nil
}
type keyInfo struct {
cacheKey string
actId int
uid int64
}
var keyInfos []keyInfo
activeUids := make(map[int64]bool)
for _, cacheKey := range cacheKeys {
result := strings.Split(cacheKey, ":")
if len(result) < 3 {
continue
}
actId := gconv.Int(result[1])
if actId == 0 {
continue
}
uid := gconv.Int64(result[2])
if uid == 0 {
continue
}
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
_, err := g.Redis().Del(ctx, cacheKey)
if err != nil {
g.Log().Error(ctx, err)
if getBool, _ := pkg.Cache("redis").Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
activeUids[uid] = true
continue
}
}()
keyInfos = append(keyInfos, keyInfo{
cacheKey: cacheKey,
actId: actId,
uid: uid,
})
}
if len(keyInfos) == 0 {
return nil
}
redisValues, err := g.Redis().MGet(ctx, cacheKeys...)
if err != nil {
g.Log().Errorf(ctx, "批量获取Redis失败: %v", err)
return err
}
var validKeyInfos []keyInfo
for i, keyInfo := range keyInfos {
if val, ok := redisValues[keyInfo.cacheKey]; ok && gconv.String(val) != "" {
validKeyInfos = append(validKeyInfos, keyInfos[i])
}
}
if len(validKeyInfos) == 0 {
return nil
}
var uids []int64
for _, ki := range validKeyInfos {
uids = append(uids, ki.uid)
}
var existingData []do.GameAct
err = g.Model(Name).Where("uid IN (?)", uids).Fields("uid,act_id").Scan(&existingData)
if err != nil {
g.Log().Errorf(ctx, "批量查询数据库失败: %v", err)
return err
}
existMap := make(map[int64]do.GameAct)
for _, data := range existingData {
existMap[gconv.Int64(data.Uid)] = data
}
var addItems []*entity.GameAct
var updateItems []*entity.GameAct
for _, ki := range validKeyInfos {
val, ok := redisValues[ki.cacheKey]
if !ok {
continue
}
actionData := gconv.String(val)
if actionData == "" {
continue
}
if _, ok := existMap[ki.uid]; ok {
updateItems = append(updateItems, &entity.GameAct{
ActId: ki.actId,
Uid: ki.uid,
Action: actionData,
})
} else {
addItems = append(addItems, &entity.GameAct{
ActId: ki.actId,
Uid: ki.uid,
Action: actionData,
})
}
}
tx, err := g.DB().Begin(ctx)
if err != nil {
g.Log().Error(ctx, err)
return err
}
if len(addItems) > 0 {
_, err = tx.Model(Name).Data(addItems).Batch(100).Insert()
if err != nil {
g.Log().Errorf(ctx, "批量新增失败: %v", err)
tx.Rollback()
return err
}
}
if len(updateItems) > 0 {
for _, item := range updateItems {
item.UpdatedAt = gtime.Now()
_, err = tx.Model(Name).Where(do.GameAct{
Uid: item.Uid,
ActId: item.ActId,
}).Data(item).Update()
if err != nil {
g.Log().Errorf(ctx, "批量更新失败: %v", err)
}
}
}
err = tx.Commit()
if err != nil {
g.Log().Errorf(ctx, "提交事务失败: %v", err)
return err
}
for _, item := range addItems {
s.DelCacheKey(ctx, item.ActId, item.Uid)
}
for _, item := range updateItems {
s.DelCacheKey(ctx, item.ActId, item.Uid)
}
g.Log().Debugf(ctx, "SaveV2Batch完成: 新增%d, 更新%d", len(addItems), len(updateItems))
return nil
}
// 删除缓存key
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
if uid == 0 {
return
}
if getBool, _ := pkg.Cache("redis").
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
return
}
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
_, err := g.Redis().Del(ctx, cacheKey)
if err != nil {
g.Log().Error(ctx, err)
}
}
}
// 清空GetRedDot缓存

View File

@@ -41,79 +41,17 @@ func init() {
// @return err: 错误信息如果操作成功则为nil。
func (s *sGameKv) SavesV1() (err error) {
var ctx = gctx.New()
// 最大允许执行时间
RunTimeMax = gtime.Now().Add(time.Minute * 30)
g.Log().Debug(ctx, "开始执行游戏kv数据保存")
// 定义用于存储用户数据的结构体
type ListData struct {
Uid int64 `json:"uid"`
Kv interface{} `json:"kv"`
}
var list []*ListData
// 初始化列表长度与keys数组一致
list = make([]*ListData, 0)
// 从Redis列表中获取所有用户KV索引的键
//keys, err := utils.RedisScan("user:kv:*")
err = tools.Redis.RedisScanV2("user:kv:*", func(keys []string) (err error) {
//判断是否超时
if gtime.Now().After(RunTimeMax) {
g.Log().Error(ctx, "kv执行超时了,停止执行!")
err = errors.New("kv执行超时了,停止执行!")
return
}
//需要删除的key
// 遍历keys获取每个用户的数据并填充到list中
for _, cacheKey := range keys {
//g.Log().Infof(ctx, "保存用户kv数据%v", v)
//uid := v.Int64()
//cacheKey = "user:kv:" + strconv.FormatInt(uid, 10)
result := strings.Split(cacheKey, ":")
var uid = gconv.Int64(result[2])
if uid == 0 {
continue
}
//uid, err = strconv.ParseInt(result[2], 10, 64)
if err != nil {
g.Log().Error(ctx, err)
g.Redis().Del(ctx, cacheKey)
continue
}
//如果有活跃,跳过持久化
if getBool, _ := pkg.Cache("redis").
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
continue
}
get, _ := g.Redis().Get(ctx, cacheKey)
var data interface{}
get.Scan(&data)
if data == nil {
continue
}
list = append(list, &ListData{
Uid: uid,
Kv: data,
})
}
// 将列表数据保存到数据库
if len(list) > 100 {
_, err2 := g.Model("game_kv").Data(list).Save()
if err2 != nil {
g.Log().Error(ctx, "当前kv数据入库失败: %v", err2)
err = err2
return
}
//删除当前key
for _, v := range list {
s.DelCacheKey(ctx, v.Uid)
}
list = make([]*ListData, 0)
if err = s.SavesV2Batch(ctx, keys); err != nil {
g.Log().Errorf(ctx, "批量保存KV失败: %v", err)
}
return
})
@@ -135,3 +73,92 @@ func (s *sGameKv) DelCacheKey(ctx context.Context, uid int64) {
g.Log().Error(ctx, err)
}
}
// SavesV2Batch 批量保存游戏KV数据 (优化版)
//
// @Description: 使用批量Redis MGET和批量数据库操作提升性能
// @param ctx context.Context: 上下文对象
// @param cacheKeys []string: 缓存键列表
// @return err error: 返回错误信息
func (s *sGameKv) SavesV2Batch(ctx context.Context, cacheKeys []string) (err error) {
if len(cacheKeys) == 0 {
return nil
}
type keyInfo struct {
cacheKey string
uid int64
}
var keyInfos []keyInfo
for _, cacheKey := range cacheKeys {
result := strings.Split(cacheKey, ":")
if len(result) < 3 {
continue
}
uid := gconv.Int64(result[2])
if uid == 0 {
continue
}
if getBool, _ := pkg.Cache("redis").Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
continue
}
keyInfos = append(keyInfos, keyInfo{
cacheKey: cacheKey,
uid: uid,
})
}
if len(keyInfos) == 0 {
return nil
}
redisValues, err := g.Redis().MGet(ctx, cacheKeys...)
if err != nil {
g.Log().Errorf(ctx, "批量获取Redis失败: %v", err)
return err
}
type ListData struct {
Uid int64 `json:"uid"`
Kv interface{} `json:"kv"`
}
var list []*ListData
for _, ki := range keyInfos {
val, ok := redisValues[ki.cacheKey]
if !ok {
continue
}
var data interface{}
if val != nil {
gconv.Scan(val, &data)
}
if data == nil {
continue
}
list = append(list, &ListData{
Uid: ki.uid,
Kv: data,
})
}
if len(list) == 0 {
return nil
}
_, err = g.Model(Name).Data(list).Save()
if err != nil {
g.Log().Errorf(ctx, "批量保存KV失败: %v", err)
return err
}
for _, v := range list {
s.DelCacheKey(ctx, v.Uid)
}
g.Log().Debugf(ctx, "SavesV2Batch完成: 保存%d条", len(list))
return nil
}