Compare commits

...

12 Commits

Author SHA1 Message Date
liaoyulong
9fb0513703 act持久化事务最大提交从5改为100 2025-12-05 11:25:18 +08:00
liaoyulong
7b96919d80 最后一次的count放到计数中 2025-12-04 12:23:36 +08:00
goldensea
d6c3f542f3 Merge branch 'master' of https://gitea.adesk.com/public_project/utility_go 2025-12-03 19:40:06 +08:00
goldensea
1efac70cdb 压测脚本 2025-12-03 19:38:43 +08:00
liaoyulong
01c97c37f7 注释act持久化的冗余打印 最后执行结束后打印最终计数 2025-12-03 16:38:38 +08:00
ayflying
4fd262beae 修改更新与写入的持计划日志 2025-12-02 12:21:15 +08:00
ayflying
239115ead8 Merge branch 'master' of https://gitea.adesk.com/public_project/utility_go 2025-12-02 10:40:07 +08:00
ayflying
02b3e275d0 暂时屏蔽缓存的删除,来测试持久化 2025-12-02 10:39:53 +08:00
goldensea
9810e55a15 修复safeProperty方法 2025-12-01 17:18:17 +08:00
ayflying
03e4ad5db5 优化持久化代码,提高持久化性能 2025-12-01 15:37:32 +08:00
ayflying
eb94e50f02 改为3天持久化 2025-11-24 11:20:18 +08:00
liaoyulong
26444905cf 新增DelByStopRank方法 调整SetScore 2025-11-21 10:29:46 +08:00
4 changed files with 196 additions and 43 deletions

View File

@@ -26,6 +26,7 @@ var (
Name = "game_act"
ActList = gset.New(true)
RunTimeMax *gtime.Time
TaskMax int64 = 100
)
type sGameAct struct {
@@ -77,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
_, err = g.Redis().Set(ctx, keyCache, data)
var CacheKey = fmt.Sprintf("act:update:%d", uid)
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*1)
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3+time.Hour*12)
return
}
@@ -236,7 +237,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
}
//删除缓存
go s.DelCacheKey(ctx, v.ActId, v.Uid)
s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++
update = make([]*entity.GameAct, 0)
@@ -264,10 +265,10 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
return
}
for _, v2 := range add {
//删除缓存
go s.DelCacheKey(ctx, v2.ActId, v2.Uid)
}
//for _, v2 := range add {
// //删除缓存
// s.DelCacheKey(ctx, v2.ActId, v2.Uid)
//}
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
add = make([]*entity.GameAct, 0)
@@ -311,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) {
go func() {
scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error {
if gtime.Now().After(RunTimeMax) {
return errors.New("Redis扫描超时")
return errors.New("redis扫描超时")
}
for _, key := range keys {
if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil {
@@ -421,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) {
tx, err := g.DB().Begin(ctx)
if err != nil {
g.Log().Error(ctx, err)
return
}
//批量写入数据库
updateCount := 0
var updateCount int64
if len(update) > 0 {
for _, v := range update {
v.UpdatedAt = gtime.Now()
updateRes, err2 := g.Model(Name).Where(do.GameAct{
updateRes, err2 := tx.Model(Name).Where(do.GameAct{
Uid: v.Uid,
ActId: v.ActId,
}).Data(v).Update()
@@ -438,18 +445,30 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v)
continue
}
updateCount++
if updateCount > TaskMax {
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
return
}
updateCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++
}
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
//循环结束了,最后写入一波
g.Log().Debugf(ctx, "Cache2Sql运行结束act当前更新数据库: %v 条", updateCount)
update = (update)[:0]
}
var addCount int64
if len(add) > 0 {
for _, v := range add {
addRes, err2 := g.Model(Name).Data(v).Insert()
addRes, err2 := tx.Model(Name).Data(v).Insert()
if err2 != nil {
g.Log().Error(ctx, err2)
continue
@@ -459,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
continue
}
addCount++
if addCount > TaskMax {
g.Log().Debugf(ctx, "超过%v条act当前写入数据库: %v 条", TaskMax, addCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
return
}
addCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid)
}
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
//循环结束了,最后写入一波
g.Log().Debugf(ctx, "Cache2Sql运行结束act当前写入数据库: %v 条", addCount)
add = (add)[:0]
}
err = tx.Commit()
return
}
@@ -477,9 +510,17 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
//通道关闭标志
addClosed := false
updateClosed := false
//写入总数
var addAllCount, updateAllCount int64
tx, err := g.DB().Begin(ctx)
if err != nil {
g.Log().Error(ctx, err)
return
}
// 使用链式安全模式
var db = g.Model(Name).Safe()
//var db = tx.Model(Name).Safe()
for {
//检查是否两个通道都已关闭且为空
@@ -493,7 +534,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
addClosed = true // 仅标记关闭,不立即日志
continue
}
addRes, err2 := db.Data(v).Insert()
addRes, err2 := tx.Model(Name).Data(v).Insert()
if err2 != nil {
g.Log().Error(ctx, err2)
continue
@@ -504,6 +545,20 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
}
row, _ := addRes.RowsAffected()
addCount += row
if addCount > TaskMax {
// g.Log().Debugf(ctx, "超过%v条act当前写入数据库: %v 条", TaskMax, addCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
return
}
//清空数量前累加一下
addAllCount += addCount
addCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid)
@@ -513,7 +568,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
continue
}
v.UpdatedAt = gtime.Now()
updateRes, err2 := db.Where(do.GameAct{
updateRes, err2 := tx.Model(Name).Where(do.GameAct{
Uid: v.Uid,
ActId: v.ActId,
}).Data(v).Update()
@@ -525,9 +580,23 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
//g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v)
continue
}
updateCount++
if updateCount > TaskMax {
// g.Log().Debugf(ctx, "超过%v条act当前更新数据库: %v 条", TaskMax, updateCount)
err = tx.Commit()
if err != nil {
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
return
}
//清空数量前累加一下
updateAllCount += updateCount
updateCount = 0
tx, err = g.DB().Begin(ctx)
}
//删除缓存
s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++
case <-ctx.Done():
g.Log().Debug(ctx, "act协程被上下文取消")
@@ -535,14 +604,18 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
}
}
err = tx.Commit()
addAllCount += addCount
updateAllCount += updateCount
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
g.Log().Debugf(ctx, "运行结束act当前写入数据库: %v 条", addAllCount)
g.Log().Debugf(ctx, "运行结束act当前更新数据库: %v 条", updateAllCount)
return
}
// 删除缓存key
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
go func() {
//如果有活跃,跳过删除
if getBool, _ := pkg.Cache("redis").
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
@@ -554,6 +627,7 @@ func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
if err != nil {
g.Log().Error(ctx, err)
}
}()
}
// 清空GetRedDot缓存

View File

@@ -100,9 +100,6 @@ func safeProperty(property map[string]any) {
if _, ok := v.(string); ok {
property[k] = onlyWordRE.ReplaceAllString(gconv.String(v), "*")
}
} else {
property[k] = safePropertyRE.ReplaceAllString(gconv.String(v), "*")
}
}
for _, delkey := range delkeys {

View File

@@ -16,7 +16,8 @@ func TestGamelog(t *testing.T) {
// 必填
Pid: "test5", // 项目ID
// BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
BaseUrl: "http://101.37.28.111:10101", // 香港测试服上报地址
// BaseUrl: "http://101.37.28.111:10101", // 香港测试服上报地址
BaseUrl: "http://47.77.200.131:10101", // 美国BIDB服务器
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
ReportSk: "sngame2025", // xor混淆key
FlushInterval: 5, // 上报间隔
@@ -52,3 +53,53 @@ func TestGamelog(t *testing.T) {
glsdk.Shutdown()
})
}
func TestPressMQ(t *testing.T) {
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
// 必填
Pid: "yotest", // 项目ID
BaseUrl: "http://47.77.200.131:10101", // 美国BIDB服务器
ReportSk: "sngame2025", // xor混淆key
FlushInterval: 6, // 上报间隔
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
// 可填
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
ChanSize: 500, // 默认1000, 信道size
SendSaveType: 2, // 发送存储类型, 默认不设置为0代表文件存储, 2代表走kafka可实同步日志
})
// 随机测试事件和属性
events := []string{"e1", "e2", "e3", "e4"}
pms := []map[string]any{
{"a": "1"},
{"a": "2"},
{"a": "3"},
{"a": "4"},
}
uuids := []string{}
for i := 0; i < 100; i++ {
uuidval, _ := uuid.NewUUID()
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
uuids = append(uuids, randUid)
}
if err != nil {
t.Fatal(err)
}
n := 0
const limit = 30000
gtest.C(t, func(t *gtest.T) {
go func() {
for {
glsdk.LogLtz(uuids[grand.Intn(len(uuids))], events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
// 并发控制
n++
if n%limit == 0 {
time.Sleep(time.Second * 1)
}
}
}()
time.Sleep(time.Second * 120)
// 模拟等待信号后优雅关闭
glsdk.Shutdown()
})
}

View File

@@ -117,6 +117,14 @@ func (r *F64CountRank) IncrScore(id int64, score int64) (curScore float64, err e
// @param score
// @return err
func (r *F64CountRank) SetScore(id int64, score int) (err error) {
//如果分数小于0则删除
if score <= 0 {
err = r.DelScore(id)
if err != nil {
return
}
return
}
// 记录当前时间戳,用于更新成员的最新活动时间。
now := time.Now().UnixMilli()
@@ -128,14 +136,8 @@ func (r *F64CountRank) SetScore(id int64, score int) (err error) {
if err != nil {
return
}
//如果分数小于0则删除
if score <= 0 {
err = r.DelScore(id)
if err != nil {
return
}
}
// 增加成员的分数,并返回增加后的当前分数。
// 覆盖成员的分数
_, err = g.Redis().ZAdd(ctx, r.name, &gredis.ZAddOption{}, gredis.ZAddMember{
Score: float64(score) + (3*1e13-float64(now))/1e14,
Member: id,
@@ -187,6 +189,35 @@ func (r *F64CountRank) DelScore(id int64) (err error) {
return
}
// DelByStopRank 删除指定名次后的元素
func (r *F64CountRank) DelByStopRank(stop int64) (err error) {
// 初始化一个空的int64切片用于存储指定排名范围内的元素。
var members []int64
// 使用Redis的ZRange命令获取指定排名范围内的元素。
// 选项Rev设置为true表示按照分数从高到低的顺序返回元素。
get, err := g.Redis().ZRange(ctx, r.name, stop, 9999999,
gredis.ZRangeOption{
Rev: true,
})
// 使用Scan方法将获取到的元素扫描到members切片中。
err = get.Scan(&members)
// 如果扫描过程中出现错误,直接返回错误。
if err != nil {
return
}
// 遍历members切片对于每个元素使用ZRem命令从更新时间集合中删除对应的成员。
for _, member := range members {
_, err = g.Redis().ZRem(ctx, r.updateTs, member)
// 忽略ZRem操作的错误因为即使元素不存在ZRem也不会返回错误。
}
//删除超过9999的数据
g.Redis().ZRemRangeByRank(ctx, r.name, 0, -(stop + 1))
return
}
// DelByRank 根据排名范围删除元素。
// 该方法使用了Redis的有序集合数据结构通过ZRange和ZRemRangeByRank命令来实现。
// 参数start和stop定义了要删除的排名范围从start到stop包括start和stop