Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa1dc0896d | ||
|
|
8210ac24db | ||
|
|
cd3de96761 | ||
|
|
ce8ae4d26a | ||
|
|
50cfc23ad2 | ||
|
|
95539038c0 | ||
|
|
4b08a9ce84 | ||
|
|
6efdac7bab | ||
|
|
788cb2e6d4 | ||
|
|
dd8c05b344 | ||
|
|
e781e132ed | ||
|
|
862a6c8410 | ||
|
|
42535d0023 | ||
|
|
dd999cacf9 | ||
|
|
30d30bb8c6 | ||
|
|
f1c22dc9e6 | ||
| 14cf759ce1 | |||
| 19a19c1ff1 | |||
| 183d6d8b10 | |||
| d0cad61028 | |||
| efb34e0c5b | |||
|
|
8190e9f6b7 | ||
|
|
27435b57b7 | ||
|
|
0628882533 | ||
|
|
f68655eee6 |
@@ -2,8 +2,10 @@ package boot
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||||
"github.com/ayflying/utility_go/service"
|
"github.com/ayflying/utility_go/service"
|
||||||
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -16,12 +18,17 @@ func Boot() (err error) {
|
|||||||
// 启动计划任务定时器,预防debug工具激活计划任务造成重复执行,此处不执行计划任务
|
// 启动计划任务定时器,预防debug工具激活计划任务造成重复执行,此处不执行计划任务
|
||||||
//err = service.SystemCron().StartCron()
|
//err = service.SystemCron().StartCron()
|
||||||
|
|
||||||
//用户活动持久化
|
//用户活动持久化每小时执行一次
|
||||||
service.SystemCron().AddCronV2(v1.CronType_DAILY, func(ctx context.Context) error {
|
service.SystemCron().AddCronV2(v1.CronType_HOUR, func(context.Context) error {
|
||||||
|
go func() {
|
||||||
err = service.GameKv().SavesV1()
|
err = service.GameKv().SavesV1()
|
||||||
err = service.GameAct().Saves(ctx)
|
err = service.GameAct().Saves()
|
||||||
return err
|
if err != nil {
|
||||||
})
|
g.Log().Error(gctx.New(), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}, true)
|
||||||
|
|
||||||
//初始化自启动方法
|
//初始化自启动方法
|
||||||
for _, v := range _func {
|
for _, v := range _func {
|
||||||
|
|||||||
@@ -2,7 +2,12 @@ package gameAct
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ayflying/utility_go/internal/model/do"
|
"github.com/ayflying/utility_go/internal/model/do"
|
||||||
"github.com/ayflying/utility_go/internal/model/entity"
|
"github.com/ayflying/utility_go/internal/model/entity"
|
||||||
"github.com/ayflying/utility_go/pkg"
|
"github.com/ayflying/utility_go/pkg"
|
||||||
@@ -13,15 +18,12 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"github.com/gogf/gf/v2/util/gconv"
|
"github.com/gogf/gf/v2/util/gconv"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ctx = gctx.New()
|
|
||||||
Name = "game_act"
|
Name = "game_act"
|
||||||
ActList = gset.New(true)
|
ActList = gset.New(true)
|
||||||
|
RunTimeMax *gtime.Time
|
||||||
)
|
)
|
||||||
|
|
||||||
type sGameAct struct {
|
type sGameAct struct {
|
||||||
@@ -44,6 +46,7 @@ func init() {
|
|||||||
// @return data *v1.Act: 返回活动信息结构体指针
|
// @return data *v1.Act: 返回活动信息结构体指针
|
||||||
// @return err error: 返回错误信息
|
// @return err error: 返回错误信息
|
||||||
func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
||||||
|
var ctx = gctx.New()
|
||||||
if uid == 0 || actId == 0 {
|
if uid == 0 || actId == 0 {
|
||||||
g.Log().Error(ctx, "当前参数为空")
|
g.Log().Error(ctx, "当前参数为空")
|
||||||
return
|
return
|
||||||
@@ -86,6 +89,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
|||||||
// @param data interface{}: 要存储的活动信息数据。
|
// @param data interface{}: 要存储的活动信息数据。
|
||||||
// @return err error: 返回错误信息,如果操作成功,则返回nil。
|
// @return err error: 返回错误信息,如果操作成功,则返回nil。
|
||||||
func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
|
func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
|
||||||
|
var ctx = gctx.New()
|
||||||
if uid == 0 || actId == 0 {
|
if uid == 0 || actId == 0 {
|
||||||
g.Log().Error(ctx, "当前参数为空")
|
g.Log().Error(ctx, "当前参数为空")
|
||||||
return
|
return
|
||||||
@@ -106,18 +110,21 @@ func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sGameAct) Saves(ctx context.Context) (err error) {
|
func (s *sGameAct) Saves() (err error) {
|
||||||
getCache, _ := pkg.Cache("redis").Get(nil, "cron:game_act")
|
var ctx = gctx.New()
|
||||||
|
g.Log().Debug(ctx, "开始执行游戏act数据保存了")
|
||||||
//如果没有执行过,设置时间戳
|
//如果没有执行过,设置时间戳
|
||||||
if getCache.Int64() > 0 {
|
// 最大允许执行时间
|
||||||
return
|
RunTimeMax = gtime.Now().Add(time.Minute * 30)
|
||||||
} else {
|
|
||||||
pkg.Cache("redis").Set(nil, "cron:game_act", gtime.Now().Unix(), time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
//遍历执行
|
//遍历执行
|
||||||
ActList.Iterator(func(i interface{}) bool {
|
ActList.Iterator(func(i interface{}) bool {
|
||||||
|
//在时间内允许执行
|
||||||
|
if gtime.Now().Before(RunTimeMax) {
|
||||||
|
g.Log().Debugf(ctx, "开始执行游戏act数据保存:act=%v", i)
|
||||||
err = s.Save(ctx, i.(int))
|
err = s.Save(ctx, i.(int))
|
||||||
|
} else {
|
||||||
|
g.Log().Errorf(ctx, "游戏act数据保存超时:act=%v", i)
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
@@ -126,17 +133,18 @@ func (s *sGameAct) Saves(ctx context.Context) (err error) {
|
|||||||
func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||||
|
|
||||||
cacheKey := fmt.Sprintf("act:%v:*", actId)
|
cacheKey := fmt.Sprintf("act:%v:*", actId)
|
||||||
//获取当前用户的key值
|
var add = make([]*entity.GameAct, 0)
|
||||||
//keys, err := utils.RedisScan(cacheKey)
|
var update = make([]*entity.GameAct, 0)
|
||||||
//if len(keys) > 10000 {
|
|
||||||
// keys = keys[:10000]
|
|
||||||
//}
|
|
||||||
|
|
||||||
//循环获取缓存数据
|
//循环获取缓存数据
|
||||||
err = tools.Redis.RedisScanV2(cacheKey, func(keys []string) (err error) {
|
err = tools.Redis.RedisScanV2(cacheKey, func(keys []string) (err error) {
|
||||||
var add = make([]*entity.GameAct, 0)
|
//判断是否超时
|
||||||
var update = make([]*entity.GameAct, 0)
|
if gtime.Now().After(RunTimeMax) {
|
||||||
var delKey []string
|
g.Log().Debug(ctx, "act执行超时了,停止执行!")
|
||||||
|
err = errors.New("act执行超时了,停止执行!")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for _, cacheKey = range keys {
|
for _, cacheKey = range keys {
|
||||||
result := strings.Split(cacheKey, ":")
|
result := strings.Split(cacheKey, ":")
|
||||||
actId, err = strconv.Atoi(result[1])
|
actId, err = strconv.Atoi(result[1])
|
||||||
@@ -144,6 +152,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
uid = gconv.Int64(result[2])
|
uid = gconv.Int64(result[2])
|
||||||
//uid, err = strconv.ParseInt(result[2], 10, 64)
|
//uid, err = strconv.ParseInt(result[2], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
g.Log().Error(ctx, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,7 +181,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
ActId: actId,
|
ActId: actId,
|
||||||
}).Fields("uid,act_id").Scan(&data)
|
}).Fields("uid,act_id").Scan(&data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Debugf(ctx, "当前数据错误: %v", cacheKey)
|
g.Log().Errorf(ctx, "当前数据错误: %v", cacheKey)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
actionData := cacheGet.String()
|
actionData := cacheGet.String()
|
||||||
@@ -189,55 +198,64 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
data.Action = actionData
|
data.Action = actionData
|
||||||
update = append(update, data)
|
update = append(update, data)
|
||||||
}
|
}
|
||||||
//最后删除key
|
|
||||||
delKey = append(delKey, cacheKey)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//批量写入数据库
|
//批量写入数据库
|
||||||
if len(delKey) > 0 {
|
updateCount := 0
|
||||||
|
|
||||||
|
g.Log().Debugf(ctx, "当前 %v 要更新的数据: %v 条", actId, len(update))
|
||||||
|
if len(update) > 100 {
|
||||||
for _, v := range update {
|
for _, v := range update {
|
||||||
v.UpdatedAt = gtime.Now()
|
v.UpdatedAt = gtime.Now()
|
||||||
_, err2 := g.Model(Name).Where(do.GameAct{
|
updateRes, err2 := g.Model(Name).Where(do.GameAct{
|
||||||
Uid: v.Uid,
|
Uid: v.Uid,
|
||||||
ActId: v.ActId,
|
ActId: v.ActId,
|
||||||
UpdatedAt: v.UpdatedAt,
|
|
||||||
}).Data(v).Update()
|
}).Data(v).Update()
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
g.Log().Error(ctx, err2)
|
g.Log().Error(ctx, err2)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
////获取多少个数据,删除不是当前修改的数据
|
if row, _ := updateRes.RowsAffected(); row == 0 {
|
||||||
//count, _ := g.Model(Name).Where(do.GameAct{
|
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||||
// Uid: v.Uid,
|
continue
|
||||||
// ActId: v.ActId,
|
|
||||||
//}).Count()
|
|
||||||
//if count > 1 {
|
|
||||||
// g.Model(Name).Where(do.GameAct{
|
|
||||||
// Uid: v.Uid,
|
|
||||||
// ActId: v.ActId,
|
|
||||||
// }).WhereNot("updated_at", v.UpdatedAt).Delete()
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
//dbRes, err2 := g.Model(Name).Batch(50).Data(add).Update()
|
|
||||||
|
//删除缓存
|
||||||
|
go s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
|
|
||||||
|
updateCount++
|
||||||
update = make([]*entity.GameAct, 0)
|
update = make([]*entity.GameAct, 0)
|
||||||
dbRes, err2 := g.Model(Name).Batch(50).Data(add).Save()
|
}
|
||||||
|
g.Log().Debugf(ctx, "当前 %v 更新数据库: %v 条", actId, updateCount)
|
||||||
|
update = make([]*entity.GameAct, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
var count int64
|
||||||
|
g.Log().Debugf(ctx, "当前 %v 要添加的数据: %v 条", actId, len(add))
|
||||||
|
if len(add) > 100 {
|
||||||
|
dbRes, err2 := g.Model(Name).Data(add).Save()
|
||||||
|
|
||||||
|
err = err2
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Error(ctx, err2)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
count, _ = dbRes.RowsAffected()
|
||||||
|
if count == 0 {
|
||||||
|
g.Log().Error(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||||
|
for _, vTemp := range add {
|
||||||
|
g.Log().Debugf(ctx, "当前act:%v,add写入数据: %v,内容:%v", vTemp.ActId, vTemp.Uid, vTemp.Action)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v2 := range add {
|
||||||
|
//删除缓存
|
||||||
|
go s.DelCacheKey(ctx, v2.ActId, v2.Uid)
|
||||||
|
}
|
||||||
|
|
||||||
|
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||||
add = make([]*entity.GameAct, 0)
|
add = make([]*entity.GameAct, 0)
|
||||||
if err2 != nil {
|
|
||||||
g.Log().Error(ctx, err2)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, v := range delKey {
|
|
||||||
_, err2 = g.Redis().Del(ctx, v)
|
|
||||||
if err2 != nil {
|
|
||||||
g.Log().Error(ctx, err2)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delKey = make([]string, 0)
|
|
||||||
|
|
||||||
count, _ := dbRes.RowsAffected()
|
|
||||||
g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -250,17 +268,27 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 删除缓存key
|
||||||
|
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
|
||||||
|
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
|
||||||
|
_, err := g.Redis().Del(ctx, cacheKey)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Error(ctx, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 清空GetRedDot缓存
|
// 清空GetRedDot缓存
|
||||||
func (s *sGameAct) RefreshGetRedDotCache(uid int64) {
|
func (s *sGameAct) RefreshGetRedDotCache(uid int64) {
|
||||||
cacheKey := fmt.Sprintf("gameAct:GetRedDot:%s:%d", gtime.Now().Format("d"), uid)
|
cacheKey := fmt.Sprintf("gameAct:GetRedDot:%s:%d", gtime.Now().Format("d"), uid)
|
||||||
_, err := pkg.Cache("redis").Remove(gctx.New(), cacheKey)
|
_, err := pkg.Cache("redis").Remove(gctx.New(), cacheKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Error(ctx, err)
|
g.Log().Error(gctx.New(), err)
|
||||||
g.Dump(err)
|
g.Dump(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *sGameAct) Del(uid int64, actId int) {
|
func (s *sGameAct) Del(uid int64, actId int) {
|
||||||
|
var ctx = gctx.New()
|
||||||
if uid == 0 || actId == 0 {
|
if uid == 0 || actId == 0 {
|
||||||
g.Log().Error(ctx, "当前参数为空")
|
g.Log().Error(ctx, "当前参数为空")
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -1,22 +1,25 @@
|
|||||||
package gameKv
|
package gameKv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ayflying/utility_go/pkg"
|
"github.com/ayflying/utility_go/pkg"
|
||||||
"github.com/ayflying/utility_go/service"
|
"github.com/ayflying/utility_go/service"
|
||||||
"github.com/ayflying/utility_go/tools"
|
"github.com/ayflying/utility_go/tools"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ctx = gctx.New()
|
|
||||||
Name = "game_kv"
|
Name = "game_kv"
|
||||||
|
RunTimeMax *gtime.Time
|
||||||
)
|
)
|
||||||
|
|
||||||
type sGameKv struct {
|
type sGameKv struct {
|
||||||
@@ -37,17 +40,11 @@ func init() {
|
|||||||
// @receiver s: sGameKv的实例。
|
// @receiver s: sGameKv的实例。
|
||||||
// @return err: 错误信息,如果操作成功,则为nil。
|
// @return err: 错误信息,如果操作成功,则为nil。
|
||||||
func (s *sGameKv) SavesV1() (err error) {
|
func (s *sGameKv) SavesV1() (err error) {
|
||||||
getCache, err := pkg.Cache("redis").Get(nil, "cron:game_kv")
|
var ctx = gctx.New()
|
||||||
//如果没有执行过,设置时间戳
|
// 最大允许执行时间
|
||||||
if getCache.Int64() > 0 {
|
RunTimeMax = gtime.Now().Add(time.Minute * 30)
|
||||||
return
|
g.Log().Debug(ctx, "开始执行游戏kv数据保存")
|
||||||
} else {
|
|
||||||
pkg.Cache("redis").Set(nil, "cron:game_kv", gtime.Now().Unix(), time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 从Redis列表中获取所有用户KV索引的键
|
|
||||||
//keys, err := utils.RedisScan("user:kv:*")
|
|
||||||
err = tools.Redis.RedisScanV2("user:kv:*", func(keys []string) (err error) {
|
|
||||||
// 定义用于存储用户数据的结构体
|
// 定义用于存储用户数据的结构体
|
||||||
type ListData struct {
|
type ListData struct {
|
||||||
Uid int64 `json:"uid"`
|
Uid int64 `json:"uid"`
|
||||||
@@ -56,8 +53,19 @@ func (s *sGameKv) SavesV1() (err error) {
|
|||||||
var list []*ListData
|
var list []*ListData
|
||||||
// 初始化列表,长度与keys数组一致
|
// 初始化列表,长度与keys数组一致
|
||||||
list = make([]*ListData, 0)
|
list = make([]*ListData, 0)
|
||||||
|
|
||||||
|
// 从Redis列表中获取所有用户KV索引的键
|
||||||
|
//keys, err := utils.RedisScan("user:kv:*")
|
||||||
|
err = tools.Redis.RedisScanV2("user:kv:*", func(keys []string) (err error) {
|
||||||
|
//判断是否超时
|
||||||
|
if gtime.Now().After(RunTimeMax) {
|
||||||
|
g.Log().Error(ctx, "kv执行超时了,停止执行!")
|
||||||
|
err = errors.New("kv执行超时了,停止执行!")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
//需要删除的key
|
//需要删除的key
|
||||||
var delKey []string
|
|
||||||
// 遍历keys,获取每个用户的数据并填充到list中
|
// 遍历keys,获取每个用户的数据并填充到list中
|
||||||
for _, cacheKey := range keys {
|
for _, cacheKey := range keys {
|
||||||
//g.Log().Infof(ctx, "保存用户kv数据%v", v)
|
//g.Log().Infof(ctx, "保存用户kv数据%v", v)
|
||||||
@@ -90,30 +98,21 @@ func (s *sGameKv) SavesV1() (err error) {
|
|||||||
Uid: uid,
|
Uid: uid,
|
||||||
Kv: data,
|
Kv: data,
|
||||||
})
|
})
|
||||||
|
|
||||||
delKey = append(delKey, cacheKey)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将列表数据保存到数据库
|
// 将列表数据保存到数据库
|
||||||
if len(list) > 0 {
|
if len(list) > 100 {
|
||||||
_, err2 := g.Model("game_kv").Batch(30).Data(list).Save()
|
_, err2 := g.Model("game_kv").Data(list).Save()
|
||||||
list = make([]*ListData, 0)
|
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
g.Log().Error(ctx, err2)
|
g.Log().Error(ctx, err2)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
//删除当前key
|
||||||
//批量删除key
|
for _, v := range list {
|
||||||
for _, v := range delKey {
|
go s.DelCacheKey(ctx, v.Uid)
|
||||||
_, err2 = g.Redis().Del(ctx, v)
|
|
||||||
if err2 != nil {
|
|
||||||
g.Log().Errorf(ctx, "删除存档错误:%v,err=%v", v, err2)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
list = make([]*ListData, 0)
|
||||||
|
|
||||||
delKey = make([]string, 0)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Error(ctx, "当前kv数据入库失败: %v", err)
|
g.Log().Error(ctx, "当前kv数据入库失败: %v", err)
|
||||||
@@ -122,17 +121,14 @@ func (s *sGameKv) SavesV1() (err error) {
|
|||||||
return
|
return
|
||||||
})
|
})
|
||||||
|
|
||||||
//if err != nil {
|
|
||||||
// return err
|
|
||||||
//}
|
|
||||||
////跳过
|
|
||||||
//if len(keys) == 0 {
|
|
||||||
// return
|
|
||||||
//}
|
|
||||||
////一次最多处理10w条
|
|
||||||
//if len(keys) > 10000 {
|
|
||||||
// keys = keys[:10000]
|
|
||||||
//}
|
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 删除缓存key
|
||||||
|
func (s *sGameKv) DelCacheKey(ctx context.Context, uid int64) {
|
||||||
|
cacheKey := fmt.Sprintf("user:kv:%v", uid)
|
||||||
|
_, err := g.Redis().Del(ctx, cacheKey)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Error(ctx, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,6 +2,9 @@ package systemCron
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ayflying/utility_go/api/system/v1"
|
"github.com/ayflying/utility_go/api/system/v1"
|
||||||
"github.com/ayflying/utility_go/service"
|
"github.com/ayflying/utility_go/service"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
@@ -9,8 +12,6 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"github.com/gogf/gf/v2/os/gtimer"
|
"github.com/gogf/gf/v2/os/gtimer"
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -59,7 +60,7 @@ type sSystemCron struct {
|
|||||||
func New() *sSystemCron {
|
func New() *sSystemCron {
|
||||||
return &sSystemCron{
|
return &sSystemCron{
|
||||||
taskChan: make(chan func(context.Context) error, 2),
|
taskChan: make(chan func(context.Context) error, 2),
|
||||||
TaskTimeout: time.Minute * 30,
|
TaskTimeout: time.Minute * 60,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,7 +81,8 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
|
|||||||
var _func2 = func(ctx context.Context) error {
|
var _func2 = func(ctx context.Context) error {
|
||||||
return _func()
|
return _func()
|
||||||
}
|
}
|
||||||
s.AddCronV2(typ, _func2)
|
// 老版本计划任务全都是主服务器唯一执行
|
||||||
|
s.AddCronV2(typ, _func2, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddCronV2 添加一个定时任务到相应的调度列表中。
|
// AddCronV2 添加一个定时任务到相应的调度列表中。
|
||||||
@@ -89,7 +91,16 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
|
|||||||
// @receiver s: sSystemCron的实例,代表一个调度系统。
|
// @receiver s: sSystemCron的实例,代表一个调度系统。
|
||||||
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
|
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
|
||||||
// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。
|
// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。
|
||||||
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error) {
|
// @param _onlyMain: 是否只在主服务器上执行一次,true 唯一执行,false 全局执行不判断唯一
|
||||||
|
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool) {
|
||||||
|
//如果传过来的任务是需要主服务器执行一次
|
||||||
|
if len(_onlyMain) > 0 && _onlyMain[0] {
|
||||||
|
//判断当前是否为主服务器
|
||||||
|
if !g.Cfg().MustGet(gctx.New(), "game.cron_main").Bool() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//加锁
|
//加锁
|
||||||
s.Lock.Lock()
|
s.Lock.Lock()
|
||||||
defer s.Lock.Unlock()
|
defer s.Lock.Unlock()
|
||||||
@@ -322,7 +333,7 @@ func (s *sSystemCron) RunFuncChan() {
|
|||||||
//ctx := gctx.New()
|
//ctx := gctx.New()
|
||||||
func() {
|
func() {
|
||||||
//超时释放资源
|
//超时释放资源
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.TaskTimeout)
|
ctx, cancel := context.WithTimeout(gctx.New(), s.TaskTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// 使用匿名函数包裹来捕获 panic
|
// 使用匿名函数包裹来捕获 panic
|
||||||
|
|||||||
399
package/gamelog/sdk.go
Normal file
399
package/gamelog/sdk.go
Normal file
@@ -0,0 +1,399 @@
|
|||||||
|
package gamelog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
|
"github.com/gogf/gf/v2/net/gclient"
|
||||||
|
"github.com/gogf/gf/v2/os/gfile"
|
||||||
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sendBody struct {
|
||||||
|
Pid string `json:"pid"`
|
||||||
|
Data [][]any `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo 游戏日志对象
|
||||||
|
type GameLog struct {
|
||||||
|
Uid string // 唯一uid
|
||||||
|
Event string // 事件名
|
||||||
|
Property map[string]any // 事件属性
|
||||||
|
EventTimems int64 // 时间戳毫秒级别
|
||||||
|
EventTimeLoc string // 带时区的本地时间字符串
|
||||||
|
}
|
||||||
|
|
||||||
|
type SDKConfig struct {
|
||||||
|
// 配置变量
|
||||||
|
Pid string // 项目id
|
||||||
|
BaseUrl string // 日志服务器地址
|
||||||
|
ReportSk string // 上报解密key
|
||||||
|
FlushInterval int // 刷新间隔
|
||||||
|
DiskBakPath string // 磁盘备份路径
|
||||||
|
RetryN int // 每N次重试
|
||||||
|
ChanSize int // 信道大小, 默认1000
|
||||||
|
|
||||||
|
reportN int
|
||||||
|
}
|
||||||
|
|
||||||
|
type SDK struct {
|
||||||
|
// 控制变量
|
||||||
|
wg sync.WaitGroup
|
||||||
|
shutdown chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
sdkConfig *SDKConfig
|
||||||
|
bufferChan chan GameLog // 日志队列
|
||||||
|
buffer []GameLog // 日志队列
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx = context.Background()
|
||||||
|
gamelogClient *gclient.Client
|
||||||
|
|
||||||
|
// location map
|
||||||
|
// locationMap map[string]*time.Location = map[string]*time.Location{}
|
||||||
|
locationMap sync.Map // 声明一个线程安全的Map
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
func getLocationMapValue(key string) *time.Location {
|
||||||
|
// 1. 先尝试读
|
||||||
|
value, loaded := locationMap.Load(key)
|
||||||
|
if loaded {
|
||||||
|
return value.(*time.Location) // 如果已经存在,直接返回
|
||||||
|
}
|
||||||
|
// 2. 不存在,就初始化一个该key对应的**固定的**新值
|
||||||
|
location, err := time.LoadLocation(key)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Warningf(ctx, "[GameLog]load location error, try use local timezone: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// 3. 核心:原子性地存储,如果key已存在则返回已存在的值
|
||||||
|
actualValue, loaded := locationMap.LoadOrStore(key, location)
|
||||||
|
if loaded {
|
||||||
|
// 如果loaded为true,说明其他goroutine抢先存了
|
||||||
|
// 我们可以丢弃刚创建的newValue(如果有需要的话),返回已存在的actualValue
|
||||||
|
return actualValue.(*time.Location)
|
||||||
|
}
|
||||||
|
// 如果loaded为false,说明是我们存成功的,返回我们刚创建的newValue
|
||||||
|
return actualValue.(*time.Location)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdk *SDK) varinit() error {
|
||||||
|
sdk.sdkConfig = &SDKConfig{}
|
||||||
|
|
||||||
|
_pid, err := g.Config().Get(ctx, "angergs.bisdk.pid")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.Pid = _pid.String()
|
||||||
|
|
||||||
|
_baseUrl, err := g.Config().Get(ctx, "angergs.bisdk.recodeServerBaseUrl")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.BaseUrl = _baseUrl.String()
|
||||||
|
|
||||||
|
_sk, err := g.Config().Get(ctx, "angergs.bisdk.reportSk")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.ReportSk = _sk.String()
|
||||||
|
|
||||||
|
_flushInterval, err := g.Config().Get(ctx, "angergs.bisdk.flushInterval")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.FlushInterval = _flushInterval.Int()
|
||||||
|
|
||||||
|
_diskBakPath, err := g.Config().Get(ctx, "angergs.bisdk.diskBakPath")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.DiskBakPath = _diskBakPath.String()
|
||||||
|
|
||||||
|
_retryN, err := g.Config().Get(ctx, "angergs.bisdk.retryN")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.RetryN = _retryN.Int()
|
||||||
|
|
||||||
|
_chanSize, err := g.Config().Get(ctx, "angergs.bisdk.chanSize")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.ChanSize = _chanSize.Int()
|
||||||
|
|
||||||
|
g.Log().Infof(ctx, "[GameLog]client init success, config: %v", sdk.sdkConfig)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdk *SDK) checkConfig() error {
|
||||||
|
config := sdk.sdkConfig
|
||||||
|
if config.Pid == "" {
|
||||||
|
return fmt.Errorf("pid is empty")
|
||||||
|
}
|
||||||
|
if config.BaseUrl == "" {
|
||||||
|
return fmt.Errorf("baseUrl is empty")
|
||||||
|
}
|
||||||
|
if config.ReportSk == "" {
|
||||||
|
return fmt.Errorf("reportSk is empty")
|
||||||
|
}
|
||||||
|
if config.FlushInterval <= 0 {
|
||||||
|
return fmt.Errorf("flushInterval is invalid")
|
||||||
|
}
|
||||||
|
if config.DiskBakPath == "" {
|
||||||
|
return fmt.Errorf("diskBakPath is empty")
|
||||||
|
}
|
||||||
|
if config.RetryN == 0 {
|
||||||
|
config.RetryN = 10
|
||||||
|
}
|
||||||
|
if config.ChanSize == 0 {
|
||||||
|
config.ChanSize = 1000
|
||||||
|
}
|
||||||
|
config.DiskBakPath = strings.TrimSuffix(config.DiskBakPath, "/")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func INIT(config *SDKConfig) (*SDK, error) {
|
||||||
|
// 加载并检查配置
|
||||||
|
sdk := &SDK{}
|
||||||
|
if config != nil {
|
||||||
|
sdk.sdkConfig = config
|
||||||
|
} else if err := sdk.varinit(); err != nil { // 可以读goframe的配置
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := sdk.checkConfig(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
gamelogClient = g.Client()
|
||||||
|
|
||||||
|
// 初始化队列
|
||||||
|
sdk.shutdown = make(chan struct{})
|
||||||
|
sdk.bufferChan = make(chan GameLog, 1000)
|
||||||
|
sdk.buffer = make([]GameLog, 0, 100)
|
||||||
|
// 加载失败日志
|
||||||
|
failLogs, err := sdk.loadFailLogs4disk()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
|
||||||
|
} else if len(failLogs) > 0 {
|
||||||
|
sdk.buffer = append(sdk.buffer, failLogs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 开启协程进行日志发送
|
||||||
|
sdk.wg = sync.WaitGroup{}
|
||||||
|
sdk.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer sdk.wg.Done()
|
||||||
|
ticker := time.NewTicker(time.Duration(sdk.sdkConfig.FlushInterval) * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-sdk.shutdown:
|
||||||
|
// 关闭时, 上传一次并备份失败数据
|
||||||
|
g.Log().Infof(ctx, "[GameLog]begin shutdown and flush last")
|
||||||
|
sdk.flush()
|
||||||
|
return
|
||||||
|
case log := <-sdk.bufferChan:
|
||||||
|
sdk.buffer = append(sdk.buffer, log)
|
||||||
|
case <-ticker.C:
|
||||||
|
sdk.flush()
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return sdk, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 从磁盘加载失败日志
|
||||||
|
func (sdk *SDK) loadFailLogs4disk() (logs []GameLog, err error) {
|
||||||
|
if !gfile.Exists(sdk.sdkConfig.DiskBakPath) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 遍历diskBakPath下所有failBufferxxx.bak.log文件, 读取到log中
|
||||||
|
files, err := gfile.ScanDir(sdk.sdkConfig.DiskBakPath, "failBuffer*.bak.log")
|
||||||
|
logs = []GameLog{}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 读取每个备份文件
|
||||||
|
for _, fp := range files {
|
||||||
|
// 每一行都是一次失败的记录
|
||||||
|
gfile.ReadLines(fp, func(line string) error {
|
||||||
|
_logs := []GameLog{}
|
||||||
|
err := json.Unmarshal([]byte(line), &_logs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// 合并到总日志列表
|
||||||
|
logs = append(logs, _logs...)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
g.Log().Infof(ctx, "[GameLog]load %d faillogs from %s", len(logs), fp)
|
||||||
|
gfile.Remove(fp)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// 备份失败日志追加到磁盘
|
||||||
|
func (sdk *SDK) bakFailLogs2disk(failLogs []GameLog) {
|
||||||
|
bakPath := fmt.Sprintf("%s/failBuffer%s.bak.log", sdk.sdkConfig.DiskBakPath, gtime.Now().Format("YmdH"))
|
||||||
|
content, err := json.Marshal(failLogs)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]marshal fail logs error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
gfile.PutContentsAppend(bakPath, string(content)+"\n")
|
||||||
|
g.Log().Infof(ctx, "[GameLog]backup fail buffer to %s", bakPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 优雅关闭
|
||||||
|
func (sdk *SDK) Shutdown() {
|
||||||
|
close(sdk.shutdown)
|
||||||
|
sdk.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 日志时间格式
|
||||||
|
const datetimeFmt = time.DateOnly + " " + time.TimeOnly
|
||||||
|
|
||||||
|
// 记录日志
|
||||||
|
func (sdk *SDK) Log(uid, event string, property map[string]any, timezone string) {
|
||||||
|
loc := time.Local
|
||||||
|
if _loc := getLocationMapValue(timezone); _loc != nil {
|
||||||
|
loc = _loc
|
||||||
|
}
|
||||||
|
log := GameLog{
|
||||||
|
Uid: uid,
|
||||||
|
Event: event,
|
||||||
|
Property: property,
|
||||||
|
EventTimems: gtime.Now().TimestampMilli(),
|
||||||
|
EventTimeLoc: gtime.Now().In(loc).Format(datetimeFmt),
|
||||||
|
}
|
||||||
|
// 线程安全
|
||||||
|
sdk.bufferChan <- log
|
||||||
|
}
|
||||||
|
|
||||||
|
// 按服务器时区记录日志
|
||||||
|
func (sdk *SDK) LogLtz(uid, event string, property map[string]any) {
|
||||||
|
sdk.Log(uid, event, property, time.Local.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// 这个方法只会在内部协程调用
|
||||||
|
func (sdk *SDK) flush() {
|
||||||
|
sdk.mu.Lock()
|
||||||
|
defer sdk.mu.Unlock()
|
||||||
|
if len(sdk.buffer) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
batch := make([]GameLog, len(sdk.buffer))
|
||||||
|
copy(batch, sdk.buffer)
|
||||||
|
sdk.buffer = sdk.buffer[:0]
|
||||||
|
|
||||||
|
// 第N次的时候加载失败数据进行尝试
|
||||||
|
if sdk.sdkConfig.reportN != 0 && sdk.sdkConfig.reportN%sdk.sdkConfig.RetryN == 0 {
|
||||||
|
faillogs, err := sdk.loadFailLogs4disk()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
|
||||||
|
}
|
||||||
|
// 如果有失败日志则加入到批量数组中
|
||||||
|
if len(faillogs) > 0 {
|
||||||
|
batch = append(batch, faillogs...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sdk.send(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送消息
|
||||||
|
func (sdk *SDK) send(logs []GameLog) {
|
||||||
|
waitSecond := time.Duration(sdk.sdkConfig.FlushInterval/4) * time.Second
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(context.Background(), waitSecond)
|
||||||
|
defer cancel()
|
||||||
|
data := make([][]any, 0, len(logs))
|
||||||
|
// logs 拆分成二维数组
|
||||||
|
for _, log := range logs {
|
||||||
|
propertyJson, err := json.Marshal(log.Property)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]skip log parse, marshal property error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
data = append(data, []any{
|
||||||
|
log.Uid,
|
||||||
|
log.Event,
|
||||||
|
string(propertyJson),
|
||||||
|
log.EventTimems,
|
||||||
|
log.EventTimeLoc,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// json化
|
||||||
|
sbody := sendBody{
|
||||||
|
Pid: sdk.sdkConfig.Pid,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
jsonBody, err := json.Marshal(sbody)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]marshal send body error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// giz压缩
|
||||||
|
gzBody := bytes.NewBuffer([]byte{})
|
||||||
|
gz := gzip.NewWriter(gzBody)
|
||||||
|
gz.Write(jsonBody)
|
||||||
|
gz.Close()
|
||||||
|
|
||||||
|
// XOR 加密
|
||||||
|
xorBody := bytesXOR(gzBody.Bytes(), []byte(sdk.sdkConfig.ReportSk))
|
||||||
|
|
||||||
|
sdk.sdkConfig.reportN += 1
|
||||||
|
res, err := gamelogClient.Post(timeoutCtx, sdk.sdkConfig.BaseUrl+"/report/event", xorBody)
|
||||||
|
// 失败重新加入缓冲区
|
||||||
|
if err != nil {
|
||||||
|
sdk.bakFailLogs2disk(logs)
|
||||||
|
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
cerr := res.Close()
|
||||||
|
if cerr != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]close response error: %v", cerr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
httpcode := res.StatusCode
|
||||||
|
resBody := res.ReadAllString()
|
||||||
|
// 收集器拦截, 重新加入缓冲区
|
||||||
|
if httpcode != http.StatusOK {
|
||||||
|
sdk.bakFailLogs2disk(logs)
|
||||||
|
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), resBody)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 混淆
|
||||||
|
func bytesXOR(data []byte, key []byte) []byte {
|
||||||
|
obfuscated := make([]byte, len(data))
|
||||||
|
keyLen := len(key)
|
||||||
|
if keyLen == 0 {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range data {
|
||||||
|
obfuscated[i] = data[i] ^ key[i%keyLen]
|
||||||
|
}
|
||||||
|
return obfuscated
|
||||||
|
|
||||||
|
// // 使用示例
|
||||||
|
// key := []byte{0x12, 0x34, 0x56, 0x78}
|
||||||
|
// obfuscated := multiXorObfuscate(original, key)
|
||||||
|
// deobfuscated := multiXorObfuscate(obfuscated, key) // 解密
|
||||||
|
}
|
||||||
52
package/gamelog/test/gamelog_test.go
Normal file
52
package/gamelog/test/gamelog_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ayflying/utility_go/package/gamelog"
|
||||||
|
"github.com/gogf/gf/v2/test/gtest"
|
||||||
|
"github.com/gogf/gf/v2/util/grand"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGamelog(t *testing.T) {
|
||||||
|
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
|
||||||
|
// 必填
|
||||||
|
Pid: "test5", // 项目ID
|
||||||
|
BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
|
||||||
|
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
|
||||||
|
ReportSk: "sngame2025", // xor混淆key
|
||||||
|
FlushInterval: 5, // 上报间隔
|
||||||
|
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
|
||||||
|
// 可填
|
||||||
|
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
|
||||||
|
ChanSize: 500, // 默认1000, 信道size
|
||||||
|
})
|
||||||
|
|
||||||
|
// 随机测试事件和属性
|
||||||
|
events := []string{"e1", "e2", "e3", "e4"}
|
||||||
|
pms := []map[string]any{
|
||||||
|
{"a": "1"},
|
||||||
|
{"a": "2"},
|
||||||
|
{"a": "3"},
|
||||||
|
{"a": "4"},
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
gtest.C(t, func(t *gtest.T) {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
uuidval, _ := uuid.NewUUID()
|
||||||
|
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
|
||||||
|
glsdk.LogLtz(randUid, events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
time.Sleep(time.Second * 14)
|
||||||
|
// 模拟等待信号后优雅关闭
|
||||||
|
glsdk.Shutdown()
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1,6 +1,9 @@
|
|||||||
package aycache
|
package aycache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
|
||||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||||
"github.com/ayflying/utility_go/internal/boot"
|
"github.com/ayflying/utility_go/internal/boot"
|
||||||
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
||||||
@@ -9,7 +12,6 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gcache"
|
"github.com/gogf/gf/v2/os/gcache"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
"math"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
||||||
@@ -19,6 +21,7 @@ type Mod struct {
|
|||||||
|
|
||||||
// QPSCount 记录缓存的 QPS 计数
|
// QPSCount 记录缓存的 QPS 计数
|
||||||
var QPSCount int
|
var QPSCount int
|
||||||
|
|
||||||
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
||||||
var QPS = promauto.NewGauge(
|
var QPS = promauto.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
@@ -31,7 +34,7 @@ var QPS = promauto.NewGauge(
|
|||||||
func init() {
|
func init() {
|
||||||
boot.AddFunc(func() {
|
boot.AddFunc(func() {
|
||||||
// 初始化指标,每分钟计算一次平均 QPS 并重置计数器
|
// 初始化指标,每分钟计算一次平均 QPS 并重置计数器
|
||||||
service.SystemCron().AddCron(v1.CronType_MINUTE, func() error {
|
service.SystemCron().AddCronV2(v1.CronType_MINUTE, func(context.Context) error {
|
||||||
QPS.Set(math.Round(float64(QPSCount) / 60))
|
QPS.Set(math.Round(float64(QPSCount) / 60))
|
||||||
QPSCount = 0
|
QPSCount = 0
|
||||||
return nil
|
return nil
|
||||||
@@ -53,8 +56,13 @@ func New(_name ...string) gcache.Adapter {
|
|||||||
// 创建内存缓存适配器
|
// 创建内存缓存适配器
|
||||||
cacheAdapterObj = drive2.NewAdapterMemory()
|
cacheAdapterObj = drive2.NewAdapterMemory()
|
||||||
case "redis":
|
case "redis":
|
||||||
|
//第二个参数为配置名称,默认为default
|
||||||
|
var typ = "default"
|
||||||
|
if len(_name) >= 2 {
|
||||||
|
typ = _name[1]
|
||||||
|
}
|
||||||
// 创建 Redis 缓存适配器
|
// 创建 Redis 缓存适配器
|
||||||
cacheAdapterObj = drive2.NewAdapterRedis()
|
cacheAdapterObj = drive2.NewAdapterRedis(typ)
|
||||||
case "file":
|
case "file":
|
||||||
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
||||||
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
||||||
|
|||||||
@@ -7,19 +7,22 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
)
|
)
|
||||||
|
|
||||||
var adapterRedisClient gcache.Adapter
|
var adapterRedisClient = make(map[string]gcache.Adapter)
|
||||||
var adapterRedisCache = gcache.New()
|
var adapterRedisCache = make(map[string]*gcache.Cache)
|
||||||
|
|
||||||
func NewAdapterRedis() gcache.Adapter {
|
func NewAdapterRedis(name string) gcache.Adapter {
|
||||||
|
if adapterRedisClient[name] == nil {
|
||||||
if adapterRedisClient == nil {
|
_cfg, err := g.Cfg().Get(gctx.New(), "redis."+name)
|
||||||
_cfg, _ := g.Cfg().Get(gctx.New(), "redis.default")
|
if err != nil {
|
||||||
|
panic("当前redis配置不存在")
|
||||||
|
}
|
||||||
var cfg *gredis.Config
|
var cfg *gredis.Config
|
||||||
_cfg.Scan(&cfg)
|
_cfg.Scan(&cfg)
|
||||||
redisObj, _ := gredis.New(cfg)
|
redisObj, _ := gredis.New(cfg)
|
||||||
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
|
//adapterRedisClient[name] = gcache.NewAdapterRedis(g.Redis(name))
|
||||||
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
|
adapterRedisClient[name] = gcache.NewAdapterRedis(redisObj)
|
||||||
adapterRedisCache.SetAdapter(adapterRedisClient)
|
adapterRedisCache[name] = gcache.New()
|
||||||
|
adapterRedisCache[name].SetAdapter(adapterRedisClient[name])
|
||||||
}
|
}
|
||||||
return adapterRedisCache
|
return adapterRedisCache[name]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ package elasticsearch
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/elastic/go-elasticsearch/v8"
|
"github.com/elastic/go-elasticsearch/v8"
|
||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
||||||
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
|
|||||||
save = append(save, v)
|
save = append(save, v)
|
||||||
}
|
}
|
||||||
//save = data
|
//save = data
|
||||||
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||||
|
if err2 != nil {
|
||||||
|
err = err2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//需要接收返回信息,判断是否全部执行成功
|
||||||
|
if response.Errors { //未全部完成
|
||||||
|
//是否需要删除已成功导入的部分数据
|
||||||
|
for _, item := range response.Items {
|
||||||
|
for _, v := range item {
|
||||||
|
if v.Error != nil { //失败
|
||||||
|
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
//删除已导入成功的数据
|
||||||
|
_, err = s.Delete(ctx, *v.Id_)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.New("部分数据导入失败")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ type DataType struct {
|
|||||||
Url string `json:"url"` // S3 服务的访问 URL
|
Url string `json:"url"` // S3 服务的访问 URL
|
||||||
BucketName string `json:"bucket_name"` // 默认存储桶名称
|
BucketName string `json:"bucket_name"` // 默认存储桶名称
|
||||||
BucketNameCdn string `json:"bucket_name_cdn"` // CDN 存储桶名称
|
BucketNameCdn string `json:"bucket_name_cdn"` // CDN 存储桶名称
|
||||||
Provider string `json:"provider"` // S3 服务的提供方
|
//Provider string `json:"provider"` // S3 服务的提供方
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mod 定义了 S3 模块的结构体,包含一个 S3 客户端实例和配置信息
|
// Mod 定义了 S3 模块的结构体,包含一个 S3 客户端实例和配置信息
|
||||||
@@ -184,6 +184,12 @@ func (s *Mod) ListObjects(bucketName string, prefix string) (res <-chan minio.Ob
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StatObject 获取指定存储桶中指定文件的元数据信息
|
||||||
|
func (s *Mod) StatObject(bucketName string, objectName string) (res minio.ObjectInfo, err error) {
|
||||||
|
res, err = s.client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// SetBucketPolicy 设置指定存储桶或对象前缀的访问策略
|
// SetBucketPolicy 设置指定存储桶或对象前缀的访问策略
|
||||||
// 目前使用固定的策略,可根据需求修改
|
// 目前使用固定的策略,可根据需求修改
|
||||||
func (s *Mod) SetBucketPolicy(bucketName string, prefix string) (err error) {
|
func (s *Mod) SetBucketPolicy(bucketName string, prefix string) (err error) {
|
||||||
|
|||||||
@@ -31,8 +31,10 @@ type (
|
|||||||
// @param data interface{}: 要存储的活动信息数据。
|
// @param data interface{}: 要存储的活动信息数据。
|
||||||
// @return err error: 返回错误信息,如果操作成功,则返回nil。
|
// @return err error: 返回错误信息,如果操作成功,则返回nil。
|
||||||
Set(uid int64, actId int, data interface{}) (err error)
|
Set(uid int64, actId int, data interface{}) (err error)
|
||||||
Saves(ctx context.Context) (err error)
|
Saves() (err error)
|
||||||
Save(ctx context.Context, actId int) (err error)
|
Save(ctx context.Context, actId int) (err error)
|
||||||
|
// 删除缓存key
|
||||||
|
DelCacheKey(ctx context.Context, aid int, uid int64)
|
||||||
// 清空GetRedDot缓存
|
// 清空GetRedDot缓存
|
||||||
RefreshGetRedDotCache(uid int64)
|
RefreshGetRedDotCache(uid int64)
|
||||||
Del(uid int64, actId int)
|
Del(uid int64, actId int)
|
||||||
|
|||||||
@@ -5,6 +5,10 @@
|
|||||||
|
|
||||||
package service
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
IGameKv interface {
|
IGameKv interface {
|
||||||
// SavesV1 方法
|
// SavesV1 方法
|
||||||
@@ -13,6 +17,8 @@ type (
|
|||||||
// @receiver s: sGameKv的实例。
|
// @receiver s: sGameKv的实例。
|
||||||
// @return err: 错误信息,如果操作成功,则为nil。
|
// @return err: 错误信息,如果操作成功,则为nil。
|
||||||
SavesV1() (err error)
|
SavesV1() (err error)
|
||||||
|
// 删除缓存key
|
||||||
|
DelCacheKey(ctx context.Context, uid int64)
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -7,9 +7,6 @@ package service
|
|||||||
|
|
||||||
type (
|
type (
|
||||||
IIp2Region interface {
|
IIp2Region interface {
|
||||||
// Load 加载到内存中
|
|
||||||
//
|
|
||||||
// @Description: 加载ip2region数据库到内存中。
|
|
||||||
// @receiver s *sIp2region: sIp2region的实例。
|
// @receiver s *sIp2region: sIp2region的实例。
|
||||||
Load()
|
Load()
|
||||||
GetIp(ip string) (res []string)
|
GetIp(ip string) (res []string)
|
||||||
|
|||||||
@@ -38,7 +38,8 @@ type (
|
|||||||
// @receiver s: sSystemCron的实例,代表一个调度系统。
|
// @receiver s: sSystemCron的实例,代表一个调度系统。
|
||||||
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
|
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
|
||||||
// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。
|
// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。
|
||||||
AddCronV2(typ v1.CronType, _func func(context.Context) error)
|
// @param _onlyMain: 是否只在主服务器上执行一次,true 唯一执行,false 全局执行不判断唯一
|
||||||
|
AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool)
|
||||||
// StartCron 开始计划任务执行
|
// StartCron 开始计划任务执行
|
||||||
//
|
//
|
||||||
// @Description:
|
// @Description:
|
||||||
|
|||||||
@@ -48,7 +48,6 @@ func (r *redis) RedisScan(cacheKey string, _key ...string) (keys []string, err e
|
|||||||
|
|
||||||
// redis 批量获取大量数据
|
// redis 批量获取大量数据
|
||||||
func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ...string) error {
|
func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ...string) error {
|
||||||
|
|
||||||
//var keys []string
|
//var keys []string
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@@ -67,9 +66,11 @@ func (r *redis) RedisScanV2(cacheKey string, _func func([]string) error, _key ..
|
|||||||
g.Log().Errorf(ctx, "Scan failed: %v", err)
|
g.Log().Errorf(ctx, "Scan failed: %v", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(newKeys) > 0 {
|
if len(newKeys) > 0 {
|
||||||
err = _func(newKeys)
|
err = _func(newKeys)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//这个要放在最后
|
//这个要放在最后
|
||||||
|
|||||||
@@ -1,11 +1,15 @@
|
|||||||
package utility_go
|
package utility_go
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ayflying/utility_go/config"
|
"github.com/ayflying/utility_go/config"
|
||||||
"github.com/ayflying/utility_go/internal/boot"
|
"github.com/ayflying/utility_go/internal/boot"
|
||||||
_ "github.com/ayflying/utility_go/internal/logic"
|
_ "github.com/ayflying/utility_go/internal/logic"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
|
"github.com/gogf/gf/v2/os/gtimer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -14,9 +18,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
var err error
|
||||||
g.Log().Debug(ctx, "utility_go init启动完成")
|
g.Log().Debug(ctx, "utility_go init启动完成")
|
||||||
// 初始化配置
|
// 初始化配置
|
||||||
var err = boot.Boot()
|
gtimer.SetTimeout(ctx, time.Second*5, func(ctx context.Context) {
|
||||||
|
err = boot.Boot()
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|||||||
Reference in New Issue
Block a user