Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8190e9f6b7 | ||
|
|
27435b57b7 | ||
|
|
0628882533 | ||
|
|
f68655eee6 |
@@ -3,6 +3,10 @@ package gameAct
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ayflying/utility_go/internal/model/do"
|
||||
"github.com/ayflying/utility_go/internal/model/entity"
|
||||
"github.com/ayflying/utility_go/pkg"
|
||||
@@ -13,9 +17,6 @@ import (
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -206,37 +207,29 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
}
|
||||
////获取多少个数据,删除不是当前修改的数据
|
||||
//count, _ := g.Model(Name).Where(do.GameAct{
|
||||
// Uid: v.Uid,
|
||||
// ActId: v.ActId,
|
||||
//}).Count()
|
||||
//if count > 1 {
|
||||
// g.Model(Name).Where(do.GameAct{
|
||||
// Uid: v.Uid,
|
||||
// ActId: v.ActId,
|
||||
// }).WhereNot("updated_at", v.UpdatedAt).Delete()
|
||||
//}
|
||||
}
|
||||
//dbRes, err2 := g.Model(Name).Batch(50).Data(add).Update()
|
||||
update = make([]*entity.GameAct, 0)
|
||||
dbRes, err2 := g.Model(Name).Batch(50).Data(add).Save()
|
||||
add = make([]*entity.GameAct, 0)
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
var count int64
|
||||
|
||||
if len(add) > 0 {
|
||||
dbRes, err2 := g.Model(Name).Batch(50).Data(add).Save()
|
||||
add = make([]*entity.GameAct, 0)
|
||||
err = err2
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
}
|
||||
count, _ = dbRes.RowsAffected()
|
||||
}
|
||||
|
||||
for _, v := range delKey {
|
||||
_, err2 = g.Redis().Del(ctx, v)
|
||||
if err2 != nil {
|
||||
g.Log().Error(ctx, err2)
|
||||
return
|
||||
_, err = g.Redis().Del(ctx, v)
|
||||
if err != nil {
|
||||
g.Log().Error(ctx, err)
|
||||
}
|
||||
}
|
||||
delKey = make([]string, 0)
|
||||
|
||||
count, _ := dbRes.RowsAffected()
|
||||
g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||
}
|
||||
|
||||
|
||||
@@ -2,16 +2,17 @@ package gameKv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ayflying/utility_go/pkg"
|
||||
"github.com/ayflying/utility_go/service"
|
||||
"github.com/ayflying/utility_go/tools"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -107,8 +108,7 @@ func (s *sGameKv) SavesV1() (err error) {
|
||||
for _, v := range delKey {
|
||||
_, err2 = g.Redis().Del(ctx, v)
|
||||
if err2 != nil {
|
||||
g.Log().Errorf(ctx, "删除存档错误:%v,err=%v", v, err2)
|
||||
return
|
||||
g.Log().Errorf(ctx, "删除存档失败:%v,err=%v", v, err2)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package aycache
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||
"github.com/ayflying/utility_go/internal/boot"
|
||||
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
||||
@@ -9,7 +11,6 @@ import (
|
||||
"github.com/gogf/gf/v2/os/gcache"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"math"
|
||||
)
|
||||
|
||||
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
||||
@@ -19,6 +20,7 @@ type Mod struct {
|
||||
|
||||
// QPSCount 记录缓存的 QPS 计数
|
||||
var QPSCount int
|
||||
|
||||
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
||||
var QPS = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
@@ -53,8 +55,13 @@ func New(_name ...string) gcache.Adapter {
|
||||
// 创建内存缓存适配器
|
||||
cacheAdapterObj = drive2.NewAdapterMemory()
|
||||
case "redis":
|
||||
//第二个参数为配置名称,默认为default
|
||||
var typ = "default"
|
||||
if len(_name) >= 2 {
|
||||
typ = _name[1]
|
||||
}
|
||||
// 创建 Redis 缓存适配器
|
||||
cacheAdapterObj = drive2.NewAdapterRedis()
|
||||
cacheAdapterObj = drive2.NewAdapterRedis(typ)
|
||||
case "file":
|
||||
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
||||
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
||||
|
||||
@@ -7,19 +7,22 @@ import (
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
)
|
||||
|
||||
var adapterRedisClient gcache.Adapter
|
||||
var adapterRedisCache = gcache.New()
|
||||
var adapterRedisClient = make(map[string]gcache.Adapter)
|
||||
var adapterRedisCache = make(map[string]*gcache.Cache)
|
||||
|
||||
func NewAdapterRedis() gcache.Adapter {
|
||||
|
||||
if adapterRedisClient == nil {
|
||||
_cfg, _ := g.Cfg().Get(gctx.New(), "redis.default")
|
||||
func NewAdapterRedis(name string) gcache.Adapter {
|
||||
if adapterRedisClient[name] == nil {
|
||||
_cfg, err := g.Cfg().Get(gctx.New(), "redis."+name)
|
||||
if err != nil {
|
||||
panic("当前redis配置不存在")
|
||||
}
|
||||
var cfg *gredis.Config
|
||||
_cfg.Scan(&cfg)
|
||||
redisObj, _ := gredis.New(cfg)
|
||||
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
|
||||
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
|
||||
adapterRedisCache.SetAdapter(adapterRedisClient)
|
||||
//adapterRedisClient[name] = gcache.NewAdapterRedis(g.Redis(name))
|
||||
adapterRedisClient[name] = gcache.NewAdapterRedis(redisObj)
|
||||
adapterRedisCache[name] = gcache.New()
|
||||
adapterRedisCache[name].SetAdapter(adapterRedisClient[name])
|
||||
}
|
||||
return adapterRedisCache
|
||||
return adapterRedisCache[name]
|
||||
}
|
||||
|
||||
@@ -3,7 +3,9 @@ package elasticsearch
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
||||
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
|
||||
save = append(save, v)
|
||||
}
|
||||
//save = data
|
||||
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||
if err2 != nil {
|
||||
err = err2
|
||||
return
|
||||
}
|
||||
//需要接收返回信息,判断是否全部执行成功
|
||||
if response.Errors { //未全部完成
|
||||
//是否需要删除已成功导入的部分数据
|
||||
for _, item := range response.Items {
|
||||
for _, v := range item {
|
||||
if v.Error != nil { //失败
|
||||
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
|
||||
|
||||
} else {
|
||||
//删除已导入成功的数据
|
||||
_, err = s.Delete(ctx, *v.Id_)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.New("部分数据导入失败")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
|
||||
// Select 查询
|
||||
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
||||
res, err = s.client.Search(). //Index("my_index").
|
||||
Request(&search.Request{
|
||||
Request(&search.Request{
|
||||
Query: &types.Query{
|
||||
MatchAll: &types.MatchAllQuery{},
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user