Compare commits

...

26 Commits

Author SHA1 Message Date
ayflying
c12c49477c 优化持久化因为活动太多造成的速度太慢 2025-09-02 16:51:41 +08:00
ayflying
b052754a30 删除多余的日志 2025-09-02 15:12:43 +08:00
ayflying
aa1dc0896d 修复计划任务打断造成协程内上下文执行停止的问题 2025-09-02 12:25:30 +08:00
ayflying
8210ac24db 去掉启动的缓存key 2025-09-02 10:14:01 +08:00
ayflying
cd3de96761 提高kv持久化安全性,按需删除用户缓存 2025-09-01 18:35:43 +08:00
ayflying
ce8ae4d26a 按照执行中的状态进行删除 2025-09-01 18:32:02 +08:00
ayflying
50cfc23ad2 计划任务修改,act与kv使用协程方式执行,不影响其他任务 2025-09-01 18:12:58 +08:00
ayflying
95539038c0 去掉无用配置 2025-08-28 21:43:24 +08:00
ayflying
4b08a9ce84 s3增加方法,允许查看文件的元数据 2025-08-28 18:35:01 +08:00
ayflying
6efdac7bab redis刷新列表,允许打断,防止过长的日志 2025-08-28 18:16:47 +08:00
ayflying
788cb2e6d4 持久化失败会提前结束任务进行抛错 2025-08-27 10:31:48 +08:00
ayflying
dd8c05b344 更新持久化act更新语法 2025-08-27 09:57:45 +08:00
ayflying
e781e132ed 保存计划任务调整 2025-08-22 17:50:37 +08:00
ayflying
862a6c8410 修改参数名 2025-08-22 14:31:57 +08:00
ayflying
42535d0023 修改描述,预防误导 2025-08-22 14:29:38 +08:00
ayflying
dd999cacf9 区分唯一计划任务,判断当前服务器是否关闭计划任务 2025-08-22 14:16:52 +08:00
ayflying
30d30bb8c6 增加唯一执行的参数 2025-08-22 12:04:26 +08:00
ayflying
f1c22dc9e6 执行持久化的计划任务,有一个打断机制 2025-08-22 11:46:24 +08:00
14cf759ce1 Merge branch 'fixmapsync' into 'master'
修一个map的并发

See merge request public_project/utility_go!2
2025-08-21 08:41:14 +00:00
19a19c1ff1 修一个map的并发 2025-08-21 16:39:47 +08:00
183d6d8b10 Merge branch 'bisdk' into 'master'
Bisdk

See merge request public_project/utility_go!1
2025-08-21 08:15:26 +00:00
d0cad61028 一些err接受问题 2025-08-21 16:09:10 +08:00
efb34e0c5b sdk和测试代码 2025-08-21 15:52:58 +08:00
liaoyulong
8190e9f6b7 批量导入elk后接收返回信息判断是否全部导入成功 2025-08-21 10:56:51 +08:00
ayflying
27435b57b7 修改缓存驱动,第二个参数支持选择不同的缓存配置 2025-08-21 10:30:43 +08:00
ayflying
0628882533 缓存key删除失败不报错 2025-08-20 15:13:01 +08:00
16 changed files with 895 additions and 131 deletions

View File

@@ -2,8 +2,10 @@ package boot
import (
"context"
v1 "github.com/ayflying/utility_go/api/system/v1"
"github.com/ayflying/utility_go/service"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
)
@@ -16,12 +18,17 @@ func Boot() (err error) {
// 启动计划任务定时器预防debug工具激活计划任务造成重复执行此处不执行计划任务
//err = service.SystemCron().StartCron()
//用户活动持久化
service.SystemCron().AddCronV2(v1.CronType_DAILY, func(ctx context.Context) error {
err = service.GameKv().SavesV1()
err = service.GameAct().Saves(ctx)
return err
})
//用户活动持久化每小时执行一次
service.SystemCron().AddCronV2(v1.CronType_HOUR, func(context.Context) error {
go func() {
err = service.GameKv().SavesV1()
err = service.GameAct().SavesV2()
if err != nil {
g.Log().Error(gctx.New(), err)
}
}()
return nil
}, true)
//初始化自启动方法
for _, v := range _func {

View File

@@ -2,6 +2,7 @@ package gameAct
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
@@ -20,9 +21,9 @@ import (
)
var (
ctx = gctx.New()
Name = "game_act"
ActList = gset.New(true)
Name = "game_act"
ActList = gset.New(true)
RunTimeMax *gtime.Time
)
type sGameAct struct {
@@ -45,6 +46,7 @@ func init() {
// @return data *v1.Act: 返回活动信息结构体指针
// @return err error: 返回错误信息
func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
var ctx = gctx.New()
if uid == 0 || actId == 0 {
g.Log().Error(ctx, "当前参数为空")
return
@@ -87,6 +89,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
// @param data interface{}: 要存储的活动信息数据。
// @return err error: 返回错误信息如果操作成功则返回nil。
func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
var ctx = gctx.New()
if uid == 0 || actId == 0 {
g.Log().Error(ctx, "当前参数为空")
return
@@ -107,37 +110,54 @@ func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
return
}
func (s *sGameAct) Saves(ctx context.Context) (err error) {
getCache, _ := pkg.Cache("redis").Get(nil, "cron:game_act")
// Saves 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
// Deprecated: 该方法已被弃用建议使用SavesV2方法
func (s *sGameAct) Saves() (err error) {
var ctx = gctx.New()
g.Log().Debug(ctx, "开始执行游戏act数据保存了")
//如果没有执行过,设置时间戳
if getCache.Int64() > 0 {
return
} else {
pkg.Cache("redis").Set(nil, "cron:game_act", gtime.Now().Unix(), time.Hour)
}
// 最大允许执行时间
RunTimeMax = gtime.Now().Add(time.Minute * 30)
//遍历执行
ActList.Iterator(func(i interface{}) bool {
err = s.Save(ctx, i.(int))
//在时间内允许执行
if gtime.Now().Before(RunTimeMax) {
g.Log().Debugf(ctx, "开始执行游戏act数据保存:act=%v", i)
err = s.Save(ctx, i.(int))
} else {
g.Log().Errorf(ctx, "游戏act数据保存超时:act=%v", i)
}
return true
})
return
}
// Save 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param actId int: 活动ID
// @return err error: 返回错误信息
// deprecated: 该方法已被弃用建议使用SaveV2方法
func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
cacheKey := fmt.Sprintf("act:%v:*", actId)
//获取当前用户的key值
//keys, err := utils.RedisScan(cacheKey)
//if len(keys) > 10000 {
// keys = keys[:10000]
//}
var add = make([]*entity.GameAct, 0)
var update = make([]*entity.GameAct, 0)
//循环获取缓存数据
err = tools.Redis.RedisScanV2(cacheKey, func(keys []string) (err error) {
var add = make([]*entity.GameAct, 0)
var update = make([]*entity.GameAct, 0)
var delKey []string
//判断是否超时
if gtime.Now().After(RunTimeMax) {
g.Log().Debug(ctx, "act执行超时了,停止执行!")
err = errors.New("act执行超时了,停止执行!")
return
}
for _, cacheKey = range keys {
result := strings.Split(cacheKey, ":")
actId, err = strconv.Atoi(result[1])
@@ -145,6 +165,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
uid = gconv.Int64(result[2])
//uid, err = strconv.ParseInt(result[2], 10, 64)
if err != nil {
g.Log().Error(ctx, err)
continue
}
@@ -173,7 +194,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
ActId: actId,
}).Fields("uid,act_id").Scan(&data)
if err != nil {
g.Log().Debugf(ctx, "当前数据错误: %v", cacheKey)
g.Log().Errorf(ctx, "当前数据错误: %v", cacheKey)
continue
}
actionData := cacheGet.String()
@@ -190,48 +211,64 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
data.Action = actionData
update = append(update, data)
}
//最后删除key
delKey = append(delKey, cacheKey)
}
//批量写入数据库
if len(delKey) > 0 {
updateCount := 0
//g.Log().Debugf(ctx, "当前 %v 要更新的数据: %v 条", actId, len(update))
if len(update) > 100 {
for _, v := range update {
v.UpdatedAt = gtime.Now()
_, err2 := g.Model(Name).Where(do.GameAct{
Uid: v.Uid,
ActId: v.ActId,
UpdatedAt: v.UpdatedAt,
updateRes, err2 := g.Model(Name).Where(do.GameAct{
Uid: v.Uid,
ActId: v.ActId,
}).Data(v).Update()
if err2 != nil {
g.Log().Error(ctx, err2)
return
}
if row, _ := updateRes.RowsAffected(); row == 0 {
g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v)
continue
}
//删除缓存
go s.DelCacheKey(ctx, v.ActId, v.Uid)
updateCount++
update = make([]*entity.GameAct, 0)
}
g.Log().Debugf(ctx, "当前 %v 更新数据库: %v 条", actId, updateCount)
update = make([]*entity.GameAct, 0)
var count int64
}
if len(add) > 0 {
dbRes, err2 := g.Model(Name).Batch(50).Data(add).Save()
add = make([]*entity.GameAct, 0)
err = err2
if err != nil {
g.Log().Error(ctx, err2)
return
var count int64
//g.Log().Debugf(ctx, "当前 %v 要添加的数据: %v 条", actId, len(add))
if len(add) > 100 {
dbRes, err2 := g.Model(Name).Data(add).Save()
err = err2
if err != nil {
g.Log().Error(ctx, err2)
return
}
count, _ = dbRes.RowsAffected()
if count == 0 {
g.Log().Error(ctx, "当前 %v 写入数据库: %v 条", actId, count)
for _, vTemp := range add {
g.Log().Debugf(ctx, "当前act%vadd写入数据: %v,内容:%v", vTemp.ActId, vTemp.Uid, vTemp.Action)
}
count, _ = dbRes.RowsAffected()
return
}
for _, v := range delKey {
_, err = g.Redis().Del(ctx, v)
if err != nil {
g.Log().Error(ctx, err)
return
}
for _, v2 := range add {
//删除缓存
go s.DelCacheKey(ctx, v2.ActId, v2.Uid)
}
delKey = make([]string, 0)
g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
add = make([]*entity.GameAct, 0)
}
if err != nil {
@@ -244,17 +281,184 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
return
}
// SavesV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
func (s *sGameAct) SavesV2() (err error) {
var ctx = gctx.New()
g.Log().Debug(ctx, "开始执行游戏act数据保存了")
//如果没有执行过,设置时间戳
// 最大允许执行时间
RunTimeMax = gtime.Now().Add(time.Minute * 30)
//cacheKey := fmt.Sprintf("act:%v:*", actId)
var add = make([]*entity.GameAct, 0)
var update = make([]*entity.GameAct, 0)
//循环获取缓存数据
err = tools.Redis.RedisScanV2("act:*", func(keys []string) (err error) {
for _, key := range keys {
//格式化数据
err = s.SaveV2(ctx, key, add, update)
//持久化数据
err = s.Cache2Sql(ctx, add, update)
}
return err
})
return
}
// SaveV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param cacheKey string: 缓存键
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, add, update []*entity.GameAct) (err error) {
result := strings.Split(cacheKey, ":")
actId := gconv.Int(result[1])
if actId == 0 {
return
}
var uid int64
uid = gconv.Int64(result[2])
if uid == 0 {
//跳过为空的用户缓存
return
}
//获取缓存数据
cacheGet, _ := g.Redis().Get(ctx, cacheKey)
if cacheGet.IsEmpty() {
//空数据也不保存
return
}
//如果有活跃,跳过持久化
if getBool, _ := pkg.Cache("redis").
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
return
}
//获取数据库数据
var data *entity.GameAct
// 从数据库中查询活动信息
err = g.Model(Name).Where(do.GameAct{
Uid: uid,
ActId: actId,
}).Fields("uid,act_id").Scan(&data)
if err != nil {
g.Log().Errorf(ctx, "当前数据错误: %v", cacheKey)
return
}
//如果没有数据,添加
actionData := cacheGet.String()
if data == nil {
add = append(add, &entity.GameAct{
ActId: actId,
Uid: uid,
Action: actionData,
})
} else {
//覆盖数据
data.ActId = actId
data.Uid = uid
data.Action = actionData
update = append(update, data)
}
return
}
// Cache2Sql 缓存持久化到数据库
// @Description: 缓存持久化到数据库
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) (err error) {
//批量写入数据库
updateCount := 0
if len(update) > 100 {
for _, v := range update {
v.UpdatedAt = gtime.Now()
updateRes, err2 := g.Model(Name).Where(do.GameAct{
Uid: v.Uid,
ActId: v.ActId,
}).Data(v).Update()
if err2 != nil {
g.Log().Error(ctx, err2)
continue
}
if row, _ := updateRes.RowsAffected(); row == 0 {
g.Log().Error(ctx, "本次更新为0更新数据失败: %v", v)
continue
}
//删除缓存
go s.DelCacheKey(ctx, v.ActId, v.Uid)
//updateCount++
update = make([]*entity.GameAct, 0)
}
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
update = make([]*entity.GameAct, 0)
}
var addCount int64
if len(add) > 100 {
for _, v := range add {
addRes, err2 := g.Model(Name).Data(v).Insert()
if err2 != nil {
g.Log().Error(ctx, err2)
continue
}
if row, _ := addRes.RowsAffected(); row == 0 {
g.Log().Error(ctx, "本次新增为0新增数据失败: %v", v)
continue
}
addCount++
//删除缓存
go s.DelCacheKey(ctx, v.ActId, v.Uid)
}
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
add = make([]*entity.GameAct, 0)
}
return
}
// 删除缓存key
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
_, err := g.Redis().Del(ctx, cacheKey)
if err != nil {
g.Log().Error(ctx, err)
}
}
// 清空GetRedDot缓存
func (s *sGameAct) RefreshGetRedDotCache(uid int64) {
cacheKey := fmt.Sprintf("gameAct:GetRedDot:%s:%d", gtime.Now().Format("d"), uid)
_, err := pkg.Cache("redis").Remove(gctx.New(), cacheKey)
if err != nil {
g.Log().Error(ctx, err)
g.Log().Error(gctx.New(), err)
g.Dump(err)
}
}
func (s *sGameAct) Del(uid int64, actId int) {
var ctx = gctx.New()
if uid == 0 || actId == 0 {
g.Log().Error(ctx, "当前参数为空")
return

View File

@@ -1,22 +1,25 @@
package gameKv
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/ayflying/utility_go/pkg"
"github.com/ayflying/utility_go/service"
"github.com/ayflying/utility_go/tools"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
"strconv"
"strings"
"sync"
"time"
)
var (
ctx = gctx.New()
Name = "game_kv"
Name = "game_kv"
RunTimeMax *gtime.Time
)
type sGameKv struct {
@@ -37,27 +40,32 @@ func init() {
// @receiver s: sGameKv的实例。
// @return err: 错误信息如果操作成功则为nil。
func (s *sGameKv) SavesV1() (err error) {
getCache, err := pkg.Cache("redis").Get(nil, "cron:game_kv")
//如果没有执行过,设置时间
if getCache.Int64() > 0 {
return
} else {
pkg.Cache("redis").Set(nil, "cron:game_kv", gtime.Now().Unix(), time.Hour)
var ctx = gctx.New()
// 最大允许执行时间
RunTimeMax = gtime.Now().Add(time.Minute * 30)
g.Log().Debug(ctx, "开始执行游戏kv数据保存")
// 定义用于存储用户数据的结构体
type ListData struct {
Uid int64 `json:"uid"`
Kv interface{} `json:"kv"`
}
var list []*ListData
// 初始化列表长度与keys数组一致
list = make([]*ListData, 0)
// 从Redis列表中获取所有用户KV索引的键
//keys, err := utils.RedisScan("user:kv:*")
err = tools.Redis.RedisScanV2("user:kv:*", func(keys []string) (err error) {
// 定义用于存储用户数据的结构体
type ListData struct {
Uid int64 `json:"uid"`
Kv interface{} `json:"kv"`
//判断是否超时
if gtime.Now().After(RunTimeMax) {
g.Log().Error(ctx, "kv执行超时了,停止执行!")
err = errors.New("kv执行超时了,停止执行!")
return
}
var list []*ListData
// 初始化列表长度与keys数组一致
list = make([]*ListData, 0)
//需要删除的key
var delKey []string
// 遍历keys获取每个用户的数据并填充到list中
for _, cacheKey := range keys {
//g.Log().Infof(ctx, "保存用户kv数据%v", v)
@@ -90,30 +98,21 @@ func (s *sGameKv) SavesV1() (err error) {
Uid: uid,
Kv: data,
})
delKey = append(delKey, cacheKey)
}
// 将列表数据保存到数据库
if len(list) > 0 {
_, err2 := g.Model("game_kv").Batch(30).Data(list).Save()
list = make([]*ListData, 0)
if len(list) > 100 {
_, err2 := g.Model("game_kv").Data(list).Save()
if err2 != nil {
g.Log().Error(ctx, err2)
return
}
//批量删除key
for _, v := range delKey {
_, err2 = g.Redis().Del(ctx, v)
if err2 != nil {
g.Log().Errorf(ctx, "删除存档错误:%v,err=%v", v, err2)
return
}
//删除当前key
for _, v := range list {
go s.DelCacheKey(ctx, v.Uid)
}
delKey = make([]string, 0)
list = make([]*ListData, 0)
}
if err != nil {
g.Log().Error(ctx, "当前kv数据入库失败: %v", err)
@@ -122,17 +121,14 @@ func (s *sGameKv) SavesV1() (err error) {
return
})
//if err != nil {
// return err
//}
////跳过
//if len(keys) == 0 {
// return
//}
////一次最多处理10w条
//if len(keys) > 10000 {
// keys = keys[:10000]
//}
return
}
// 删除缓存key
func (s *sGameKv) DelCacheKey(ctx context.Context, uid int64) {
cacheKey := fmt.Sprintf("user:kv:%v", uid)
_, err := g.Redis().Del(ctx, cacheKey)
if err != nil {
g.Log().Error(ctx, err)
}
}

View File

@@ -2,6 +2,9 @@ package systemCron
import (
"context"
"sync"
"time"
"github.com/ayflying/utility_go/api/system/v1"
"github.com/ayflying/utility_go/service"
"github.com/gogf/gf/v2/frame/g"
@@ -9,8 +12,6 @@ import (
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/os/gtimer"
"sync"
"time"
)
var (
@@ -59,7 +60,7 @@ type sSystemCron struct {
func New() *sSystemCron {
return &sSystemCron{
taskChan: make(chan func(context.Context) error, 2),
TaskTimeout: time.Minute * 30,
TaskTimeout: time.Minute * 60,
}
}
@@ -80,7 +81,8 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
var _func2 = func(ctx context.Context) error {
return _func()
}
s.AddCronV2(typ, _func2)
// 老版本计划任务全都是主服务器唯一执行
s.AddCronV2(typ, _func2, true)
}
// AddCronV2 添加一个定时任务到相应的调度列表中。
@@ -89,7 +91,16 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
// @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error) {
// @param _onlyMain: 是否只在主服务器上执行一次,true 唯一执行false 全局执行不判断唯一
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool) {
//如果传过来的任务是需要主服务器执行一次
if len(_onlyMain) > 0 && _onlyMain[0] {
//判断当前是否为主服务器
if !g.Cfg().MustGet(gctx.New(), "game.cron_main").Bool() {
return
}
}
//加锁
s.Lock.Lock()
defer s.Lock.Unlock()
@@ -322,7 +333,7 @@ func (s *sSystemCron) RunFuncChan() {
//ctx := gctx.New()
func() {
//超时释放资源
ctx, cancel := context.WithTimeout(context.Background(), s.TaskTimeout)
ctx, cancel := context.WithTimeout(gctx.New(), s.TaskTimeout)
defer cancel()
// 使用匿名函数包裹来捕获 panic

399
package/gamelog/sdk.go Normal file
View File

@@ -0,0 +1,399 @@
package gamelog
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/os/gtime"
)
type sendBody struct {
Pid string `json:"pid"`
Data [][]any `json:"data"`
}
// todo 游戏日志对象
type GameLog struct {
Uid string // 唯一uid
Event string // 事件名
Property map[string]any // 事件属性
EventTimems int64 // 时间戳毫秒级别
EventTimeLoc string // 带时区的本地时间字符串
}
type SDKConfig struct {
// 配置变量
Pid string // 项目id
BaseUrl string // 日志服务器地址
ReportSk string // 上报解密key
FlushInterval int // 刷新间隔
DiskBakPath string // 磁盘备份路径
RetryN int // 每N次重试
ChanSize int // 信道大小, 默认1000
reportN int
}
type SDK struct {
// 控制变量
wg sync.WaitGroup
shutdown chan struct{}
mu sync.Mutex
sdkConfig *SDKConfig
bufferChan chan GameLog // 日志队列
buffer []GameLog // 日志队列
}
var (
ctx = context.Background()
gamelogClient *gclient.Client
// location map
// locationMap map[string]*time.Location = map[string]*time.Location{}
locationMap sync.Map // 声明一个线程安全的Map
)
func getLocationMapValue(key string) *time.Location {
// 1. 先尝试读
value, loaded := locationMap.Load(key)
if loaded {
return value.(*time.Location) // 如果已经存在,直接返回
}
// 2. 不存在就初始化一个该key对应的**固定的**新值
location, err := time.LoadLocation(key)
if err != nil {
g.Log().Warningf(ctx, "[GameLog]load location error, try use local timezone: %v", err)
return nil
}
// 3. 核心原子性地存储如果key已存在则返回已存在的值
actualValue, loaded := locationMap.LoadOrStore(key, location)
if loaded {
// 如果loaded为true说明其他goroutine抢先存了
// 我们可以丢弃刚创建的newValue如果有需要的话返回已存在的actualValue
return actualValue.(*time.Location)
}
// 如果loaded为false说明是我们存成功的返回我们刚创建的newValue
return actualValue.(*time.Location)
}
func (sdk *SDK) varinit() error {
sdk.sdkConfig = &SDKConfig{}
_pid, err := g.Config().Get(ctx, "angergs.bisdk.pid")
if err != nil {
return err
}
sdk.sdkConfig.Pid = _pid.String()
_baseUrl, err := g.Config().Get(ctx, "angergs.bisdk.recodeServerBaseUrl")
if err != nil {
return err
}
sdk.sdkConfig.BaseUrl = _baseUrl.String()
_sk, err := g.Config().Get(ctx, "angergs.bisdk.reportSk")
if err != nil {
return err
}
sdk.sdkConfig.ReportSk = _sk.String()
_flushInterval, err := g.Config().Get(ctx, "angergs.bisdk.flushInterval")
if err != nil {
return err
}
sdk.sdkConfig.FlushInterval = _flushInterval.Int()
_diskBakPath, err := g.Config().Get(ctx, "angergs.bisdk.diskBakPath")
if err != nil {
return err
}
sdk.sdkConfig.DiskBakPath = _diskBakPath.String()
_retryN, err := g.Config().Get(ctx, "angergs.bisdk.retryN")
if err != nil {
return err
}
sdk.sdkConfig.RetryN = _retryN.Int()
_chanSize, err := g.Config().Get(ctx, "angergs.bisdk.chanSize")
if err != nil {
return err
}
sdk.sdkConfig.ChanSize = _chanSize.Int()
g.Log().Infof(ctx, "[GameLog]client init success, config: %v", sdk.sdkConfig)
return nil
}
func (sdk *SDK) checkConfig() error {
config := sdk.sdkConfig
if config.Pid == "" {
return fmt.Errorf("pid is empty")
}
if config.BaseUrl == "" {
return fmt.Errorf("baseUrl is empty")
}
if config.ReportSk == "" {
return fmt.Errorf("reportSk is empty")
}
if config.FlushInterval <= 0 {
return fmt.Errorf("flushInterval is invalid")
}
if config.DiskBakPath == "" {
return fmt.Errorf("diskBakPath is empty")
}
if config.RetryN == 0 {
config.RetryN = 10
}
if config.ChanSize == 0 {
config.ChanSize = 1000
}
config.DiskBakPath = strings.TrimSuffix(config.DiskBakPath, "/")
return nil
}
func INIT(config *SDKConfig) (*SDK, error) {
// 加载并检查配置
sdk := &SDK{}
if config != nil {
sdk.sdkConfig = config
} else if err := sdk.varinit(); err != nil { // 可以读goframe的配置
return nil, err
}
if err := sdk.checkConfig(); err != nil {
return nil, err
}
gamelogClient = g.Client()
// 初始化队列
sdk.shutdown = make(chan struct{})
sdk.bufferChan = make(chan GameLog, 1000)
sdk.buffer = make([]GameLog, 0, 100)
// 加载失败日志
failLogs, err := sdk.loadFailLogs4disk()
if err != nil {
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
} else if len(failLogs) > 0 {
sdk.buffer = append(sdk.buffer, failLogs...)
}
// 开启协程进行日志发送
sdk.wg = sync.WaitGroup{}
sdk.wg.Add(1)
go func() {
defer sdk.wg.Done()
ticker := time.NewTicker(time.Duration(sdk.sdkConfig.FlushInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-sdk.shutdown:
// 关闭时, 上传一次并备份失败数据
g.Log().Infof(ctx, "[GameLog]begin shutdown and flush last")
sdk.flush()
return
case log := <-sdk.bufferChan:
sdk.buffer = append(sdk.buffer, log)
case <-ticker.C:
sdk.flush()
}
}
}()
return sdk, nil
}
// 从磁盘加载失败日志
func (sdk *SDK) loadFailLogs4disk() (logs []GameLog, err error) {
if !gfile.Exists(sdk.sdkConfig.DiskBakPath) {
return
}
// 遍历diskBakPath下所有failBufferxxx.bak.log文件 读取到log中
files, err := gfile.ScanDir(sdk.sdkConfig.DiskBakPath, "failBuffer*.bak.log")
logs = []GameLog{}
if err != nil {
return
}
// 读取每个备份文件
for _, fp := range files {
// 每一行都是一次失败的记录
gfile.ReadLines(fp, func(line string) error {
_logs := []GameLog{}
err := json.Unmarshal([]byte(line), &_logs)
if err != nil {
return err
}
// 合并到总日志列表
logs = append(logs, _logs...)
return nil
})
g.Log().Infof(ctx, "[GameLog]load %d faillogs from %s", len(logs), fp)
gfile.Remove(fp)
}
return
}
// 备份失败日志追加到磁盘
func (sdk *SDK) bakFailLogs2disk(failLogs []GameLog) {
bakPath := fmt.Sprintf("%s/failBuffer%s.bak.log", sdk.sdkConfig.DiskBakPath, gtime.Now().Format("YmdH"))
content, err := json.Marshal(failLogs)
if err != nil {
g.Log().Errorf(ctx, "[GameLog]marshal fail logs error: %v", err)
return
}
gfile.PutContentsAppend(bakPath, string(content)+"\n")
g.Log().Infof(ctx, "[GameLog]backup fail buffer to %s", bakPath)
}
// 优雅关闭
func (sdk *SDK) Shutdown() {
close(sdk.shutdown)
sdk.wg.Wait()
}
// 日志时间格式
const datetimeFmt = time.DateOnly + " " + time.TimeOnly
// 记录日志
func (sdk *SDK) Log(uid, event string, property map[string]any, timezone string) {
loc := time.Local
if _loc := getLocationMapValue(timezone); _loc != nil {
loc = _loc
}
log := GameLog{
Uid: uid,
Event: event,
Property: property,
EventTimems: gtime.Now().TimestampMilli(),
EventTimeLoc: gtime.Now().In(loc).Format(datetimeFmt),
}
// 线程安全
sdk.bufferChan <- log
}
// 按服务器时区记录日志
func (sdk *SDK) LogLtz(uid, event string, property map[string]any) {
sdk.Log(uid, event, property, time.Local.String())
}
// 这个方法只会在内部协程调用
func (sdk *SDK) flush() {
sdk.mu.Lock()
defer sdk.mu.Unlock()
if len(sdk.buffer) == 0 {
return
}
batch := make([]GameLog, len(sdk.buffer))
copy(batch, sdk.buffer)
sdk.buffer = sdk.buffer[:0]
// 第N次的时候加载失败数据进行尝试
if sdk.sdkConfig.reportN != 0 && sdk.sdkConfig.reportN%sdk.sdkConfig.RetryN == 0 {
faillogs, err := sdk.loadFailLogs4disk()
if err != nil {
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
}
// 如果有失败日志则加入到批量数组中
if len(faillogs) > 0 {
batch = append(batch, faillogs...)
}
}
sdk.send(batch)
}
// 发送消息
func (sdk *SDK) send(logs []GameLog) {
waitSecond := time.Duration(sdk.sdkConfig.FlushInterval/4) * time.Second
timeoutCtx, cancel := context.WithTimeout(context.Background(), waitSecond)
defer cancel()
data := make([][]any, 0, len(logs))
// logs 拆分成二维数组
for _, log := range logs {
propertyJson, err := json.Marshal(log.Property)
if err != nil {
g.Log().Errorf(ctx, "[GameLog]skip log parse, marshal property error: %v", err)
continue
}
data = append(data, []any{
log.Uid,
log.Event,
string(propertyJson),
log.EventTimems,
log.EventTimeLoc,
})
}
// json化
sbody := sendBody{
Pid: sdk.sdkConfig.Pid,
Data: data,
}
jsonBody, err := json.Marshal(sbody)
if err != nil {
g.Log().Errorf(ctx, "[GameLog]marshal send body error: %v", err)
return
}
// giz压缩
gzBody := bytes.NewBuffer([]byte{})
gz := gzip.NewWriter(gzBody)
gz.Write(jsonBody)
gz.Close()
// XOR 加密
xorBody := bytesXOR(gzBody.Bytes(), []byte(sdk.sdkConfig.ReportSk))
sdk.sdkConfig.reportN += 1
res, err := gamelogClient.Post(timeoutCtx, sdk.sdkConfig.BaseUrl+"/report/event", xorBody)
// 失败重新加入缓冲区
if err != nil {
sdk.bakFailLogs2disk(logs)
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), err)
return
}
defer func() {
cerr := res.Close()
if cerr != nil {
g.Log().Errorf(ctx, "[GameLog]close response error: %v", cerr)
}
}()
httpcode := res.StatusCode
resBody := res.ReadAllString()
// 收集器拦截, 重新加入缓冲区
if httpcode != http.StatusOK {
sdk.bakFailLogs2disk(logs)
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), resBody)
}
}
// 混淆
func bytesXOR(data []byte, key []byte) []byte {
obfuscated := make([]byte, len(data))
keyLen := len(key)
if keyLen == 0 {
return data
}
for i := range data {
obfuscated[i] = data[i] ^ key[i%keyLen]
}
return obfuscated
// // 使用示例
// key := []byte{0x12, 0x34, 0x56, 0x78}
// obfuscated := multiXorObfuscate(original, key)
// deobfuscated := multiXorObfuscate(obfuscated, key) // 解密
}

View File

@@ -0,0 +1,52 @@
package test
import (
"strings"
"testing"
"time"
"github.com/ayflying/utility_go/package/gamelog"
"github.com/gogf/gf/v2/test/gtest"
"github.com/gogf/gf/v2/util/grand"
"github.com/google/uuid"
)
func TestGamelog(t *testing.T) {
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
// 必填
Pid: "test5", // 项目ID
BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
ReportSk: "sngame2025", // xor混淆key
FlushInterval: 5, // 上报间隔
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
// 可填
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
ChanSize: 500, // 默认1000, 信道size
})
// 随机测试事件和属性
events := []string{"e1", "e2", "e3", "e4"}
pms := []map[string]any{
{"a": "1"},
{"a": "2"},
{"a": "3"},
{"a": "4"},
}
if err != nil {
t.Fatal(err)
}
gtest.C(t, func(t *gtest.T) {
go func() {
for {
uuidval, _ := uuid.NewUUID()
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
glsdk.LogLtz(randUid, events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
time.Sleep(time.Millisecond * 100)
}
}()
time.Sleep(time.Second * 14)
// 模拟等待信号后优雅关闭
glsdk.Shutdown()
})
}

View File

@@ -1,6 +1,9 @@
package aycache
import (
"context"
"math"
v1 "github.com/ayflying/utility_go/api/system/v1"
"github.com/ayflying/utility_go/internal/boot"
"github.com/ayflying/utility_go/pkg/aycache/drive"
@@ -9,7 +12,6 @@ import (
"github.com/gogf/gf/v2/os/gcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"math"
)
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
@@ -19,6 +21,7 @@ type Mod struct {
// QPSCount 记录缓存的 QPS 计数
var QPSCount int
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
var QPS = promauto.NewGauge(
prometheus.GaugeOpts{
@@ -31,7 +34,7 @@ var QPS = promauto.NewGauge(
func init() {
boot.AddFunc(func() {
// 初始化指标,每分钟计算一次平均 QPS 并重置计数器
service.SystemCron().AddCron(v1.CronType_MINUTE, func() error {
service.SystemCron().AddCronV2(v1.CronType_MINUTE, func(context.Context) error {
QPS.Set(math.Round(float64(QPSCount) / 60))
QPSCount = 0
return nil
@@ -53,8 +56,13 @@ func New(_name ...string) gcache.Adapter {
// 创建内存缓存适配器
cacheAdapterObj = drive2.NewAdapterMemory()
case "redis":
//第二个参数为配置名称默认为default
var typ = "default"
if len(_name) >= 2 {
typ = _name[1]
}
// 创建 Redis 缓存适配器
cacheAdapterObj = drive2.NewAdapterRedis()
cacheAdapterObj = drive2.NewAdapterRedis(typ)
case "file":
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")

View File

@@ -7,19 +7,22 @@ import (
"github.com/gogf/gf/v2/os/gctx"
)
var adapterRedisClient gcache.Adapter
var adapterRedisCache = gcache.New()
var adapterRedisClient = make(map[string]gcache.Adapter)
var adapterRedisCache = make(map[string]*gcache.Cache)
func NewAdapterRedis() gcache.Adapter {
if adapterRedisClient == nil {
_cfg, _ := g.Cfg().Get(gctx.New(), "redis.default")
func NewAdapterRedis(name string) gcache.Adapter {
if adapterRedisClient[name] == nil {
_cfg, err := g.Cfg().Get(gctx.New(), "redis."+name)
if err != nil {
panic("当前redis配置不存在")
}
var cfg *gredis.Config
_cfg.Scan(&cfg)
redisObj, _ := gredis.New(cfg)
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
adapterRedisCache.SetAdapter(adapterRedisClient)
//adapterRedisClient[name] = gcache.NewAdapterRedis(g.Redis(name))
adapterRedisClient[name] = gcache.NewAdapterRedis(redisObj)
adapterRedisCache[name] = gcache.New()
adapterRedisCache[name].SetAdapter(adapterRedisClient[name])
}
return adapterRedisCache
return adapterRedisCache[name]
}

View File

@@ -3,7 +3,9 @@ package elasticsearch
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
save = append(save, v)
}
//save = data
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
if err2 != nil {
err = err2
return
}
//需要接收返回信息,判断是否全部执行成功
if response.Errors { //未全部完成
//是否需要删除已成功导入的部分数据
for _, item := range response.Items {
for _, v := range item {
if v.Error != nil { //失败
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
} else {
//删除已导入成功的数据
_, err = s.Delete(ctx, *v.Id_)
if err != nil {
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
}
}
}
}
return errors.New("部分数据导入失败")
}
return
}
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
// Select 查询
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
res, err = s.client.Search(). //Index("my_index").
Request(&search.Request{
Request(&search.Request{
Query: &types.Query{
MatchAll: &types.MatchAllQuery{},
},

View File

@@ -31,7 +31,7 @@ type DataType struct {
Url string `json:"url"` // S3 服务的访问 URL
BucketName string `json:"bucket_name"` // 默认存储桶名称
BucketNameCdn string `json:"bucket_name_cdn"` // CDN 存储桶名称
Provider string `json:"provider"` // S3 服务的提供方
//Provider string `json:"provider"` // S3 服务的提供方
}
// Mod 定义了 S3 模块的结构体,包含一个 S3 客户端实例和配置信息
@@ -184,6 +184,12 @@ func (s *Mod) ListObjects(bucketName string, prefix string) (res <-chan minio.Ob
return
}
// StatObject 获取指定存储桶中指定文件的元数据信息
func (s *Mod) StatObject(bucketName string, objectName string) (res minio.ObjectInfo, err error) {
res, err = s.client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
return
}
// SetBucketPolicy 设置指定存储桶或对象前缀的访问策略
// 目前使用固定的策略,可根据需求修改
func (s *Mod) SetBucketPolicy(bucketName string, prefix string) (err error) {

View File

@@ -8,6 +8,7 @@ package service
import (
"context"
"github.com/ayflying/utility_go/internal/model/entity"
"github.com/gogf/gf/v2/frame/g"
)
@@ -31,8 +32,48 @@ type (
// @param data interface{}: 要存储的活动信息数据。
// @return err error: 返回错误信息如果操作成功则返回nil。
Set(uid int64, actId int, data interface{}) (err error)
Saves(ctx context.Context) (err error)
// Saves 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
// Deprecated: 该方法已被弃用建议使用SavesV2方法
Saves() (err error)
// Save 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param actId int: 活动ID
// @return err error: 返回错误信息
// deprecated: 该方法已被弃用建议使用SaveV2方法
Save(ctx context.Context, actId int) (err error)
// SavesV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @return err error: 返回错误信息
SavesV2() (err error)
// SaveV2 保存游戏活动数据
//
// @Description: 保存游戏活动数据
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param cacheKey string: 缓存键
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
SaveV2(ctx context.Context, cacheKey string, add []*entity.GameAct, update []*entity.GameAct) (err error)
// Cache2Sql 缓存持久化到数据库
// @Description: 缓存持久化到数据库
// @receiver s *sGameAct: 游戏活动服务结构体指针
// @param ctx context.Context: 上下文对象
// @param add []*entity.GameAct: 添加数据
// @param update []*entity.GameAct: 更新数据
// @return err error: 返回错误信息
Cache2Sql(ctx context.Context, add []*entity.GameAct, update []*entity.GameAct) (err error)
// 删除缓存key
DelCacheKey(ctx context.Context, aid int, uid int64)
// 清空GetRedDot缓存
RefreshGetRedDotCache(uid int64)
Del(uid int64, actId int)

View File

@@ -5,6 +5,10 @@
package service
import (
"context"
)
type (
IGameKv interface {
// SavesV1 方法
@@ -13,6 +17,8 @@ type (
// @receiver s: sGameKv的实例。
// @return err: 错误信息如果操作成功则为nil。
SavesV1() (err error)
// 删除缓存key
DelCacheKey(ctx context.Context, uid int64)
}
)

View File

@@ -7,9 +7,6 @@ package service
type (
IIp2Region interface {
// Load 加载到内存中
//
// @Description: 加载ip2region数据库到内存中。
// @receiver s *sIp2region: sIp2region的实例。
Load()
GetIp(ip string) (res []string)

View File

@@ -38,7 +38,8 @@ type (
// @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。
AddCronV2(typ v1.CronType, _func func(context.Context) error)
// @param _onlyMain: 是否只在主服务器上执行一次,true 唯一执行false 全局执行不判断唯一
AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool)
// StartCron 开始计划任务执行
//
// @Description:

View File

@@ -48,7 +48,6 @@ func (r *redis) RedisScan(cacheKey string, _key ...string) (keys []string, err e
// redis 批量获取大量数据
func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ...string) error {
//var keys []string
var err error
@@ -67,9 +66,11 @@ func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ..
g.Log().Errorf(ctx, "Scan failed: %v", err)
break
}
if len(newKeys) > 0 {
err = _func(newKeys)
if err != nil {
return err
}
}
//这个要放在最后

View File

@@ -1,11 +1,15 @@
package utility_go
import (
"context"
"time"
"github.com/ayflying/utility_go/config"
"github.com/ayflying/utility_go/internal/boot"
_ "github.com/ayflying/utility_go/internal/logic"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtimer"
)
var (
@@ -14,9 +18,12 @@ var (
)
func init() {
var err error
g.Log().Debug(ctx, "utility_go init启动完成")
// 初始化配置
var err = boot.Boot()
gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) {
err = boot.Boot()
})
if err != nil {
panic(err)