Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b96919d80 | ||
|
|
d6c3f542f3 | ||
|
|
1efac70cdb | ||
|
|
01c97c37f7 | ||
|
|
4fd262beae |
@@ -237,7 +237,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//删除缓存
|
//删除缓存
|
||||||
//s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
|
|
||||||
updateCount++
|
updateCount++
|
||||||
update = make([]*entity.GameAct, 0)
|
update = make([]*entity.GameAct, 0)
|
||||||
@@ -458,10 +458,10 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
|||||||
tx, err = g.DB().Begin(ctx)
|
tx, err = g.DB().Begin(ctx)
|
||||||
}
|
}
|
||||||
//删除缓存
|
//删除缓存
|
||||||
//s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
}
|
}
|
||||||
//循环结束了,最后写入一波
|
//循环结束了,最后写入一波
|
||||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
g.Log().Debugf(ctx, "Cache2Sql运行结束,act当前更新数据库: %v 条", updateCount)
|
||||||
update = (update)[:0]
|
update = (update)[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -479,7 +479,7 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
|||||||
}
|
}
|
||||||
addCount++
|
addCount++
|
||||||
if addCount > TaskMax {
|
if addCount > TaskMax {
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
g.Log().Debugf(ctx, "超过%v条,act当前写入数据库: %v 条", TaskMax, addCount)
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||||
@@ -489,11 +489,11 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
|||||||
tx, err = g.DB().Begin(ctx)
|
tx, err = g.DB().Begin(ctx)
|
||||||
}
|
}
|
||||||
//删除缓存
|
//删除缓存
|
||||||
//s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
//循环结束了,最后写入一波
|
//循环结束了,最后写入一波
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
g.Log().Debugf(ctx, "Cache2Sql运行结束,act当前写入数据库: %v 条", addCount)
|
||||||
add = (add)[:0]
|
add = (add)[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -510,6 +510,8 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
//通道关闭标志
|
//通道关闭标志
|
||||||
addClosed := false
|
addClosed := false
|
||||||
updateClosed := false
|
updateClosed := false
|
||||||
|
//写入总数
|
||||||
|
var addAllCount, updateAllCount int64
|
||||||
|
|
||||||
tx, err := g.DB().Begin(ctx)
|
tx, err := g.DB().Begin(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -545,18 +547,20 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
addCount += row
|
addCount += row
|
||||||
|
|
||||||
if addCount > TaskMax {
|
if addCount > TaskMax {
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
// g.Log().Debugf(ctx, "超过%v条,act当前写入数据库: %v 条", TaskMax, addCount)
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
//清空数量前累加一下
|
||||||
|
addAllCount += addCount
|
||||||
addCount = 0
|
addCount = 0
|
||||||
tx, err = g.DB().Begin(ctx)
|
tx, err = g.DB().Begin(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除缓存
|
//删除缓存
|
||||||
//s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
|
|
||||||
case v, ok := <-updateChan:
|
case v, ok := <-updateChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -579,18 +583,20 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
updateCount++
|
updateCount++
|
||||||
|
|
||||||
if updateCount > TaskMax {
|
if updateCount > TaskMax {
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
// g.Log().Debugf(ctx, "超过%v条,act当前更新数据库: %v 条", TaskMax, updateCount)
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
//清空数量前累加一下
|
||||||
|
updateAllCount += updateCount
|
||||||
updateCount = 0
|
updateCount = 0
|
||||||
tx, err = g.DB().Begin(ctx)
|
tx, err = g.DB().Begin(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除缓存
|
//删除缓存
|
||||||
//s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
g.Log().Debug(ctx, "act协程被上下文取消")
|
g.Log().Debug(ctx, "act协程被上下文取消")
|
||||||
@@ -599,9 +605,11 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
|
addAllCount += addCount
|
||||||
|
updateAllCount += updateCount
|
||||||
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
|
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
g.Log().Debugf(ctx, "运行结束act当前写入数据库: %v 条", addAllCount)
|
||||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
g.Log().Debugf(ctx, "运行结束act当前更新数据库: %v 条", updateAllCount)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,8 @@ func TestGamelog(t *testing.T) {
|
|||||||
// 必填
|
// 必填
|
||||||
Pid: "test5", // 项目ID
|
Pid: "test5", // 项目ID
|
||||||
// BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
|
// BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
|
||||||
BaseUrl: "http://101.37.28.111:10101", // 香港测试服上报地址
|
// BaseUrl: "http://101.37.28.111:10101", // 香港测试服上报地址
|
||||||
|
BaseUrl: "http://47.77.200.131:10101", // 美国BIDB服务器
|
||||||
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
|
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
|
||||||
ReportSk: "sngame2025", // xor混淆key
|
ReportSk: "sngame2025", // xor混淆key
|
||||||
FlushInterval: 5, // 上报间隔
|
FlushInterval: 5, // 上报间隔
|
||||||
@@ -52,3 +53,53 @@ func TestGamelog(t *testing.T) {
|
|||||||
glsdk.Shutdown()
|
glsdk.Shutdown()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPressMQ(t *testing.T) {
|
||||||
|
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
|
||||||
|
// 必填
|
||||||
|
Pid: "yotest", // 项目ID
|
||||||
|
BaseUrl: "http://47.77.200.131:10101", // 美国BIDB服务器
|
||||||
|
ReportSk: "sngame2025", // xor混淆key
|
||||||
|
FlushInterval: 6, // 上报间隔
|
||||||
|
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
|
||||||
|
// 可填
|
||||||
|
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
|
||||||
|
ChanSize: 500, // 默认1000, 信道size
|
||||||
|
SendSaveType: 2, // 发送存储类型, 默认不设置为0代表文件存储, 2代表走kafka可实同步日志
|
||||||
|
})
|
||||||
|
|
||||||
|
// 随机测试事件和属性
|
||||||
|
events := []string{"e1", "e2", "e3", "e4"}
|
||||||
|
pms := []map[string]any{
|
||||||
|
{"a": "1"},
|
||||||
|
{"a": "2"},
|
||||||
|
{"a": "3"},
|
||||||
|
{"a": "4"},
|
||||||
|
}
|
||||||
|
uuids := []string{}
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
uuidval, _ := uuid.NewUUID()
|
||||||
|
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
|
||||||
|
uuids = append(uuids, randUid)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
n := 0
|
||||||
|
const limit = 30000
|
||||||
|
gtest.C(t, func(t *gtest.T) {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
glsdk.LogLtz(uuids[grand.Intn(len(uuids))], events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
|
||||||
|
// 并发控制
|
||||||
|
n++
|
||||||
|
if n%limit == 0 {
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
time.Sleep(time.Second * 120)
|
||||||
|
// 模拟等待信号后优雅关闭
|
||||||
|
glsdk.Shutdown()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user