Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 14cf759ce1 | |||
| 19a19c1ff1 | |||
| 183d6d8b10 | |||
| d0cad61028 | |||
| efb34e0c5b | |||
|
|
8190e9f6b7 | ||
|
|
27435b57b7 | ||
|
|
0628882533 |
@@ -226,7 +226,6 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
_, err = g.Redis().Del(ctx, v)
|
_, err = g.Redis().Del(ctx, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Error(ctx, err)
|
g.Log().Error(ctx, err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delKey = make([]string, 0)
|
delKey = make([]string, 0)
|
||||||
|
|||||||
@@ -2,16 +2,17 @@ package gameKv
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ayflying/utility_go/pkg"
|
"github.com/ayflying/utility_go/pkg"
|
||||||
"github.com/ayflying/utility_go/service"
|
"github.com/ayflying/utility_go/service"
|
||||||
"github.com/ayflying/utility_go/tools"
|
"github.com/ayflying/utility_go/tools"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -107,8 +108,7 @@ func (s *sGameKv) SavesV1() (err error) {
|
|||||||
for _, v := range delKey {
|
for _, v := range delKey {
|
||||||
_, err2 = g.Redis().Del(ctx, v)
|
_, err2 = g.Redis().Del(ctx, v)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
g.Log().Errorf(ctx, "删除存档错误:%v,err=%v", v, err2)
|
g.Log().Errorf(ctx, "删除存档失败:%v,err=%v", v, err2)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
399
package/gamelog/sdk.go
Normal file
399
package/gamelog/sdk.go
Normal file
@@ -0,0 +1,399 @@
|
|||||||
|
package gamelog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
|
"github.com/gogf/gf/v2/net/gclient"
|
||||||
|
"github.com/gogf/gf/v2/os/gfile"
|
||||||
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sendBody struct {
|
||||||
|
Pid string `json:"pid"`
|
||||||
|
Data [][]any `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo 游戏日志对象
|
||||||
|
type GameLog struct {
|
||||||
|
Uid string // 唯一uid
|
||||||
|
Event string // 事件名
|
||||||
|
Property map[string]any // 事件属性
|
||||||
|
EventTimems int64 // 时间戳毫秒级别
|
||||||
|
EventTimeLoc string // 带时区的本地时间字符串
|
||||||
|
}
|
||||||
|
|
||||||
|
type SDKConfig struct {
|
||||||
|
// 配置变量
|
||||||
|
Pid string // 项目id
|
||||||
|
BaseUrl string // 日志服务器地址
|
||||||
|
ReportSk string // 上报解密key
|
||||||
|
FlushInterval int // 刷新间隔
|
||||||
|
DiskBakPath string // 磁盘备份路径
|
||||||
|
RetryN int // 每N次重试
|
||||||
|
ChanSize int // 信道大小, 默认1000
|
||||||
|
|
||||||
|
reportN int
|
||||||
|
}
|
||||||
|
|
||||||
|
type SDK struct {
|
||||||
|
// 控制变量
|
||||||
|
wg sync.WaitGroup
|
||||||
|
shutdown chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
sdkConfig *SDKConfig
|
||||||
|
bufferChan chan GameLog // 日志队列
|
||||||
|
buffer []GameLog // 日志队列
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx = context.Background()
|
||||||
|
gamelogClient *gclient.Client
|
||||||
|
|
||||||
|
// location map
|
||||||
|
// locationMap map[string]*time.Location = map[string]*time.Location{}
|
||||||
|
locationMap sync.Map // 声明一个线程安全的Map
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
func getLocationMapValue(key string) *time.Location {
|
||||||
|
// 1. 先尝试读
|
||||||
|
value, loaded := locationMap.Load(key)
|
||||||
|
if loaded {
|
||||||
|
return value.(*time.Location) // 如果已经存在,直接返回
|
||||||
|
}
|
||||||
|
// 2. 不存在,就初始化一个该key对应的**固定的**新值
|
||||||
|
location, err := time.LoadLocation(key)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Warningf(ctx, "[GameLog]load location error, try use local timezone: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// 3. 核心:原子性地存储,如果key已存在则返回已存在的值
|
||||||
|
actualValue, loaded := locationMap.LoadOrStore(key, location)
|
||||||
|
if loaded {
|
||||||
|
// 如果loaded为true,说明其他goroutine抢先存了
|
||||||
|
// 我们可以丢弃刚创建的newValue(如果有需要的话),返回已存在的actualValue
|
||||||
|
return actualValue.(*time.Location)
|
||||||
|
}
|
||||||
|
// 如果loaded为false,说明是我们存成功的,返回我们刚创建的newValue
|
||||||
|
return actualValue.(*time.Location)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdk *SDK) varinit() error {
|
||||||
|
sdk.sdkConfig = &SDKConfig{}
|
||||||
|
|
||||||
|
_pid, err := g.Config().Get(ctx, "angergs.bisdk.pid")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.Pid = _pid.String()
|
||||||
|
|
||||||
|
_baseUrl, err := g.Config().Get(ctx, "angergs.bisdk.recodeServerBaseUrl")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.BaseUrl = _baseUrl.String()
|
||||||
|
|
||||||
|
_sk, err := g.Config().Get(ctx, "angergs.bisdk.reportSk")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.ReportSk = _sk.String()
|
||||||
|
|
||||||
|
_flushInterval, err := g.Config().Get(ctx, "angergs.bisdk.flushInterval")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.FlushInterval = _flushInterval.Int()
|
||||||
|
|
||||||
|
_diskBakPath, err := g.Config().Get(ctx, "angergs.bisdk.diskBakPath")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.DiskBakPath = _diskBakPath.String()
|
||||||
|
|
||||||
|
_retryN, err := g.Config().Get(ctx, "angergs.bisdk.retryN")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.RetryN = _retryN.Int()
|
||||||
|
|
||||||
|
_chanSize, err := g.Config().Get(ctx, "angergs.bisdk.chanSize")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sdk.sdkConfig.ChanSize = _chanSize.Int()
|
||||||
|
|
||||||
|
g.Log().Infof(ctx, "[GameLog]client init success, config: %v", sdk.sdkConfig)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdk *SDK) checkConfig() error {
|
||||||
|
config := sdk.sdkConfig
|
||||||
|
if config.Pid == "" {
|
||||||
|
return fmt.Errorf("pid is empty")
|
||||||
|
}
|
||||||
|
if config.BaseUrl == "" {
|
||||||
|
return fmt.Errorf("baseUrl is empty")
|
||||||
|
}
|
||||||
|
if config.ReportSk == "" {
|
||||||
|
return fmt.Errorf("reportSk is empty")
|
||||||
|
}
|
||||||
|
if config.FlushInterval <= 0 {
|
||||||
|
return fmt.Errorf("flushInterval is invalid")
|
||||||
|
}
|
||||||
|
if config.DiskBakPath == "" {
|
||||||
|
return fmt.Errorf("diskBakPath is empty")
|
||||||
|
}
|
||||||
|
if config.RetryN == 0 {
|
||||||
|
config.RetryN = 10
|
||||||
|
}
|
||||||
|
if config.ChanSize == 0 {
|
||||||
|
config.ChanSize = 1000
|
||||||
|
}
|
||||||
|
config.DiskBakPath = strings.TrimSuffix(config.DiskBakPath, "/")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func INIT(config *SDKConfig) (*SDK, error) {
|
||||||
|
// 加载并检查配置
|
||||||
|
sdk := &SDK{}
|
||||||
|
if config != nil {
|
||||||
|
sdk.sdkConfig = config
|
||||||
|
} else if err := sdk.varinit(); err != nil { // 可以读goframe的配置
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := sdk.checkConfig(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
gamelogClient = g.Client()
|
||||||
|
|
||||||
|
// 初始化队列
|
||||||
|
sdk.shutdown = make(chan struct{})
|
||||||
|
sdk.bufferChan = make(chan GameLog, 1000)
|
||||||
|
sdk.buffer = make([]GameLog, 0, 100)
|
||||||
|
// 加载失败日志
|
||||||
|
failLogs, err := sdk.loadFailLogs4disk()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
|
||||||
|
} else if len(failLogs) > 0 {
|
||||||
|
sdk.buffer = append(sdk.buffer, failLogs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 开启协程进行日志发送
|
||||||
|
sdk.wg = sync.WaitGroup{}
|
||||||
|
sdk.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer sdk.wg.Done()
|
||||||
|
ticker := time.NewTicker(time.Duration(sdk.sdkConfig.FlushInterval) * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-sdk.shutdown:
|
||||||
|
// 关闭时, 上传一次并备份失败数据
|
||||||
|
g.Log().Infof(ctx, "[GameLog]begin shutdown and flush last")
|
||||||
|
sdk.flush()
|
||||||
|
return
|
||||||
|
case log := <-sdk.bufferChan:
|
||||||
|
sdk.buffer = append(sdk.buffer, log)
|
||||||
|
case <-ticker.C:
|
||||||
|
sdk.flush()
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return sdk, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 从磁盘加载失败日志
|
||||||
|
func (sdk *SDK) loadFailLogs4disk() (logs []GameLog, err error) {
|
||||||
|
if !gfile.Exists(sdk.sdkConfig.DiskBakPath) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 遍历diskBakPath下所有failBufferxxx.bak.log文件, 读取到log中
|
||||||
|
files, err := gfile.ScanDir(sdk.sdkConfig.DiskBakPath, "failBuffer*.bak.log")
|
||||||
|
logs = []GameLog{}
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 读取每个备份文件
|
||||||
|
for _, fp := range files {
|
||||||
|
// 每一行都是一次失败的记录
|
||||||
|
gfile.ReadLines(fp, func(line string) error {
|
||||||
|
_logs := []GameLog{}
|
||||||
|
err := json.Unmarshal([]byte(line), &_logs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// 合并到总日志列表
|
||||||
|
logs = append(logs, _logs...)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
g.Log().Infof(ctx, "[GameLog]load %d faillogs from %s", len(logs), fp)
|
||||||
|
gfile.Remove(fp)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// 备份失败日志追加到磁盘
|
||||||
|
func (sdk *SDK) bakFailLogs2disk(failLogs []GameLog) {
|
||||||
|
bakPath := fmt.Sprintf("%s/failBuffer%s.bak.log", sdk.sdkConfig.DiskBakPath, gtime.Now().Format("YmdH"))
|
||||||
|
content, err := json.Marshal(failLogs)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]marshal fail logs error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
gfile.PutContentsAppend(bakPath, string(content)+"\n")
|
||||||
|
g.Log().Infof(ctx, "[GameLog]backup fail buffer to %s", bakPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 优雅关闭
|
||||||
|
func (sdk *SDK) Shutdown() {
|
||||||
|
close(sdk.shutdown)
|
||||||
|
sdk.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 日志时间格式
|
||||||
|
const datetimeFmt = time.DateOnly + " " + time.TimeOnly
|
||||||
|
|
||||||
|
// 记录日志
|
||||||
|
func (sdk *SDK) Log(uid, event string, property map[string]any, timezone string) {
|
||||||
|
loc := time.Local
|
||||||
|
if _loc := getLocationMapValue(timezone); _loc != nil {
|
||||||
|
loc = _loc
|
||||||
|
}
|
||||||
|
log := GameLog{
|
||||||
|
Uid: uid,
|
||||||
|
Event: event,
|
||||||
|
Property: property,
|
||||||
|
EventTimems: gtime.Now().TimestampMilli(),
|
||||||
|
EventTimeLoc: gtime.Now().In(loc).Format(datetimeFmt),
|
||||||
|
}
|
||||||
|
// 线程安全
|
||||||
|
sdk.bufferChan <- log
|
||||||
|
}
|
||||||
|
|
||||||
|
// 按服务器时区记录日志
|
||||||
|
func (sdk *SDK) LogLtz(uid, event string, property map[string]any) {
|
||||||
|
sdk.Log(uid, event, property, time.Local.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// 这个方法只会在内部协程调用
|
||||||
|
func (sdk *SDK) flush() {
|
||||||
|
sdk.mu.Lock()
|
||||||
|
defer sdk.mu.Unlock()
|
||||||
|
if len(sdk.buffer) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
batch := make([]GameLog, len(sdk.buffer))
|
||||||
|
copy(batch, sdk.buffer)
|
||||||
|
sdk.buffer = sdk.buffer[:0]
|
||||||
|
|
||||||
|
// 第N次的时候加载失败数据进行尝试
|
||||||
|
if sdk.sdkConfig.reportN != 0 && sdk.sdkConfig.reportN%sdk.sdkConfig.RetryN == 0 {
|
||||||
|
faillogs, err := sdk.loadFailLogs4disk()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]load fail logs error: %v", err)
|
||||||
|
}
|
||||||
|
// 如果有失败日志则加入到批量数组中
|
||||||
|
if len(faillogs) > 0 {
|
||||||
|
batch = append(batch, faillogs...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sdk.send(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送消息
|
||||||
|
func (sdk *SDK) send(logs []GameLog) {
|
||||||
|
waitSecond := time.Duration(sdk.sdkConfig.FlushInterval/4) * time.Second
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(context.Background(), waitSecond)
|
||||||
|
defer cancel()
|
||||||
|
data := make([][]any, 0, len(logs))
|
||||||
|
// logs 拆分成二维数组
|
||||||
|
for _, log := range logs {
|
||||||
|
propertyJson, err := json.Marshal(log.Property)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]skip log parse, marshal property error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
data = append(data, []any{
|
||||||
|
log.Uid,
|
||||||
|
log.Event,
|
||||||
|
string(propertyJson),
|
||||||
|
log.EventTimems,
|
||||||
|
log.EventTimeLoc,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// json化
|
||||||
|
sbody := sendBody{
|
||||||
|
Pid: sdk.sdkConfig.Pid,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
jsonBody, err := json.Marshal(sbody)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]marshal send body error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// giz压缩
|
||||||
|
gzBody := bytes.NewBuffer([]byte{})
|
||||||
|
gz := gzip.NewWriter(gzBody)
|
||||||
|
gz.Write(jsonBody)
|
||||||
|
gz.Close()
|
||||||
|
|
||||||
|
// XOR 加密
|
||||||
|
xorBody := bytesXOR(gzBody.Bytes(), []byte(sdk.sdkConfig.ReportSk))
|
||||||
|
|
||||||
|
sdk.sdkConfig.reportN += 1
|
||||||
|
res, err := gamelogClient.Post(timeoutCtx, sdk.sdkConfig.BaseUrl+"/report/event", xorBody)
|
||||||
|
// 失败重新加入缓冲区
|
||||||
|
if err != nil {
|
||||||
|
sdk.bakFailLogs2disk(logs)
|
||||||
|
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
cerr := res.Close()
|
||||||
|
if cerr != nil {
|
||||||
|
g.Log().Errorf(ctx, "[GameLog]close response error: %v", cerr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
httpcode := res.StatusCode
|
||||||
|
resBody := res.ReadAllString()
|
||||||
|
// 收集器拦截, 重新加入缓冲区
|
||||||
|
if httpcode != http.StatusOK {
|
||||||
|
sdk.bakFailLogs2disk(logs)
|
||||||
|
g.Log().Warningf(ctx, "[GameLog]send log error, bak to fail buffer(%d): %v", len(logs), resBody)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 混淆
|
||||||
|
func bytesXOR(data []byte, key []byte) []byte {
|
||||||
|
obfuscated := make([]byte, len(data))
|
||||||
|
keyLen := len(key)
|
||||||
|
if keyLen == 0 {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range data {
|
||||||
|
obfuscated[i] = data[i] ^ key[i%keyLen]
|
||||||
|
}
|
||||||
|
return obfuscated
|
||||||
|
|
||||||
|
// // 使用示例
|
||||||
|
// key := []byte{0x12, 0x34, 0x56, 0x78}
|
||||||
|
// obfuscated := multiXorObfuscate(original, key)
|
||||||
|
// deobfuscated := multiXorObfuscate(obfuscated, key) // 解密
|
||||||
|
}
|
||||||
52
package/gamelog/test/gamelog_test.go
Normal file
52
package/gamelog/test/gamelog_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ayflying/utility_go/package/gamelog"
|
||||||
|
"github.com/gogf/gf/v2/test/gtest"
|
||||||
|
"github.com/gogf/gf/v2/util/grand"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGamelog(t *testing.T) {
|
||||||
|
glsdk, err := gamelog.INIT(&gamelog.SDKConfig{
|
||||||
|
// 必填
|
||||||
|
Pid: "test5", // 项目ID
|
||||||
|
BaseUrl: "http://47.76.178.47:10101", // 香港测试服上报地址
|
||||||
|
// BaseUrl: "http://127.0.0.1:10101", // 本次测试上报地址
|
||||||
|
ReportSk: "sngame2025", // xor混淆key
|
||||||
|
FlushInterval: 5, // 上报间隔
|
||||||
|
DiskBakPath: "gamelog", // 本地磁盘备份, 用于意外情况下临时保存日志, 请确保该目录持久化(容器内要挂载). 每次启动时或每N次上报时加载到失败队列
|
||||||
|
// 可填
|
||||||
|
RetryN: 2, // 默认每10次, 上传检查一次磁盘的失败数据
|
||||||
|
ChanSize: 500, // 默认1000, 信道size
|
||||||
|
})
|
||||||
|
|
||||||
|
// 随机测试事件和属性
|
||||||
|
events := []string{"e1", "e2", "e3", "e4"}
|
||||||
|
pms := []map[string]any{
|
||||||
|
{"a": "1"},
|
||||||
|
{"a": "2"},
|
||||||
|
{"a": "3"},
|
||||||
|
{"a": "4"},
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
gtest.C(t, func(t *gtest.T) {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
uuidval, _ := uuid.NewUUID()
|
||||||
|
randUid := strings.ReplaceAll(uuidval.String(), "-", "")
|
||||||
|
glsdk.LogLtz(randUid, events[grand.Intn(len(events))], pms[grand.Intn(len(pms))])
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
time.Sleep(time.Second * 14)
|
||||||
|
// 模拟等待信号后优雅关闭
|
||||||
|
glsdk.Shutdown()
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
package aycache
|
package aycache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
|
|
||||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||||
"github.com/ayflying/utility_go/internal/boot"
|
"github.com/ayflying/utility_go/internal/boot"
|
||||||
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
||||||
@@ -9,7 +11,6 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gcache"
|
"github.com/gogf/gf/v2/os/gcache"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
"math"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
||||||
@@ -19,6 +20,7 @@ type Mod struct {
|
|||||||
|
|
||||||
// QPSCount 记录缓存的 QPS 计数
|
// QPSCount 记录缓存的 QPS 计数
|
||||||
var QPSCount int
|
var QPSCount int
|
||||||
|
|
||||||
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
||||||
var QPS = promauto.NewGauge(
|
var QPS = promauto.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
@@ -53,8 +55,13 @@ func New(_name ...string) gcache.Adapter {
|
|||||||
// 创建内存缓存适配器
|
// 创建内存缓存适配器
|
||||||
cacheAdapterObj = drive2.NewAdapterMemory()
|
cacheAdapterObj = drive2.NewAdapterMemory()
|
||||||
case "redis":
|
case "redis":
|
||||||
|
//第二个参数为配置名称,默认为default
|
||||||
|
var typ = "default"
|
||||||
|
if len(_name) >= 2 {
|
||||||
|
typ = _name[1]
|
||||||
|
}
|
||||||
// 创建 Redis 缓存适配器
|
// 创建 Redis 缓存适配器
|
||||||
cacheAdapterObj = drive2.NewAdapterRedis()
|
cacheAdapterObj = drive2.NewAdapterRedis(typ)
|
||||||
case "file":
|
case "file":
|
||||||
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
||||||
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
||||||
|
|||||||
@@ -7,19 +7,22 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
)
|
)
|
||||||
|
|
||||||
var adapterRedisClient gcache.Adapter
|
var adapterRedisClient = make(map[string]gcache.Adapter)
|
||||||
var adapterRedisCache = gcache.New()
|
var adapterRedisCache = make(map[string]*gcache.Cache)
|
||||||
|
|
||||||
func NewAdapterRedis() gcache.Adapter {
|
func NewAdapterRedis(name string) gcache.Adapter {
|
||||||
|
if adapterRedisClient[name] == nil {
|
||||||
if adapterRedisClient == nil {
|
_cfg, err := g.Cfg().Get(gctx.New(), "redis."+name)
|
||||||
_cfg, _ := g.Cfg().Get(gctx.New(), "redis.default")
|
if err != nil {
|
||||||
|
panic("当前redis配置不存在")
|
||||||
|
}
|
||||||
var cfg *gredis.Config
|
var cfg *gredis.Config
|
||||||
_cfg.Scan(&cfg)
|
_cfg.Scan(&cfg)
|
||||||
redisObj, _ := gredis.New(cfg)
|
redisObj, _ := gredis.New(cfg)
|
||||||
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
|
//adapterRedisClient[name] = gcache.NewAdapterRedis(g.Redis(name))
|
||||||
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
|
adapterRedisClient[name] = gcache.NewAdapterRedis(redisObj)
|
||||||
adapterRedisCache.SetAdapter(adapterRedisClient)
|
adapterRedisCache[name] = gcache.New()
|
||||||
|
adapterRedisCache[name].SetAdapter(adapterRedisClient[name])
|
||||||
}
|
}
|
||||||
return adapterRedisCache
|
return adapterRedisCache[name]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ package elasticsearch
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/elastic/go-elasticsearch/v8"
|
"github.com/elastic/go-elasticsearch/v8"
|
||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
||||||
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
|
|||||||
save = append(save, v)
|
save = append(save, v)
|
||||||
}
|
}
|
||||||
//save = data
|
//save = data
|
||||||
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||||
|
if err2 != nil {
|
||||||
|
err = err2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//需要接收返回信息,判断是否全部执行成功
|
||||||
|
if response.Errors { //未全部完成
|
||||||
|
//是否需要删除已成功导入的部分数据
|
||||||
|
for _, item := range response.Items {
|
||||||
|
for _, v := range item {
|
||||||
|
if v.Error != nil { //失败
|
||||||
|
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
//删除已导入成功的数据
|
||||||
|
_, err = s.Delete(ctx, *v.Id_)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.New("部分数据导入失败")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
|
|||||||
// Select 查询
|
// Select 查询
|
||||||
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
||||||
res, err = s.client.Search(). //Index("my_index").
|
res, err = s.client.Search(). //Index("my_index").
|
||||||
Request(&search.Request{
|
Request(&search.Request{
|
||||||
Query: &types.Query{
|
Query: &types.Query{
|
||||||
MatchAll: &types.MatchAllQuery{},
|
MatchAll: &types.MatchAllQuery{},
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user