Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8190e9f6b7 | ||
|
|
27435b57b7 | ||
|
|
0628882533 |
@@ -226,7 +226,6 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
_, err = g.Redis().Del(ctx, v)
|
_, err = g.Redis().Del(ctx, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Error(ctx, err)
|
g.Log().Error(ctx, err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delKey = make([]string, 0)
|
delKey = make([]string, 0)
|
||||||
|
|||||||
@@ -2,16 +2,17 @@ package gameKv
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ayflying/utility_go/pkg"
|
"github.com/ayflying/utility_go/pkg"
|
||||||
"github.com/ayflying/utility_go/service"
|
"github.com/ayflying/utility_go/service"
|
||||||
"github.com/ayflying/utility_go/tools"
|
"github.com/ayflying/utility_go/tools"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -107,8 +108,7 @@ func (s *sGameKv) SavesV1() (err error) {
|
|||||||
for _, v := range delKey {
|
for _, v := range delKey {
|
||||||
_, err2 = g.Redis().Del(ctx, v)
|
_, err2 = g.Redis().Del(ctx, v)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
g.Log().Errorf(ctx, "删除存档错误:%v,err=%v", v, err2)
|
g.Log().Errorf(ctx, "删除存档失败:%v,err=%v", v, err2)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package aycache
|
package aycache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
|
|
||||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||||
"github.com/ayflying/utility_go/internal/boot"
|
"github.com/ayflying/utility_go/internal/boot"
|
||||||
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
"github.com/ayflying/utility_go/pkg/aycache/drive"
|
||||||
@@ -9,7 +11,6 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gcache"
|
"github.com/gogf/gf/v2/os/gcache"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
"math"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
// Mod 定义缓存模块结构体,包含一个 gcache.Cache 客户端实例
|
||||||
@@ -19,6 +20,7 @@ type Mod struct {
|
|||||||
|
|
||||||
// QPSCount 记录缓存的 QPS 计数
|
// QPSCount 记录缓存的 QPS 计数
|
||||||
var QPSCount int
|
var QPSCount int
|
||||||
|
|
||||||
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
// QPS 是一个 Prometheus 指标,用于记录当前缓存的 QPS 数量
|
||||||
var QPS = promauto.NewGauge(
|
var QPS = promauto.NewGauge(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
@@ -53,8 +55,13 @@ func New(_name ...string) gcache.Adapter {
|
|||||||
// 创建内存缓存适配器
|
// 创建内存缓存适配器
|
||||||
cacheAdapterObj = drive2.NewAdapterMemory()
|
cacheAdapterObj = drive2.NewAdapterMemory()
|
||||||
case "redis":
|
case "redis":
|
||||||
|
//第二个参数为配置名称,默认为default
|
||||||
|
var typ = "default"
|
||||||
|
if len(_name) >= 2 {
|
||||||
|
typ = _name[1]
|
||||||
|
}
|
||||||
// 创建 Redis 缓存适配器
|
// 创建 Redis 缓存适配器
|
||||||
cacheAdapterObj = drive2.NewAdapterRedis()
|
cacheAdapterObj = drive2.NewAdapterRedis(typ)
|
||||||
case "file":
|
case "file":
|
||||||
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
// 创建文件缓存适配器,指定缓存目录为 "runtime/cache"
|
||||||
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
||||||
|
|||||||
@@ -7,19 +7,22 @@ import (
|
|||||||
"github.com/gogf/gf/v2/os/gctx"
|
"github.com/gogf/gf/v2/os/gctx"
|
||||||
)
|
)
|
||||||
|
|
||||||
var adapterRedisClient gcache.Adapter
|
var adapterRedisClient = make(map[string]gcache.Adapter)
|
||||||
var adapterRedisCache = gcache.New()
|
var adapterRedisCache = make(map[string]*gcache.Cache)
|
||||||
|
|
||||||
func NewAdapterRedis() gcache.Adapter {
|
func NewAdapterRedis(name string) gcache.Adapter {
|
||||||
|
if adapterRedisClient[name] == nil {
|
||||||
if adapterRedisClient == nil {
|
_cfg, err := g.Cfg().Get(gctx.New(), "redis."+name)
|
||||||
_cfg, _ := g.Cfg().Get(gctx.New(), "redis.default")
|
if err != nil {
|
||||||
|
panic("当前redis配置不存在")
|
||||||
|
}
|
||||||
var cfg *gredis.Config
|
var cfg *gredis.Config
|
||||||
_cfg.Scan(&cfg)
|
_cfg.Scan(&cfg)
|
||||||
redisObj, _ := gredis.New(cfg)
|
redisObj, _ := gredis.New(cfg)
|
||||||
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
|
//adapterRedisClient[name] = gcache.NewAdapterRedis(g.Redis(name))
|
||||||
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
|
adapterRedisClient[name] = gcache.NewAdapterRedis(redisObj)
|
||||||
adapterRedisCache.SetAdapter(adapterRedisClient)
|
adapterRedisCache[name] = gcache.New()
|
||||||
|
adapterRedisCache[name].SetAdapter(adapterRedisClient[name])
|
||||||
}
|
}
|
||||||
return adapterRedisCache
|
return adapterRedisCache[name]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ package elasticsearch
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/elastic/go-elasticsearch/v8"
|
"github.com/elastic/go-elasticsearch/v8"
|
||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
||||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
||||||
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
|
|||||||
save = append(save, v)
|
save = append(save, v)
|
||||||
}
|
}
|
||||||
//save = data
|
//save = data
|
||||||
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||||
|
if err2 != nil {
|
||||||
|
err = err2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//需要接收返回信息,判断是否全部执行成功
|
||||||
|
if response.Errors { //未全部完成
|
||||||
|
//是否需要删除已成功导入的部分数据
|
||||||
|
for _, item := range response.Items {
|
||||||
|
for _, v := range item {
|
||||||
|
if v.Error != nil { //失败
|
||||||
|
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
//删除已导入成功的数据
|
||||||
|
_, err = s.Delete(ctx, *v.Id_)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.New("部分数据导入失败")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
|
|||||||
// Select 查询
|
// Select 查询
|
||||||
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
||||||
res, err = s.client.Search(). //Index("my_index").
|
res, err = s.client.Search(). //Index("my_index").
|
||||||
Request(&search.Request{
|
Request(&search.Request{
|
||||||
Query: &types.Query{
|
Query: &types.Query{
|
||||||
MatchAll: &types.MatchAllQuery{},
|
MatchAll: &types.MatchAllQuery{},
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user