Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1f1bea0e7 | ||
|
|
299ba0b93e | ||
|
|
c12c49477c | ||
|
|
b052754a30 | ||
|
|
aa1dc0896d | ||
|
|
8210ac24db | ||
|
|
cd3de96761 | ||
|
|
ce8ae4d26a | ||
|
|
50cfc23ad2 | ||
|
|
95539038c0 | ||
|
|
4b08a9ce84 | ||
|
|
6efdac7bab | ||
|
|
788cb2e6d4 | ||
|
|
dd8c05b344 | ||
|
|
e781e132ed | ||
|
|
862a6c8410 | ||
|
|
42535d0023 | ||
|
|
dd999cacf9 | ||
|
|
30d30bb8c6 | ||
|
|
f1c22dc9e6 | ||
| 14cf759ce1 | |||
| 19a19c1ff1 | |||
| 183d6d8b10 | |||
| d0cad61028 | |||
| efb34e0c5b |
@@ -2,8 +2,10 @@ package boot
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||
"github.com/ayflying/utility_go/service"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
)
|
||||
|
||||
@@ -16,12 +18,17 @@ func Boot() (err error) {
|
||||
// 启动计划任务定时器,预防debug工具激活计划任务造成重复执行,此处不执行计划任务
|
||||
//err = service.SystemCron().StartCron()
|
||||
|
||||
//用户活动持久化
|
||||
service.SystemCron().AddCronV2(v1.CronType_DAILY, func(ctx context.Context) error {
|
||||
err = service.GameKv().SavesV1()
|
||||
err = service.GameAct().Saves(ctx)
|
||||
return err
|
||||
})
|
||||
//用户活动持久化每小时执行一次
|
||||
service.SystemCron().AddCronV2(v1.CronType_HOUR, func(context.Context) error {
|
||||
go func() {
|
||||
err = service.GameKv().SavesV1()
|
||||
err = service.GameAct().SavesV2()
|
||||
if err != nil {
|
||||
g.Log().Error(gctx.New(), err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}, true)
|
||||
|
||||
//初始化自启动方法
|
||||
for _, v := range _func {
|
||||
|
||||
@@ -2,6 +2,7 @@ package gameAct
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -20,9 +21,11 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ctx = gctx.New()
|
||||
Name = "game_act"
|
||||
ActList = gset.New(true)
|
||||
Name = "game_act"
|
||||
ActList = gset.New(true)
|
||||
RunTimeMax *gtime.Time
|
||||
addChan chan *entity.GameAct
|
||||
updateChan chan *entity.GameAct
|
||||
)
|
||||
|
||||
type sGameAct struct {
|
||||
@@ -45,6 +48,7 @@ func init() {
|
||||
// @return data *v1.Act: 返回活动信息结构体指针
|
||||
// @return err error: 返回错误信息
|
||||
func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
||||
var ctx = gctx.New()
|
||||
if uid == 0 || actId == 0 {
|
||||
g.Log().Error(ctx, "当前参数为空")
|
||||
return
|
||||
@@ -87,6 +91,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
||||
// @param data interface{}: 要存储的活动信息数据。
|
||||
// @return err error: 返回错误信息,如果操作成功,则返回nil。
|
||||
func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
|
||||
var ctx = gctx.New()
|
||||
if uid == 0 || actId == 0 {
|
||||
g.Log().Error(ctx, "当前参数为空")
|
||||
return
|
||||
@@ -107,37 +112,54 @@ func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *sGameAct) Saves(ctx context.Context) (err error) {
|
||||
getCache, _ := pkg.Cache("redis").Get(nil, "cron:game_act")
|
||||
// Saves 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @return err error: 返回错误信息
|
||||
// Deprecated: 该方法已被弃用,建议使用SavesV2方法
|
||||
func (s *sGameAct) Saves() (err error) {
|
||||
var ctx = gctx.New()
|
||||
g.Log().Debug(ctx, "开始执行游戏act数据保存了")
|
||||
//如果没有执行过,设置时间戳
|
||||
if getCache.Int64() > 0 {
|
||||
return
|
||||
} else {
|
||||
pkg.Cache("redis").Set(nil, "cron:game_act", gtime.Now().Unix(), time.Hour)
|
||||
}
|
||||
|
||||
// 最大允许执行时间
|
||||
RunTimeMax = gtime.Now().Add(time.Minute * 30)
|
||||
//遍历执行
|
||||
ActList.Iterator(func(i interface{}) bool {
|
||||
err = s.Save(ctx, i.(int))
|
||||
//在时间内允许执行
|
||||
if gtime.Now().Before(RunTimeMax) {
|
||||
g.Log().Debugf(ctx, "开始执行游戏act数据保存:act=%v", i)
|
||||
err = s.Save(ctx, i.(int))
|
||||
} else {
|
||||
g.Log().Errorf(ctx, "游戏act数据保存超时:act=%v", i)
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Save 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
// @param actId int: 活动ID
|
||||
// @return err error: 返回错误信息
|
||||
// deprecated: 该方法已被弃用,建议使用SaveV2方法
|
||||
func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||
|
||||
cacheKey := fmt.Sprintf("act:%v:*", actId)
|
||||
//获取当前用户的key值
|
||||
//keys, err := utils.RedisScan(cacheKey)
|
||||
//if len(keys) > 10000 {
|
||||
// keys = keys[:10000]
|
||||
//}
|
||||
|
||||
var add = make([]*entity.GameAct, 0)
|
||||
var update = make([]*entity.GameAct, 0)
|
||||
//循环获取缓存数据
|
||||
err = tools.Redis.RedisScanV2(cacheKey, func(keys []string) (err error) {
|
||||
var add = make([]*entity.GameAct, 0)
|
||||
var update = make([]*entity.GameAct, 0)
|
||||
var delKey []string
|
||||
//判断是否超时
|
||||
if gtime.Now().After(RunTimeMax) {
|
||||
g.Log().Debug(ctx, "act执行超时了,停止执行!")
|
||||
err = errors.New("act执行超时了,停止执行!")
|
||||
return
|
||||
}
|
||||
|
||||
for _, cacheKey = range keys {
|
||||
result := strings.Split(cacheKey, ":")
|
||||
actId, err = strconv.Atoi(result[1])
|
||||
@@ -145,6 +167,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||
uid = gconv.Int64(result[2])
|
||||
//uid, err = strconv.ParseInt(result[2], 10, 64)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -173,7 +196,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||
ActId: actId,
|
||||
}).Fields("uid,act_id").Scan(&data)
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "当前数据错误: %v", cacheKey)
|
||||
g.Log().Errorf(ctx, "当前数据错误: %v", cacheKey)
|
||||
continue
|
||||
}
|
||||
actionData := cacheGet.String()
|
||||
@@ -190,47 +213,64 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||
data.Action = actionData
|
||||
update = append(update, data)
|
||||
}
|
||||
//最后删除key
|
||||
delKey = append(delKey, cacheKey)
|
||||
}
|
||||
|
||||
//批量写入数据库
|
||||
if len(delKey) > 0 {
|
||||
updateCount := 0
|
||||
|
||||
//g.Log().Debugf(ctx, "当前 %v 要更新的数据: %v 条", actId, len(update))
|
||||
if len(update) > 100 {
|
||||
for _, v := range update {
|
||||
v.UpdatedAt = gtime.Now()
|
||||
_, err2 := g.Model(Name).Where(do.GameAct{
|
||||
Uid: v.Uid,
|
||||
ActId: v.ActId,
|
||||
UpdatedAt: v.UpdatedAt,
|
||||
updateRes, err2 := g.Model(Name).Where(do.GameAct{
|
||||
Uid: v.Uid,
|
||||
ActId: v.ActId,
|
||||
}).Data(v).Update()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
}
|
||||
if row, _ := updateRes.RowsAffected(); row == 0 {
|
||||
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
|
||||
//删除缓存
|
||||
go s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
|
||||
updateCount++
|
||||
update = make([]*entity.GameAct, 0)
|
||||
}
|
||||
g.Log().Debugf(ctx, "当前 %v 更新数据库: %v 条", actId, updateCount)
|
||||
update = make([]*entity.GameAct, 0)
|
||||
var count int64
|
||||
}
|
||||
|
||||
if len(add) > 0 {
|
||||
dbRes, err2 := g.Model(Name).Batch(50).Data(add).Save()
|
||||
add = make([]*entity.GameAct, 0)
|
||||
err = err2
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
var count int64
|
||||
//g.Log().Debugf(ctx, "当前 %v 要添加的数据: %v 条", actId, len(add))
|
||||
if len(add) > 100 {
|
||||
dbRes, err2 := g.Model(Name).Data(add).Save()
|
||||
|
||||
err = err2
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
}
|
||||
count, _ = dbRes.RowsAffected()
|
||||
if count == 0 {
|
||||
g.Log().Error(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||
for _, vTemp := range add {
|
||||
g.Log().Debugf(ctx, "当前act:%v,add写入数据: %v,内容:%v", vTemp.ActId, vTemp.Uid, vTemp.Action)
|
||||
}
|
||||
count, _ = dbRes.RowsAffected()
|
||||
return
|
||||
}
|
||||
|
||||
for _, v := range delKey {
|
||||
_, err = g.Redis().Del(ctx, v)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
}
|
||||
for _, v2 := range add {
|
||||
//删除缓存
|
||||
go s.DelCacheKey(ctx, v2.ActId, v2.Uid)
|
||||
}
|
||||
delKey = make([]string, 0)
|
||||
|
||||
g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||
add = make([]*entity.GameAct, 0)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -243,17 +283,240 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// SavesV2 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @return err error: 返回错误信息
|
||||
func (s *sGameAct) SavesV2() (err error) {
|
||||
var ctx = gctx.New()
|
||||
g.Log().Debug(ctx, "开始执行游戏act数据保存了")
|
||||
//如果没有执行过,设置时间戳
|
||||
// 最大允许执行时间
|
||||
RunTimeMax = gtime.Now().Add(time.Minute * 30)
|
||||
|
||||
//cacheKey := fmt.Sprintf("act:%v:*", actId)
|
||||
|
||||
addChan = make(chan *entity.GameAct, 1000)
|
||||
updateChan = make(chan *entity.GameAct, 1000)
|
||||
|
||||
go func() {
|
||||
//循环获取缓存数据
|
||||
err = tools.Redis.RedisScanV2("act:*", func(keys []string) (err error) {
|
||||
for _, key := range keys {
|
||||
//格式化数据
|
||||
err = s.SaveV2(ctx, key)
|
||||
|
||||
}
|
||||
return err
|
||||
})
|
||||
//关闭通道
|
||||
close(addChan)
|
||||
close(updateChan)
|
||||
}()
|
||||
// 启动缓存数据到数据库通道
|
||||
s.Cache2SqlChan(ctx)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// SaveV2 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
// @param cacheKey string: 缓存键
|
||||
// @param add []*entity.GameAct: 添加数据
|
||||
// @param update []*entity.GameAct: 更新数据
|
||||
// @return err error: 返回错误信息
|
||||
func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string) (err error) {
|
||||
|
||||
result := strings.Split(cacheKey, ":")
|
||||
actId := gconv.Int(result[1])
|
||||
if actId == 0 {
|
||||
return
|
||||
}
|
||||
var uid int64
|
||||
uid = gconv.Int64(result[2])
|
||||
if uid == 0 {
|
||||
//跳过为空的用户缓存
|
||||
return
|
||||
}
|
||||
|
||||
//获取缓存数据
|
||||
cacheGet, _ := g.Redis().Get(ctx, cacheKey)
|
||||
|
||||
if cacheGet.IsEmpty() {
|
||||
//空数据也不保存
|
||||
return
|
||||
}
|
||||
|
||||
//如果有活跃,跳过持久化
|
||||
if getBool, _ := pkg.Cache("redis").
|
||||
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
|
||||
return
|
||||
}
|
||||
|
||||
//获取数据库数据
|
||||
var data *entity.GameAct
|
||||
// 从数据库中查询活动信息
|
||||
err = g.Model(Name).Where(do.GameAct{
|
||||
Uid: uid,
|
||||
ActId: actId,
|
||||
}).Fields("uid,act_id").Scan(&data)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "当前数据错误: %v", cacheKey)
|
||||
return
|
||||
}
|
||||
|
||||
//如果没有数据,添加
|
||||
actionData := cacheGet.String()
|
||||
if data == nil {
|
||||
//add = append(add, &entity.GameAct{
|
||||
// ActId: actId,
|
||||
// Uid: uid,
|
||||
// Action: actionData,
|
||||
//})
|
||||
addChan <- &entity.GameAct{
|
||||
ActId: actId,
|
||||
Uid: uid,
|
||||
Action: actionData,
|
||||
}
|
||||
} else {
|
||||
//覆盖数据
|
||||
data.ActId = actId
|
||||
data.Uid = uid
|
||||
data.Action = actionData
|
||||
//update = append(update, data)
|
||||
updateChan <- data
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Cache2Sql 缓存持久化到数据库
|
||||
// @Description: 缓存持久化到数据库
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
// @param add []*entity.GameAct: 添加数据
|
||||
// @param update []*entity.GameAct: 更新数据
|
||||
// @return err error: 返回错误信息
|
||||
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) {
|
||||
//批量写入数据库
|
||||
updateCount := 0
|
||||
if len(update) > 0 {
|
||||
for _, v := range update {
|
||||
v.UpdatedAt = gtime.Now()
|
||||
updateRes, err2 := g.Model(Name).Where(do.GameAct{
|
||||
Uid: v.Uid,
|
||||
ActId: v.ActId,
|
||||
}).Data(v).Update()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
continue
|
||||
}
|
||||
if row, _ := updateRes.RowsAffected(); row == 0 {
|
||||
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
updateCount++
|
||||
}
|
||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
||||
update = (update)[:0]
|
||||
}
|
||||
|
||||
var addCount int64
|
||||
if len(add) > 0 {
|
||||
for _, v := range add {
|
||||
addRes, err2 := g.Model(Name).Data(v).Insert()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
continue
|
||||
}
|
||||
if row, _ := addRes.RowsAffected(); row == 0 {
|
||||
g.Log().Error(ctx, "本次新增为0,新增数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
addCount++
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
}
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
add = (add)[:0]
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Cache2SqlChan 缓存持久化到数据库
|
||||
// @Description: 缓存持久化到数据库
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
func (s *sGameAct) Cache2SqlChan(ctx context.Context) {
|
||||
//批量写入数据库
|
||||
updateCount := 0
|
||||
for v := range updateChan {
|
||||
v.UpdatedAt = gtime.Now()
|
||||
updateRes, err2 := g.Model(Name).Where(do.GameAct{
|
||||
Uid: v.Uid,
|
||||
ActId: v.ActId,
|
||||
}).Data(v).Update()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
continue
|
||||
}
|
||||
if row, _ := updateRes.RowsAffected(); row == 0 {
|
||||
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
updateCount++
|
||||
}
|
||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
||||
|
||||
var addCount int64
|
||||
for v := range addChan {
|
||||
addRes, err2 := g.Model(Name).Data(v).Insert()
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
continue
|
||||
}
|
||||
if row, _ := addRes.RowsAffected(); row == 0 {
|
||||
g.Log().Error(ctx, "本次新增为0,新增数据失败: %v", v)
|
||||
continue
|
||||
}
|
||||
addCount++
|
||||
//删除缓存
|
||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||
}
|
||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// 删除缓存key
|
||||
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
|
||||
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
|
||||
_, err := g.Redis().Del(ctx, cacheKey)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 清空GetRedDot缓存
|
||||
func (s *sGameAct) RefreshGetRedDotCache(uid int64) {
|
||||
cacheKey := fmt.Sprintf("gameAct:GetRedDot:%s:%d", gtime.Now().Format("d"), uid)
|
||||
_, err := pkg.Cache("redis").Remove(gctx.New(), cacheKey)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
g.Log().Error(gctx.New(), err)
|
||||
g.Dump(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sGameAct) Del(uid int64, actId int) {
|
||||
var ctx = gctx.New()
|
||||
if uid == 0 || actId == 0 {
|
||||
g.Log().Error(ctx, "当前参数为空")
|
||||
return
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package gameKv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -16,8 +18,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ctx = gctx.New()
|
||||
Name = "game_kv"
|
||||
Name = "game_kv"
|
||||
RunTimeMax *gtime.Time
|
||||
)
|
||||
|
||||
type sGameKv struct {
|
||||
@@ -38,27 +40,32 @@ func init() {
|
||||
// @receiver s: sGameKv的实例。
|
||||
// @return err: 错误信息,如果操作成功,则为nil。
|
||||
func (s *sGameKv) SavesV1() (err error) {
|
||||
getCache, err := pkg.Cache("redis").Get(nil, "cron:game_kv")
|
||||
//如果没有执行过,设置时间戳
|
||||
if getCache.Int64() > 0 {
|
||||
return
|
||||
} else {
|
||||
pkg.Cache("redis").Set(nil, "cron:game_kv", gtime.Now().Unix(), time.Hour)
|
||||
var ctx = gctx.New()
|
||||
// 最大允许执行时间
|
||||
RunTimeMax = gtime.Now().Add(time.Minute * 30)
|
||||
g.Log().Debug(ctx, "开始执行游戏kv数据保存")
|
||||
|
||||
// 定义用于存储用户数据的结构体
|
||||
type ListData struct {
|
||||
Uid int64 `json:"uid"`
|
||||
Kv interface{} `json:"kv"`
|
||||
}
|
||||
var list []*ListData
|
||||
// 初始化列表,长度与keys数组一致
|
||||
list = make([]*ListData, 0)
|
||||
|
||||
// 从Redis列表中获取所有用户KV索引的键
|
||||
//keys, err := utils.RedisScan("user:kv:*")
|
||||
err = tools.Redis.RedisScanV2("user:kv:*", func(keys []string) (err error) {
|
||||
// 定义用于存储用户数据的结构体
|
||||
type ListData struct {
|
||||
Uid int64 `json:"uid"`
|
||||
Kv interface{} `json:"kv"`
|
||||
//判断是否超时
|
||||
if gtime.Now().After(RunTimeMax) {
|
||||
g.Log().Error(ctx, "kv执行超时了,停止执行!")
|
||||
err = errors.New("kv执行超时了,停止执行!")
|
||||
return
|
||||
}
|
||||
var list []*ListData
|
||||
// 初始化列表,长度与keys数组一致
|
||||
list = make([]*ListData, 0)
|
||||
|
||||
//需要删除的key
|
||||
var delKey []string
|
||||
|
||||
// 遍历keys,获取每个用户的数据并填充到list中
|
||||
for _, cacheKey := range keys {
|
||||
//g.Log().Infof(ctx, "保存用户kv数据%v", v)
|
||||
@@ -91,29 +98,21 @@ func (s *sGameKv) SavesV1() (err error) {
|
||||
Uid: uid,
|
||||
Kv: data,
|
||||
})
|
||||
|
||||
delKey = append(delKey, cacheKey)
|
||||
}
|
||||
|
||||
// 将列表数据保存到数据库
|
||||
if len(list) > 0 {
|
||||
_, err2 := g.Model("game_kv").Batch(30).Data(list).Save()
|
||||
list = make([]*ListData, 0)
|
||||
if len(list) > 100 {
|
||||
_, err2 := g.Model("game_kv").Data(list).Save()
|
||||
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
}
|
||||
|
||||
//批量删除key
|
||||
for _, v := range delKey {
|
||||
_, err2 = g.Redis().Del(ctx, v)
|
||||
if err2 != nil {
|
||||
g.Log().Errorf(ctx, "删除存档失败:%v,err=%v", v, err2)
|
||||
}
|
||||
//删除当前key
|
||||
for _, v := range list {
|
||||
go s.DelCacheKey(ctx, v.Uid)
|
||||
}
|
||||
|
||||
delKey = make([]string, 0)
|
||||
|
||||
list = make([]*ListData, 0)
|
||||
}
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, "当前kv数据入库失败: %v", err)
|
||||
@@ -122,17 +121,14 @@ func (s *sGameKv) SavesV1() (err error) {
|
||||
return
|
||||
})
|
||||
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
////跳过
|
||||
//if len(keys) == 0 {
|
||||
// return
|
||||
//}
|
||||
////一次最多处理10w条
|
||||
//if len(keys) > 10000 {
|
||||
// keys = keys[:10000]
|
||||
//}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// 删除缓存key
|
||||
func (s *sGameKv) DelCacheKey(ctx context.Context, uid int64) {
|
||||
cacheKey := fmt.Sprintf("user:kv:%v", uid)
|
||||
_, err := g.Redis().Del(ctx, cacheKey)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,9 @@ package systemCron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ayflying/utility_go/api/system/v1"
|
||||
"github.com/ayflying/utility_go/service"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
@@ -9,8 +12,6 @@ import (
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"github.com/gogf/gf/v2/os/gtimer"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -59,7 +60,7 @@ type sSystemCron struct {
|
||||
func New() *sSystemCron {
|
||||
return &sSystemCron{
|
||||
taskChan: make(chan func(context.Context) error, 2),
|
||||
TaskTimeout: time.Minute * 30,
|
||||
TaskTimeout: time.Minute * 60,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,7 +81,8 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
|
||||
var _func2 = func(ctx context.Context) error {
|
||||
return _func()
|
||||
}
|
||||
s.AddCronV2(typ, _func2)
|
||||
// 老版本计划任务全都是主服务器唯一执行
|
||||
s.AddCronV2(typ, _func2, true)
|
||||
}
|
||||
|
||||
// AddCronV2 添加一个定时任务到相应的调度列表中。
|
||||
@@ -89,7 +91,16 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
|
||||
// @receiver s: sSystemCron的实例,代表一个调度系统。
|
||||
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
|
||||
// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。
|
||||
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error) {
|
||||
// @param _onlyMain: 是否只在主服务器上执行一次,true 唯一执行,false 全局执行不判断唯一
|
||||
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool) {
|
||||
//如果传过来的任务是需要主服务器执行一次
|
||||
if len(_onlyMain) > 0 && _onlyMain[0] {
|
||||
//判断当前是否为主服务器
|
||||
if !g.Cfg().MustGet(gctx.New(), "game.cron_main").Bool() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
//加锁
|
||||
s.Lock.Lock()
|
||||
defer s.Lock.Unlock()
|
||||
@@ -322,7 +333,7 @@ func (s *sSystemCron) RunFuncChan() {
|
||||
//ctx := gctx.New()
|
||||
func() {
|
||||
//超时释放资源
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.TaskTimeout)
|
||||
ctx, cancel := context.WithTimeout(gctx.New(), s.TaskTimeout)
|
||||
defer cancel()
|
||||
|
||||
// 使用匿名函数包裹来捕获 panic
|
||||
|
||||
399
package/gamelog/sdk.go
Normal file
399
package/gamelog/sdk.go
Normal file
@@ -0,0 +1,399 @@
|
||||
package gamelog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/net/gclient"
|
||||
"github.com/gogf/gf/v2/os/gfile"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
)
|
||||
|
||||
type sendBody struct {
|
||||
Pid string `json:"pid"`
|
||||
Data [][]any `json:"data"`
|
||||
}
|
||||
|
||||
// todo 游戏日志对象
|
||||
type GameLog struct {
|
||||
Uid string // 唯一uid
|
||||
Event string // 事件名
|
||||
Property map[string]any // 事件属性
|
||||
EventTimems int64 // 时间戳毫秒级别
|
||||
EventTimeLoc string // 带时区的本地时间字符串
|
||||
}
|
||||
|
||||
type SDKConfig struct {
|
||||
// 配置变量
|
||||
Pid string // 项目id
|
||||
BaseUrl string // 日志服务器地址
|
||||
ReportSk string // 上报解密key
|
||||
FlushInterval int // 刷新间隔
|
||||
DiskBakPath string // 磁盘备份路径
|
||||
RetryN int // 每N次重试
|
||||
ChanSize int // 信道大小, 默认1000
|
||||
|
||||
reportN int
|
||||
}
|
||||
|
||||
type SDK struct {
|
||||
// 控制变量
|
||||
wg sync.WaitGroup
|
||||
shutdown chan struct{}
|
||||
mu sync.Mutex
|
||||
sdkConfig *SDKConfig
|
||||
bufferChan chan GameLog // 日志队列
|
||||
buffer []GameLog // 日志队列
|
||||
}
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
gamelogClient *gclient.Client
|
||||
|
||||
// location map
|
||||
// locationMap map[string]*time.Location = map[string]*time.Location{}
|
||||
locationMap sync.Map // 声明一个线程安全的Map
|
||||
|
||||
)
|
||||
|
||||
func getLocationMapValue(key string) *time.Location {
|
||||
// 1. 先尝试读
|
||||
value, loaded := locationMap.Load(key)
|
||||
if loaded {
|
||||
return value.(*time.Location) // 如果已经存在,直接返回
|
||||
}
|
||||
// 2. 不存在,就初始化一个该key对应的**固定的**新值
|
||||
location, err := time.LoadLocation(key)
|
||||
if err != nil {
|
||||
g.Log().Warningf(ctx, "[GameLog]load location error, try use local timezone: %v", err)
|
||||
return nil
|
||||
}
|
||||
// 3. 核心:原子性地存储,如果key已存在则返回已存在的值
|
||||
actualValue, loaded := locationMap.LoadOrStore(key, location)
|
||||
if loaded {
|
||||
// 如果loaded为true,说明其他goroutine抢先存了
|
||||
// 我们可以丢弃刚创建的newValue(如果有需要的话),返回已存在的actualValue
|
||||
return actualValue.(*time.Location)
|
||||
}
|
||||
// 如果loaded为false,说明是我们存成功的,返回我们刚创建的newValue
|
||||
return actualValue.(*time.Location)
|
||||
}
|
||||
|
||||
func (sdk *SDK) varinit() error {
|
||||
sdk.sdkConfig = &SDKConfig{}
|
||||
|
||||
_pid, err := g.Config().Get(ctx, "angergs.bisdk.pid")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.Pid = _pid.String()
|
||||
|
||||
_baseUrl, err := g.Config().Get(ctx, "angergs.bisdk.recodeServerBaseUrl")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.BaseUrl = _baseUrl.String()
|
||||
|
||||
_sk, err := g.Config().Get(ctx, "angergs.bisdk.reportSk")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.ReportSk = _sk.String()
|
||||
|
||||
_flushInterval, err := g.Config().Get(ctx, "angergs.bisdk.flushInterval")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.FlushInterval = _flushInterval.Int()
|
||||
|
||||
_diskBakPath, err := g.Config().Get(ctx, "angergs.bisdk.diskBakPath")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.DiskBakPath = _diskBakPath.String()
|
||||
|
||||
_retryN, err := g.Config().Get(ctx, "angergs.bisdk.retryN")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.RetryN = _retryN.Int()
|
||||
|
||||
_chanSize, err := g.Config().Get(ctx, "angergs.bisdk.chanSize")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sdk.sdkConfig.ChanSize = _chanSize.Int()
|
||||
|
||||
g.Log().Infof(ctx, "[GameLog]client init success, config: %v", sdk.sdkConfig)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sdk *SDK) checkConfig() error {
|
||||
config := sdk.sdkConfig
|
||||
if config.Pid == "" {
|
||||
return fmt.Errorf("pid is empty")
|
||||
}
|
||||
if config.BaseUrl == "" {
|
||||
return fmt.Errorf("baseUrl is empty")
|
||||
}
|
||||
if config.ReportSk == "" {
|
||||
return fmt.Errorf("reportSk is empty")
|
||||
}
|
||||
if config.FlushInterval <= 0 {
|
||||
return fmt.Errorf("flushInterval is invalid")
|
||||
}
|
||||
if config.DiskBakPath == "" {
|
||||
return fmt.Errorf("diskBakPath is empty")
|
||||
}
|
||||
if config.RetryN == 0 {
|
||||
config.RetryN = 10
|
||||
}
|
||||
if config.ChanSize == 0 {
|
||||
config.ChanSize = 1000
|
||||
}
|
||||
config.DiskBakPath = strings.TrimSuffix(config.DiskBakPath, "/")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func INIT(config *SDKConfig) (*SDK, error) {
|
||||
// 加载并检查配置
|
||||
sdk := &SDK{}
|
||||
if config != nil {
|
||||
sdk.sdkConfig = config
|
||||
} else if err := sdk.varinit(); err != nil { // 可以读goframe的配置
|
||||
return nil, err
|
||||
}
|
||||
if err := sdk.checkConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gamelogClient = g.Client()
|
||||
|
||||
// 初始化队列
|
||||
sdk.shutdown = make(chan struct{})
|
||||
sdk.bufferChan = make(chan GameLog, 1000)
|
||||
sdk.buffer = make([]GameLog, 0, 100)
|
||||
// 加载失败日志
|
||||
failLogs, err := sdk.loadFailLogs4disk()
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
|
||||
} else if len(failLogs) > 0 {
|
||||
sdk.buffer = append(sdk.buffer, failLogs...)
|
||||
}
|
||||
|
||||
// 开启协程进行日志发送
|
||||
sdk.wg = sync.WaitGroup{}
|
||||
sdk.wg.Add(1)
|
||||
go func() {
|
||||
defer sdk.wg.Done()
|
||||
ticker := time.NewTicker(time.Duration(sdk.sdkConfig.FlushInterval) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sdk.shutdown:
|
||||
// 关闭时, 上传一次并备份失败数据
|
||||
g.Log().Infof(ctx, "[GameLog]begin shutdown and flush last")
|
||||
sdk.flush()
|
||||
return
|
||||
case log := <-sdk.bufferChan:
|
||||
sdk.buffer = append(sdk.buffer, log)
|
||||
case <-ticker.C:
|
||||
sdk.flush()
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
return sdk, nil
|
||||
}
|
||||
|
||||
// 从磁盘加载失败日志
|
||||
func (sdk *SDK) loadFailLogs4disk() (logs []GameLog, err error) {
|
||||
if !gfile.Exists(sdk.sdkConfig.DiskBakPath) {
|
||||
return
|
||||
}
|
||||
// 遍历diskBakPath下所有failBufferxxx.bak.log文件, 读取到log中
|
||||
files, err := gfile.ScanDir(sdk.sdkConfig.DiskBakPath, "failBuffer*.bak.log")
|
||||
logs = []GameLog{}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// 读取每个备份文件
|
||||
for _, fp := range files {
|
||||
// 每一行都是一次失败的记录
|
||||
gfile.ReadLines(fp, func(line string) error {
|
||||
_logs := []GameLog{}
|
||||
err := json.Unmarshal([]byte(line), &_logs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 合并到总日志列表
|
||||
logs = append(logs, _logs...)
|
||||
return nil
|
||||
})
|
||||
g.Log().Infof(ctx, "[GameLog]load %d faillogs from %s", len(logs), fp)
|
||||
gfile.Remove(fp)
|
||||
}
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
// 备份失败日志追加到磁盘
|
||||
func (sdk *SDK) bakFailLogs2disk(failLogs []GameLog) {
|
||||
bakPath := fmt.Sprintf("%s/failBuffer%s.bak.log", sdk.sdkConfig.DiskBakPath, gtime.Now().Format("YmdH"))
|
||||
content, err := json.Marshal(failLogs)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "[GameLog]marshal fail logs error: %v", err)
|
||||
return
|
||||
}
|
||||
gfile.PutContentsAppend(bakPath, string(content)+"\n")
|
||||
g.Log().Infof(ctx, "[GameLog]backup fail buffer to %s", bakPath)
|
||||
}
|
||||
|
||||
// 优雅关闭
|
||||
func (sdk *SDK) Shutdown() {
|
||||
close(sdk.shutdown)
|
||||
sdk.wg.Wait()
|
||||
}
|
||||
|
||||
// 日志时间格式
|
||||
const datetimeFmt = time.DateOnly + " " + time.TimeOnly
|
||||
|
||||
// 记录日志
|
||||
func (sdk *SDK) Log(uid, event string, property map[string]any, timezone string) {
|
||||
loc := time.Local
|
||||
if _loc := getLocationMapValue(timezone); _loc != nil {
|
||||
loc = _loc
|
||||
}
|
||||
log := GameLog{
|
||||
Uid: uid,
|
||||
Event: event,
|
||||
Property: property,
|
||||
EventTimems: gtime.Now().TimestampMilli(),
|
||||
EventTimeLoc: gtime.Now().In(loc).Format(datetimeFmt),
|
||||
}
|
||||
// 线程安全
|
||||
sdk.bufferChan <- log
|
||||
}
|
||||
|
||||
// 按服务器时区记录日志
|
||||
func (sdk *SDK) LogLtz(uid, event string, property map[string]any) {
|
||||
sdk.Log(uid, event, property, time.Local.String())
|
||||
}
|
||||
|
||||
// 这个方法只会在内部协程调用
|
||||
func (sdk *SDK) flush() {
|
||||
sdk.mu.Lock()
|
||||
defer sdk.mu.Unlock()
|
||||
if len(sdk.buffer) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
batch := make([]GameLog, len(sdk.buffer))
|
||||
copy(batch, sdk.buffer)
|
||||
sdk.buffer = sdk.buffer[:0]
|
||||
|
||||
// 第N次的时候加载失败数据进行尝试
|
||||
if sdk.sdkConfig.reportN != 0 && sdk.sdkConfig.reportN%sdk.sdkConfig.RetryN == 0 {
|
||||
faillogs, err := sdk.loadFailLogs4disk()
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
|
||||
}
|
||||
// 如果有失败日志则加入到批量数组中
|
||||
if len(faillogs) > 0 {
|
||||
batch = append(batch, faillogs...)
|
||||
}
|
||||
}
|
||||
sdk.send(batch)
|
||||
}
|
||||
|
||||
// 发送消息
|
||||
func (sdk *SDK) send(logs []GameLog) {
|
||||
waitSecond := time.Duration(sdk.sdkConfig.FlushInterval/4) * time.Second
|
||||
timeoutCtx, cancel := context.WithTimeout(context.Background(), waitSecond)
|
||||
defer cancel()
|
||||
data := make([][]any, 0, len(logs))
|
||||
// logs 拆分成二维数组
|
||||
for _, log := range logs {
|
||||
propertyJson, err := json.Marshal(log.Property)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "[GameLog]skip log parse, marshal property error: %v", err)
|
||||
continue
|
||||
}
|
||||
data = append(data, []any{
|
||||
log.Uid,
|
||||
log.Event,
|
||||
string(propertyJson),
|
||||
log.EventTimems,
|
||||
log.EventTimeLoc,
|
||||
})
|
||||
}
|
||||
// json化
|
||||
sbody := sendBody{
|
||||
Pid: sdk.sdkConfig.Pid,
|
||||
Data: data,
|
||||
}
|
||||
jsonBody, err := json.Marshal(sbody)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "[GameLog]marshal send body error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// giz压缩
|
||||
gzBody := bytes.NewBuffer([]byte{})
|
||||
gz := gzip.NewWriter(gzBody)
|
||||
gz.Write(jsonBody)
|
||||
gz.Close()
|
||||
|
||||
// XOR 加密
|
||||
xorBody := bytesXOR(gzBody.Bytes(), []byte(sdk.sdkConfig.ReportSk))
|
||||
|
||||
sdk.sdkConfig.reportN += 1
|
||||
res, err := gamelogClient.Post(timeoutCtx, sdk.sdkConfig.BaseUrl+"/report/event", xorBody)
|
||||
// 失败重新加入缓冲区
|
||||
if err != nil {
|
||||
sdk.bakFailLogs2disk(logs)
|
||||
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), err)
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
cerr := res.Close()
|
||||
if cerr != nil {
|
||||
g.Log().Errorf(ctx, "[GameLog]close response error: %v", cerr)
|
||||
}
|
||||
}()
|
||||
httpcode := res.StatusCode
|
||||
resBody := res.ReadAllString()
|
||||
// 收集器拦截, 重新加入缓冲区
|
||||
if httpcode != http.StatusOK {
|
||||
sdk.bakFailLogs2disk(logs)
|
||||
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), resBody)
|
||||
}
|
||||
}
|
||||
|
||||
// 混淆
|
||||
func bytesXOR(data []byte, key []byte) []byte {
|
||||
obfuscated := make([]byte, len(data))
|
||||
keyLen := len(key)
|
||||
if keyLen == 0 {
|
||||
return data
|
||||
}
|
||||
|
||||
for i := range data {
|
||||
obfuscated[i] = data[i] ^ key[i%keyLen]
|
||||
}
|
||||
return obfuscated
|
||||
|
||||
// // 使用示例
|
||||
// key := []byte{0x12, 0x34, 0x56, 0x78}
|
||||
// obfuscated := multiXorObfuscate(original, key)
|
||||
// deobfuscated := multiXorObfuscate(obfuscated, key) // 解密
|
||||
}
|
||||
52
package/gamelog/test/gamelog_test.go
Normal file
52
package/gamelog/test/gamelog_test.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ayflying/utility_go/package/gamelog"
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
"github.com/gogf/gf/v2/util/grand"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func TestGamelog(t *testing.T) {
|
||||
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
|
||||
// 必填
|
||||
Pid: "test5", // 项目ID
|
||||
BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
|
||||
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
|
||||
ReportSk: "sngame2025", // xor混淆key
|
||||
FlushInterval: 5, // 上报间隔
|
||||
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
|
||||
// 可填
|
||||
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
|
||||
ChanSize: 500, // 默认1000, 信道size
|
||||
})
|
||||
|
||||
// 随机测试事件和属性
|
||||
events := []string{"e1", "e2", "e3", "e4"}
|
||||
pms := []map[string]any{
|
||||
{"a": "1"},
|
||||
{"a": "2"},
|
||||
{"a": "3"},
|
||||
{"a": "4"},
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
go func() {
|
||||
for {
|
||||
uuidval, _ := uuid.NewUUID()
|
||||
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
|
||||
glsdk.LogLtz(randUid, events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
}()
|
||||
time.Sleep(time.Second * 14)
|
||||
// 模拟等待信号后优雅关闭
|
||||
glsdk.Shutdown()
|
||||
})
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package aycache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||
@@ -33,7 +34,7 @@ var QPS = promauto.NewGauge(
|
||||
func init() {
|
||||
boot.AddFunc(func() {
|
||||
// 初始化指标,每分钟计算一次平均 QPS 并重置计数器
|
||||
service.SystemCron().AddCron(v1.CronType_MINUTE, func() error {
|
||||
service.SystemCron().AddCronV2(v1.CronType_MINUTE, func(context.Context) error {
|
||||
QPS.Set(math.Round(float64(QPSCount) / 60))
|
||||
QPSCount = 0
|
||||
return nil
|
||||
|
||||
@@ -31,7 +31,7 @@ type DataType struct {
|
||||
Url string `json:"url"` // S3 服务的访问 URL
|
||||
BucketName string `json:"bucket_name"` // 默认存储桶名称
|
||||
BucketNameCdn string `json:"bucket_name_cdn"` // CDN 存储桶名称
|
||||
Provider string `json:"provider"` // S3 服务的提供方
|
||||
//Provider string `json:"provider"` // S3 服务的提供方
|
||||
}
|
||||
|
||||
// Mod 定义了 S3 模块的结构体,包含一个 S3 客户端实例和配置信息
|
||||
@@ -184,6 +184,12 @@ func (s *Mod) ListObjects(bucketName string, prefix string) (res <-chan minio.Ob
|
||||
return
|
||||
}
|
||||
|
||||
// StatObject 获取指定存储桶中指定文件的元数据信息
|
||||
func (s *Mod) StatObject(bucketName string, objectName string) (res minio.ObjectInfo, err error) {
|
||||
res, err = s.client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
|
||||
return
|
||||
}
|
||||
|
||||
// SetBucketPolicy 设置指定存储桶或对象前缀的访问策略
|
||||
// 目前使用固定的策略,可根据需求修改
|
||||
func (s *Mod) SetBucketPolicy(bucketName string, prefix string) (err error) {
|
||||
|
||||
@@ -8,6 +8,7 @@ package service
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ayflying/utility_go/internal/model/entity"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
)
|
||||
|
||||
@@ -31,8 +32,48 @@ type (
|
||||
// @param data interface{}: 要存储的活动信息数据。
|
||||
// @return err error: 返回错误信息,如果操作成功,则返回nil。
|
||||
Set(uid int64, actId int, data interface{}) (err error)
|
||||
Saves(ctx context.Context) (err error)
|
||||
// Saves 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @return err error: 返回错误信息
|
||||
// Deprecated: 该方法已被弃用,建议使用SavesV2方法
|
||||
Saves() (err error)
|
||||
// Save 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
// @param actId int: 活动ID
|
||||
// @return err error: 返回错误信息
|
||||
// deprecated: 该方法已被弃用,建议使用SaveV2方法
|
||||
Save(ctx context.Context, actId int) (err error)
|
||||
// SavesV2 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @return err error: 返回错误信息
|
||||
SavesV2() (err error)
|
||||
// SaveV2 保存游戏活动数据
|
||||
//
|
||||
// @Description: 保存游戏活动数据
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
// @param cacheKey string: 缓存键
|
||||
// @param add []*entity.GameAct: 添加数据
|
||||
// @param update []*entity.GameAct: 更新数据
|
||||
// @return err error: 返回错误信息
|
||||
SaveV2(ctx context.Context, cacheKey string) (err error)
|
||||
// Cache2Sql 缓存持久化到数据库
|
||||
// @Description: 缓存持久化到数据库
|
||||
// @receiver s *sGameAct: 游戏活动服务结构体指针
|
||||
// @param ctx context.Context: 上下文对象
|
||||
// @param add []*entity.GameAct: 添加数据
|
||||
// @param update []*entity.GameAct: 更新数据
|
||||
// @return err error: 返回错误信息
|
||||
Cache2Sql(ctx context.Context, add []*entity.GameAct, update []*entity.GameAct)
|
||||
// 删除缓存key
|
||||
DelCacheKey(ctx context.Context, aid int, uid int64)
|
||||
// 清空GetRedDot缓存
|
||||
RefreshGetRedDotCache(uid int64)
|
||||
Del(uid int64, actId int)
|
||||
|
||||
@@ -5,6 +5,10 @@
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type (
|
||||
IGameKv interface {
|
||||
// SavesV1 方法
|
||||
@@ -13,6 +17,8 @@ type (
|
||||
// @receiver s: sGameKv的实例。
|
||||
// @return err: 错误信息,如果操作成功,则为nil。
|
||||
SavesV1() (err error)
|
||||
// 删除缓存key
|
||||
DelCacheKey(ctx context.Context, uid int64)
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -7,9 +7,6 @@ package service
|
||||
|
||||
type (
|
||||
IIp2Region interface {
|
||||
// Load 加载到内存中
|
||||
//
|
||||
// @Description: 加载ip2region数据库到内存中。
|
||||
// @receiver s *sIp2region: sIp2region的实例。
|
||||
Load()
|
||||
GetIp(ip string) (res []string)
|
||||
|
||||
@@ -38,7 +38,8 @@ type (
|
||||
// @receiver s: sSystemCron的实例,代表一个调度系统。
|
||||
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
|
||||
// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。
|
||||
AddCronV2(typ v1.CronType, _func func(context.Context) error)
|
||||
// @param _onlyMain: 是否只在主服务器上执行一次,true 唯一执行,false 全局执行不判断唯一
|
||||
AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool)
|
||||
// StartCron 开始计划任务执行
|
||||
//
|
||||
// @Description:
|
||||
|
||||
@@ -48,7 +48,6 @@ func (r *redis) RedisScan(cacheKey string, _key ...string) (keys []string, err e
|
||||
|
||||
// redis 批量获取大量数据
|
||||
func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ...string) error {
|
||||
|
||||
//var keys []string
|
||||
var err error
|
||||
|
||||
@@ -67,9 +66,11 @@ func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ..
|
||||
g.Log().Errorf(ctx, "Scan failed: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
if len(newKeys) > 0 {
|
||||
err = _func(newKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//这个要放在最后
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package utility_go
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ayflying/utility_go/config"
|
||||
"github.com/ayflying/utility_go/internal/boot"
|
||||
_ "github.com/ayflying/utility_go/internal/logic"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/os/gtimer"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -14,9 +18,12 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
g.Log().Debug(ctx, "utility_go init启动完成")
|
||||
// 初始化配置
|
||||
var err = boot.Boot()
|
||||
gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) {
|
||||
err = boot.Boot()
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
Reference in New Issue
Block a user