Compare commits

...

8 Commits

Author SHA1 Message Date
ayflying
4fd262beae 修改更新与写入的持计划日志 2025-12-02 12:21:15 +08:00
ayflying
239115ead8 Merge branch 'master' of https://gitea.adesk.com/public_project/utility_go 2025-12-02 10:40:07 +08:00
ayflying
02b3e275d0 暂时屏蔽缓存的删除,来测试持久化 2025-12-02 10:39:53 +08:00
goldensea
9810e55a15 修复safeProperty方法 2025-12-01 17:18:17 +08:00
ayflying
03e4ad5db5 优化持久化代码,提高持久化性能 2025-12-01 15:37:32 +08:00
ayflying
eb94e50f02 改为3天持久化 2025-11-24 11:20:18 +08:00
liaoyulong
26444905cf 新增DelByStopRank方法 调整SetScore 2025-11-21 10:29:46 +08:00
liaoyulong
d82b12ddaf Revert "解决后续data解析到空map为nil的问题"
This reverts commit f3e1ad74b5.
2025-11-19 14:56:12 +08:00
3 changed files with 138 additions and 45 deletions

View File

@@ -15,7 +15,6 @@ import (
service2 "github.com/ayflying/utility_go/service" service2 "github.com/ayflying/utility_go/service"
"github.com/ayflying/utility_go/tools" "github.com/ayflying/utility_go/tools"
"github.com/gogf/gf/v2/container/gset" "github.com/gogf/gf/v2/container/gset"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gctx"
@@ -27,6 +26,7 @@ var (
Name = "game_act" Name = "game_act"
ActList = gset.New(true) ActList = gset.New(true)
RunTimeMax *gtime.Time RunTimeMax *gtime.Time
TaskMax int64 = 5
) )
type sGameAct struct { type sGameAct struct {
@@ -69,8 +69,8 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
Uid: uid, Uid: uid,
ActId: actId, ActId: actId,
}).Fields("action").OrderDesc("updated_at").Value() }).Fields("action").OrderDesc("updated_at").Value()
// getDb.Scan(&data) getDb.Scan(&data)
data = gvar.New(getDb)
if data == nil || data.IsEmpty() { if data == nil || data.IsEmpty() {
return return
} }
@@ -78,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
_, err = g.Redis().Set(ctx, keyCache, data) _, err = g.Redis().Set(ctx, keyCache, data)
var CacheKey = fmt.Sprintf("act:update:%d", uid) var CacheKey = fmt.Sprintf("act:update:%d", uid)
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*1) pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3+time.Hour*12)
return return
} }
@@ -237,7 +237,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
} }
//删除缓存 //删除缓存
go s.DelCacheKey(ctx, v.ActId, v.Uid) s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++ updateCount++
update = make([]*entity.GameAct, 0) update = make([]*entity.GameAct, 0)
@@ -265,10 +265,10 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
return return
} }
for _, v2 := range add { //for _, v2 := range add {
//删除缓存 // //删除缓存
go s.DelCacheKey(ctx, v2.ActId, v2.Uid) // s.DelCacheKey(ctx, v2.ActId, v2.Uid)
} //}
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count) //g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
add = make([]*entity.GameAct, 0) add = make([]*entity.GameAct, 0)
@@ -312,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) {
go func() { go func() {
scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error { scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error {
if gtime.Now().After(RunTimeMax) { if gtime.Now().After(RunTimeMax) {
return errors.New("Redis扫描超时") return errors.New("redis扫描超时")
} }
for _, key := range keys { for _, key := range keys {
if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil { if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil {
@@ -422,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC
// @param update []*entity.GameAct: 更新数据 // @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息 // @return err error: 返回错误信息
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) { func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) {
tx, err := g.DB().Begin(ctx)
if err != nil {
g.Log().Error(ctx, err)
return
}
//批量写入数据库 //批量写入数据库
updateCount := 0 var updateCount int64
if len(update) > 0 { if len(update) > 0 {
for _, v := range update { for _, v := range update {
v.UpdatedAt = gtime.Now() v.UpdatedAt = gtime.Now()
updateRes, err2 := g.Model(Name).Where(do.GameAct{ updateRes, err2 := tx.Model(Name).Where(do.GameAct{
Uid: v.Uid, Uid: v.Uid,
ActId: v.ActId, ActId: v.ActId,
}).Data(v).Update() }).Data(v).Update()
@@ -439,18 +445,30 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v) g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v)
continue continue
} }
updateCount++
if updateCount > TaskMax {
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
return
}
updateCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存 //删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid) s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++
} }
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount) //循环结束了,最后写入一波
g.Log().Debugf(ctx, "Cache2Sql运行结束act当前更新数据库: %v 条", updateCount)
update = (update)[:0] update = (update)[:0]
} }
var addCount int64 var addCount int64
if len(add) > 0 { if len(add) > 0 {
for _, v := range add { for _, v := range add {
addRes, err2 := g.Model(Name).Data(v).Insert() addRes, err2 := tx.Model(Name).Data(v).Insert()
if err2 != nil { if err2 != nil {
g.Log().Error(ctx, err2) g.Log().Error(ctx, err2)
continue continue
@@ -460,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
continue continue
} }
addCount++ addCount++
if addCount > TaskMax {
g.Log().Debugf(ctx, "超过%v条act当前写入数据库: %v 条", TaskMax, addCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
return
}
addCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存 //删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid) s.DelCacheKey(ctx, v.ActId, v.Uid)
} }
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
//循环结束了,最后写入一波
g.Log().Debugf(ctx, "Cache2Sql运行结束act当前写入数据库: %v 条", addCount)
add = (add)[:0] add = (add)[:0]
} }
err = tx.Commit()
return return
} }
@@ -479,8 +511,14 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed := false addClosed := false
updateClosed := false updateClosed := false
tx, err := g.DB().Begin(ctx)
if err != nil {
g.Log().Error(ctx, err)
return
}
// 使用链式安全模式 // 使用链式安全模式
var db = g.Model(Name).Safe() //var db = tx.Model(Name).Safe()
for { for {
//检查是否两个通道都已关闭且为空 //检查是否两个通道都已关闭且为空
@@ -494,7 +532,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed = true // 仅标记关闭,不立即日志 addClosed = true // 仅标记关闭,不立即日志
continue continue
} }
addRes, err2 := db.Data(v).Insert() addRes, err2 := tx.Model(Name).Data(v).Insert()
if err2 != nil { if err2 != nil {
g.Log().Error(ctx, err2) g.Log().Error(ctx, err2)
continue continue
@@ -505,6 +543,18 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
} }
row, _ := addRes.RowsAffected() row, _ := addRes.RowsAffected()
addCount += row addCount += row
if addCount > TaskMax {
g.Log().Debugf(ctx, "超过%v条act当前写入数据库: %v 条", TaskMax, addCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
return
}
addCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存 //删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid) s.DelCacheKey(ctx, v.ActId, v.Uid)
@@ -514,7 +564,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
continue continue
} }
v.UpdatedAt = gtime.Now() v.UpdatedAt = gtime.Now()
updateRes, err2 := db.Where(do.GameAct{ updateRes, err2 := tx.Model(Name).Where(do.GameAct{
Uid: v.Uid, Uid: v.Uid,
ActId: v.ActId, ActId: v.ActId,
}).Data(v).Update() }).Data(v).Update()
@@ -526,9 +576,21 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
//g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v) //g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v)
continue continue
} }
updateCount++
if updateCount > TaskMax {
g.Log().Debugf(ctx, "超过%v条act当前更新数据库: %v 条", TaskMax, updateCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
return
}
updateCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存 //删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid) s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++
case <-ctx.Done(): case <-ctx.Done():
g.Log().Debug(ctx, "act协程被上下文取消") g.Log().Debug(ctx, "act协程被上下文取消")
@@ -536,25 +598,28 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
} }
} }
err = tx.Commit()
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志) // 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount) g.Log().Debugf(ctx, "运行结束act当前写入数据库: %v 条", addCount)
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount) g.Log().Debugf(ctx, "运行结束act当前更新数据库: %v 条", updateCount)
return return
} }
// 删除缓存key // 删除缓存key
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) { func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
//如果有活跃,跳过删除 go func() {
if getBool, _ := pkg.Cache("redis"). //如果有活跃,跳过删除
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool { if getBool, _ := pkg.Cache("redis").
return Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
} return
}
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid) cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
_, err := g.Redis().Del(ctx, cacheKey) _, err := g.Redis().Del(ctx, cacheKey)
if err != nil { if err != nil {
g.Log().Error(ctx, err) g.Log().Error(ctx, err)
} }
}()
} }
// 清空GetRedDot缓存 // 清空GetRedDot缓存

View File

@@ -100,9 +100,6 @@ func safeProperty(property map[string]any) {
if _, ok := v.(string); ok { if _, ok := v.(string); ok {
property[k] = onlyWordRE.ReplaceAllString(gconv.String(v), "*") property[k] = onlyWordRE.ReplaceAllString(gconv.String(v), "*")
} }
} else {
property[k] = safePropertyRE.ReplaceAllString(gconv.String(v), "*")
} }
} }
for _, delkey := range delkeys { for _, delkey := range delkeys {

View File

@@ -117,9 +117,17 @@ func (r *F64CountRank) IncrScore(id int64, score int64) (curScore float64, err e
// @param score // @param score
// @return err // @return err
func (r *F64CountRank) SetScore(id int64, score int) (err error) { func (r *F64CountRank) SetScore(id int64, score int) (err error) {
//如果分数小于0则删除
if score <= 0 {
err = r.DelScore(id)
if err != nil {
return
}
return
}
// 记录当前时间戳,用于更新成员的最新活动时间。 // 记录当前时间戳,用于更新成员的最新活动时间。
now := time.Now().UnixMilli() now := time.Now().UnixMilli()
// 将成员的更新时间戳加入到Redis的有序集合中确保成员的排序依据是最新的活动时间。 // 将成员的更新时间戳加入到Redis的有序集合中确保成员的排序依据是最新的活动时间。
_, err = g.Redis().ZAdd(ctx, r.updateTs, &gredis.ZAddOption{}, gredis.ZAddMember{ _, err = g.Redis().ZAdd(ctx, r.updateTs, &gredis.ZAddOption{}, gredis.ZAddMember{
Score: float64(now), Score: float64(now),
@@ -128,16 +136,10 @@ func (r *F64CountRank) SetScore(id int64, score int) (err error) {
if err != nil { if err != nil {
return return
} }
//如果分数小于0则删除
if score <= 0 { // 覆盖成员的分数
err = r.DelScore(id)
if err != nil {
return
}
}
// 增加成员的分数,并返回增加后的当前分数。
_, err = g.Redis().ZAdd(ctx, r.name, &gredis.ZAddOption{}, gredis.ZAddMember{ _, err = g.Redis().ZAdd(ctx, r.name, &gredis.ZAddOption{}, gredis.ZAddMember{
Score: float64(score) + (3*1e13 - float64(now)) / 1e14, Score: float64(score) + (3*1e13-float64(now))/1e14,
Member: id, Member: id,
}) })
@@ -187,6 +189,35 @@ func (r *F64CountRank) DelScore(id int64) (err error) {
return return
} }
// DelByStopRank 删除指定名次后的元素
func (r *F64CountRank) DelByStopRank(stop int64) (err error) {
// 初始化一个空的int64切片用于存储指定排名范围内的元素。
var members []int64
// 使用Redis的ZRange命令获取指定排名范围内的元素。
// 选项Rev设置为true表示按照分数从高到低的顺序返回元素。
get, err := g.Redis().ZRange(ctx, r.name, stop, 9999999,
gredis.ZRangeOption{
Rev: true,
})
// 使用Scan方法将获取到的元素扫描到members切片中。
err = get.Scan(&members)
// 如果扫描过程中出现错误,直接返回错误。
if err != nil {
return
}
// 遍历members切片对于每个元素使用ZRem命令从更新时间集合中删除对应的成员。
for _, member := range members {
_, err = g.Redis().ZRem(ctx, r.updateTs, member)
// 忽略ZRem操作的错误因为即使元素不存在ZRem也不会返回错误。
}
//删除超过9999的数据
g.Redis().ZRemRangeByRank(ctx, r.name, 0, -(stop + 1))
return
}
// DelByRank 根据排名范围删除元素。 // DelByRank 根据排名范围删除元素。
// 该方法使用了Redis的有序集合数据结构通过ZRange和ZRemRangeByRank命令来实现。 // 该方法使用了Redis的有序集合数据结构通过ZRange和ZRemRangeByRank命令来实现。
// 参数start和stop定义了要删除的排名范围从start到stop包括start和stop // 参数start和stop定义了要删除的排名范围从start到stop包括start和stop