Compare commits

...

8 Commits

Author SHA1 Message Date
14cf759ce1 Merge branch 'fixmapsync' into 'master'
修一个map的并发

See merge request public_project/utility_go!2
2025-08-21 08:41:14 +00:00
19a19c1ff1 修一个map的并发 2025-08-21 16:39:47 +08:00
183d6d8b10 Merge branch 'bisdk' into 'master'
Bisdk

See merge request public_project/utility_go!1
2025-08-21 08:15:26 +00:00
d0cad61028 一些err接受问题 2025-08-21 16:09:10 +08:00
efb34e0c5b sdk和测试代码 2025-08-21 15:52:58 +08:00
liaoyulong
8190e9f6b7 批量导入elk后接收返回信息判断是否全部导入成功 2025-08-21 10:56:51 +08:00
ayflying
27435b57b7 修改缓存驱动,第二个参数支持选择不同的缓存配置 2025-08-21 10:30:43 +08:00
ayflying
0628882533 缓存key删除失败不报错 2025-08-20 15:13:01 +08:00
7 changed files with 506 additions and 21 deletions

View File

@@ -226,7 +226,6 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
_, err = g.Redis().Del(ctx, v)
if err != nil {
g.Log().Error(ctx, err)
return
}
}
delKey = make([]string, 0)

View File

@@ -2,16 +2,17 @@ package gameKv
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/ayflying/utility_go/pkg"
"github.com/ayflying/utility_go/service"
"github.com/ayflying/utility_go/tools"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
"strconv"
"strings"
"sync"
"time"
)
var (
@@ -107,8 +108,7 @@ func (s *sGameKv) SavesV1() (err error) {
for _, v := range delKey {
_, err2 = g.Redis().Del(ctx, v)
if err2 != nil {
g.Log().Errorf(ctx, "删除存档错误%v,err=%v", v, err2)
return
g.Log().Errorf(ctx, "删除存档失败%v,err=%v", v, err2)
}
}

399
package/gamelog/sdk.go Normal file
View File

@@ -0,0 +1,399 @@
package gamelog
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/os/gtime"
)
type sendBody struct {
Pid string `json:"pid"`
Data [][]any `json:"data"`
}
// todo 游戏日志对象
type GameLog struct {
Uid string // 唯一uid
Event string // 事件名
Property map[string]any // 事件属性
EventTimems int64 // 时间戳毫秒级别
EventTimeLoc string // 带时区的本地时间字符串
}
type SDKConfig struct {
// 配置变量
Pid string // 项目id
BaseUrl string // 日志服务器地址
ReportSk string // 上报解密key
FlushInterval int // 刷新间隔
DiskBakPath string // 磁盘备份路径
RetryN int // 每N次重试
ChanSize int // 信道大小, 默认1000
reportN int
}
type SDK struct {
// 控制变量
wg sync.WaitGroup
shutdown chan struct{}
mu sync.Mutex
sdkConfig *SDKConfig
bufferChan chan GameLog // 日志队列
buffer []GameLog // 日志队列
}
var (
ctx = context.Background()
gamelogClient *gclient.Client
// location map
// locationMap map[string]*time.Location = map[string]*time.Location{}
locationMap sync.Map // 声明一个线程安全的Map
)
func getLocationMapValue(key string) *time.Location {
// 1. 先尝试读
value, loaded := locationMap.Load(key)
if loaded {
return value.(*time.Location) // 如果已经存在,直接返回
}
// 2. 不存在就初始化一个该key对应的**固定的**新值
location, err := time.LoadLocation(key)
if err != nil {
g.Log().Warningf(ctx, "[GameLog]load location error, try use local timezone: %v", err)
return nil
}
// 3. 核心原子性地存储如果key已存在则返回已存在的值
actualValue, loaded := locationMap.LoadOrStore(key, location)
if loaded {
// 如果loaded为true说明其他goroutine抢先存了
// 我们可以丢弃刚创建的newValue如果有需要的话返回已存在的actualValue
return actualValue.(*time.Location)
}
// 如果loaded为false说明是我们存成功的返回我们刚创建的newValue
return actualValue.(*time.Location)
}
func (sdk *SDK) varinit() error {
sdk.sdkConfig = &SDKConfig{}
_pid, err := g.Config().Get(ctx, "angergs.bisdk.pid")
if err != nil {
return err
}
sdk.sdkConfig.Pid = _pid.String()
_baseUrl, err := g.Config().Get(ctx, "angergs.bisdk.recodeServerBaseUrl")
if err != nil {
return err
}
sdk.sdkConfig.BaseUrl = _baseUrl.String()
_sk, err := g.Config().Get(ctx, "angergs.bisdk.reportSk")
if err != nil {
return err
}
sdk.sdkConfig.ReportSk = _sk.String()
_flushInterval, err := g.Config().Get(ctx, "angergs.bisdk.flushInterval")
if err != nil {
return err
}
sdk.sdkConfig.FlushInterval = _flushInterval.Int()
_diskBakPath, err := g.Config().Get(ctx, "angergs.bisdk.diskBakPath")
if err != nil {
return err
}
sdk.sdkConfig.DiskBakPath = _diskBakPath.String()
_retryN, err := g.Config().Get(ctx, "angergs.bisdk.retryN")
if err != nil {
return err
}
sdk.sdkConfig.RetryN = _retryN.Int()
_chanSize, err := g.Config().Get(ctx, "angergs.bisdk.chanSize")
if err != nil {
return err
}
sdk.sdkConfig.ChanSize = _chanSize.Int()
g.Log().Infof(ctx, "[GameLog]client init success, config: %v", sdk.sdkConfig)
return nil
}
func (sdk *SDK) checkConfig() error {
config := sdk.sdkConfig
if config.Pid == "" {
return fmt.Errorf("pid is empty")
}
if config.BaseUrl == "" {
return fmt.Errorf("baseUrl is empty")
}
if config.ReportSk == "" {
return fmt.Errorf("reportSk is empty")
}
if config.FlushInterval <= 0 {
return fmt.Errorf("flushInterval is invalid")
}
if config.DiskBakPath == "" {
return fmt.Errorf("diskBakPath is empty")
}
if config.RetryN == 0 {
config.RetryN = 10
}
if config.ChanSize == 0 {
config.ChanSize = 1000
}
config.DiskBakPath = strings.TrimSuffix(config.DiskBakPath, "/")
return nil
}
func INIT(config *SDKConfig) (*SDK, error) {
// 加载并检查配置
sdk := &SDK{}
if config != nil {
sdk.sdkConfig = config
} else if err := sdk.varinit(); err != nil { // 可以读goframe的配置
return nil, err
}
if err := sdk.checkConfig(); err != nil {
return nil, err
}
gamelogClient = g.Client()
// 初始化队列
sdk.shutdown = make(chan struct{})
sdk.bufferChan = make(chan GameLog, 1000)
sdk.buffer = make([]GameLog, 0, 100)
// 加载失败日志
failLogs, err := sdk.loadFailLogs4disk()
if err != nil {
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
} else if len(failLogs) > 0 {
sdk.buffer = append(sdk.buffer, failLogs...)
}
// 开启协程进行日志发送
sdk.wg = sync.WaitGroup{}
sdk.wg.Add(1)
go func() {
defer sdk.wg.Done()
ticker := time.NewTicker(time.Duration(sdk.sdkConfig.FlushInterval) * time.Second)
defer ticker.Stop()
for {
select {
case <-sdk.shutdown:
// 关闭时, 上传一次并备份失败数据
g.Log().Infof(ctx, "[GameLog]begin shutdown and flush last")
sdk.flush()
return
case log := <-sdk.bufferChan:
sdk.buffer = append(sdk.buffer, log)
case <-ticker.C:
sdk.flush()
}
}
}()
return sdk, nil
}
// 从磁盘加载失败日志
func (sdk *SDK) loadFailLogs4disk() (logs []GameLog, err error) {
if !gfile.Exists(sdk.sdkConfig.DiskBakPath) {
return
}
// 遍历diskBakPath下所有failBufferxxx.bak.log文件 读取到log中
files, err := gfile.ScanDir(sdk.sdkConfig.DiskBakPath, "failBuffer*.bak.log")
logs = []GameLog{}
if err != nil {
return
}
// 读取每个备份文件
for _, fp := range files {
// 每一行都是一次失败的记录
gfile.ReadLines(fp, func(line string) error {
_logs := []GameLog{}
err := json.Unmarshal([]byte(line), &_logs)
if err != nil {
return err
}
// 合并到总日志列表
logs = append(logs, _logs...)
return nil
})
g.Log().Infof(ctx, "[GameLog]load %d faillogs from %s", len(logs), fp)
gfile.Remove(fp)
}
return
}
// 备份失败日志追加到磁盘
func (sdk *SDK) bakFailLogs2disk(failLogs []GameLog) {
bakPath := fmt.Sprintf("%s/failBuffer%s.bak.log", sdk.sdkConfig.DiskBakPath, gtime.Now().Format("YmdH"))
content, err := json.Marshal(failLogs)
if err != nil {
g.Log().Errorf(ctx, "[GameLog]marshal fail logs error: %v", err)
return
}
gfile.PutContentsAppend(bakPath, string(content)+"\n")
g.Log().Infof(ctx, "[GameLog]backup fail buffer to %s", bakPath)
}
// 优雅关闭
func (sdk *SDK) Shutdown() {
close(sdk.shutdown)
sdk.wg.Wait()
}
// 日志时间格式
const datetimeFmt = time.DateOnly + " " + time.TimeOnly
// 记录日志
func (sdk *SDK) Log(uid, event string, property map[string]any, timezone string) {
loc := time.Local
if _loc := getLocationMapValue(timezone); _loc != nil {
loc = _loc
}
log := GameLog{
Uid: uid,
Event: event,
Property: property,
EventTimems: gtime.Now().TimestampMilli(),
EventTimeLoc: gtime.Now().In(loc).Format(datetimeFmt),
}
// 线程安全
sdk.bufferChan <- log
}
// 按服务器时区记录日志
func (sdk *SDK) LogLtz(uid, event string, property map[string]any) {
sdk.Log(uid, event, property, time.Local.String())
}
// 这个方法只会在内部协程调用
func (sdk *SDK) flush() {
sdk.mu.Lock()
defer sdk.mu.Unlock()
if len(sdk.buffer) == 0 {
return
}
batch := make([]GameLog, len(sdk.buffer))
copy(batch, sdk.buffer)
sdk.buffer = sdk.buffer[:0]
// 第N次的时候加载失败数据进行尝试
if sdk.sdkConfig.reportN != 0 && sdk.sdkConfig.reportN%sdk.sdkConfig.RetryN == 0 {
faillogs, err := sdk.loadFailLogs4disk()
if err != nil {
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
}
// 如果有失败日志则加入到批量数组中
if len(faillogs) > 0 {
batch = append(batch, faillogs...)
}
}
sdk.send(batch)
}
// 发送消息
func (sdk *SDK) send(logs []GameLog) {
waitSecond := time.Duration(sdk.sdkConfig.FlushInterval/4) * time.Second
timeoutCtx, cancel := context.WithTimeout(context.Background(), waitSecond)
defer cancel()
data := make([][]any, 0, len(logs))
// logs 拆分成二维数组
for _, log := range logs {
propertyJson, err := json.Marshal(log.Property)
if err != nil {
g.Log().Errorf(ctx, "[GameLog]skip log parse, marshal property error: %v", err)
continue
}
data = append(data, []any{
log.Uid,
log.Event,
string(propertyJson),
log.EventTimems,
log.EventTimeLoc,
})
}
// json化
sbody := sendBody{
Pid: sdk.sdkConfig.Pid,
Data: data,
}
jsonBody, err := json.Marshal(sbody)
if err != nil {
g.Log().Errorf(ctx, "[GameLog]marshal send body error: %v", err)
return
}
// giz压缩
gzBody := bytes.NewBuffer([]byte{})
gz := gzip.NewWriter(gzBody)
gz.Write(jsonBody)
gz.Close()
// XOR 加密
xorBody := bytesXOR(gzBody.Bytes(), []byte(sdk.sdkConfig.ReportSk))
sdk.sdkConfig.reportN += 1
res, err := gamelogClient.Post(timeoutCtx, sdk.sdkConfig.BaseUrl+"/report/event", xorBody)
// 失败重新加入缓冲区
if err != nil {
sdk.bakFailLogs2disk(logs)
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), err)
return
}
defer func() {
cerr := res.Close()
if cerr != nil {
g.Log().Errorf(ctx, "[GameLog]close response error: %v", cerr)
}
}()
httpcode := res.StatusCode
resBody := res.ReadAllString()
// 收集器拦截, 重新加入缓冲区
if httpcode != http.StatusOK {
sdk.bakFailLogs2disk(logs)
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), resBody)
}
}
// 混淆
func bytesXOR(data []byte, key []byte) []byte {
obfuscated := make([]byte, len(data))
keyLen := len(key)
if keyLen == 0 {
return data
}
for i := range data {
obfuscated[i] = data[i] ^ key[i%keyLen]
}
return obfuscated
// // 使用示例
// key := []byte{0x12, 0x34, 0x56, 0x78}
// obfuscated := multiXorObfuscate(original, key)
// deobfuscated := multiXorObfuscate(obfuscated, key) // 解密
}

View File

@@ -0,0 +1,52 @@
package test
import (
"strings"
"testing"
"time"
"github.com/ayflying/utility_go/package/gamelog"
"github.com/gogf/gf/v2/test/gtest"
"github.com/gogf/gf/v2/util/grand"
"github.com/google/uuid"
)
func TestGamelog(t *testing.T) {
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
// 必填
Pid: "test5", // 项目ID
BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
ReportSk: "sngame2025", // xor混淆key
FlushInterval: 5, // 上报间隔
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
// 可填
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
ChanSize: 500, // 默认1000, 信道size
})
// 随机测试事件和属性
events := []string{"e1", "e2", "e3", "e4"}
pms := []map[string]any{
{"a": "1"},
{"a": "2"},
{"a": "3"},
{"a": "4"},
}
if err != nil {
t.Fatal(err)
}
gtest.C(t, func(t *gtest.T) {
go func() {
for {
uuidval, _ := uuid.NewUUID()
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
glsdk.LogLtz(randUid, events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
time.Sleep(time.Millisecond * 100)
}
}()
time.Sleep(time.Second * 14)
// 模拟等待信号后优雅关闭
glsdk.Shutdown()
})
}

View File

@@ -1,6 +1,8 @@
package aycache
import (
"math"
v1 "github.com/ayflying/utility_go/api/system/v1"
"github.com/ayflying/utility_go/internal/boot"
"github.com/ayflying/utility_go/pkg/aycache/drive"
@@ -9,7 +11,6 @@ import (
"github.com/gogf/gf/v2/os/gcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"math"
)
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
@@ -19,6 +20,7 @@ type Mod struct {
// QPSCount 记录缓存的 QPS 计数
var QPSCount int
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
var QPS = promauto.NewGauge(
prometheus.GaugeOpts{
@@ -53,8 +55,13 @@ func New(_name ...string) gcache.Adapter {
// 创建内存缓存适配器
cacheAdapterObj = drive2.NewAdapterMemory()
case "redis":
//第二个参数为配置名称默认为default
var typ = "default"
if len(_name) >= 2 {
typ = _name[1]
}
// 创建 Redis 缓存适配器
cacheAdapterObj = drive2.NewAdapterRedis()
cacheAdapterObj = drive2.NewAdapterRedis(typ)
case "file":
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")

View File

@@ -7,19 +7,22 @@ import (
"github.com/gogf/gf/v2/os/gctx"
)
var adapterRedisClient gcache.Adapter
var adapterRedisCache = gcache.New()
var adapterRedisClient = make(map[string]gcache.Adapter)
var adapterRedisCache = make(map[string]*gcache.Cache)
func NewAdapterRedis() gcache.Adapter {
if adapterRedisClient == nil {
_cfg, _ := g.Cfg().Get(gctx.New(), "redis.default")
func NewAdapterRedis(name string) gcache.Adapter {
if adapterRedisClient[name] == nil {
_cfg, err := g.Cfg().Get(gctx.New(), "redis."+name)
if err != nil {
panic("当前redis配置不存在")
}
var cfg *gredis.Config
_cfg.Scan(&cfg)
redisObj, _ := gredis.New(cfg)
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
adapterRedisCache.SetAdapter(adapterRedisClient)
//adapterRedisClient[name] = gcache.NewAdapterRedis(g.Redis(name))
adapterRedisClient[name] = gcache.NewAdapterRedis(redisObj)
adapterRedisCache[name] = gcache.New()
adapterRedisCache[name].SetAdapter(adapterRedisClient[name])
}
return adapterRedisCache
return adapterRedisCache[name]
}

View File

@@ -3,7 +3,9 @@ package elasticsearch
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
save = append(save, v)
}
//save = data
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
if err2 != nil {
err = err2
return
}
//需要接收返回信息,判断是否全部执行成功
if response.Errors { //未全部完成
//是否需要删除已成功导入的部分数据
for _, item := range response.Items {
for _, v := range item {
if v.Error != nil { //失败
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
} else {
//删除已导入成功的数据
_, err = s.Delete(ctx, *v.Id_)
if err != nil {
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
}
}
}
}
return errors.New("部分数据导入失败")
}
return
}
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
// Select 查询
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
res, err = s.client.Search(). //Index("my_index").
Request(&search.Request{
Request(&search.Request{
Query: &types.Query{
MatchAll: &types.MatchAllQuery{},
},