更新缓存模块
This commit is contained in:
@@ -29,7 +29,7 @@ func New(_name ...string) gcache.Adapter {
|
||||
case "file":
|
||||
cacheAdapterObj = drive2.NewAdapterFile("runtime/cache")
|
||||
case "es":
|
||||
cacheAdapterObj = drive.NewAdapterElasticsearch([]string{"http://127.0.0.1:9200"})
|
||||
cacheAdapterObj = drive.NewAdapterElasticsearch(_name[1])
|
||||
}
|
||||
|
||||
//var client = gcache.New()
|
||||
|
||||
@@ -2,23 +2,51 @@ package drive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
"github.com/gogf/gf/v2/container/gvar"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gcache"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
adapterElasticsearchClient gcache.Adapter
|
||||
)
|
||||
|
||||
type AdapterElasticsearch struct {
|
||||
//FilePath string
|
||||
Addresses []string
|
||||
client *elasticsearch.TypedClient
|
||||
name string
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) Set(ctx context.Context, key interface{}, value interface{}, duration time.Duration) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (a AdapterElasticsearch) Set(ctx context.Context, _key interface{}, value interface{}, duration time.Duration) (err error) {
|
||||
key := gconv.String(_key)
|
||||
data := gconv.Map(value)
|
||||
if duration > 0 {
|
||||
data["delete_time"] = time.Now().Add(duration)
|
||||
}
|
||||
_, err = a.client.Index(a.name).Id(key).
|
||||
Document(data).Do(ctx)
|
||||
if err != nil {
|
||||
fmt.Printf("indexing document failed, err:%v\n", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) SetMap(ctx context.Context, data map[interface{}]interface{}, duration time.Duration) error {
|
||||
//TODO implement me
|
||||
for k, v := range data {
|
||||
save := gconv.Map(v)
|
||||
if duration > 0 {
|
||||
save["delete_time"] = time.Now().Add(duration)
|
||||
}
|
||||
key := gconv.String(k)
|
||||
a.client.Index(a.name).Id(key).
|
||||
Document(save).Do(ctx)
|
||||
}
|
||||
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@@ -37,9 +65,17 @@ func (a AdapterElasticsearch) SetIfNotExistFuncLock(ctx context.Context, key int
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) Get(ctx context.Context, key interface{}) (*gvar.Var, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (a AdapterElasticsearch) Get(ctx context.Context, key interface{}) (res *gvar.Var, err error) {
|
||||
_key := gconv.String(key)
|
||||
resp, err := a.client.Get(a.name, _key).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
fmt.Printf("get document by id failed, err:%v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("fileds:%s\n", resp.Source_)
|
||||
res = gvar.New(resp.Source_)
|
||||
return
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) GetOrSet(ctx context.Context, key interface{}, value interface{}, duration time.Duration) (result *gvar.Var, err error) {
|
||||
@@ -82,9 +118,26 @@ func (a AdapterElasticsearch) Values(ctx context.Context) (values []interface{},
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) Update(ctx context.Context, key interface{}, value interface{}) (oldValue *gvar.Var, exist bool, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (a AdapterElasticsearch) Update(ctx context.Context, _key interface{}, value interface{}) (oldValue *gvar.Var, exist bool, err error) {
|
||||
key := gconv.String(_key)
|
||||
data := gconv.Map(value)
|
||||
oldValue, err = a.Get(ctx, key)
|
||||
if err != nil {
|
||||
exist = false
|
||||
} else {
|
||||
for k, v := range oldValue.Map() {
|
||||
if _, ok := data[k]; !ok {
|
||||
data[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err = a.client.Update(a.name, key).
|
||||
Doc(data).Do(context.Background())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) UpdateExpire(ctx context.Context, key interface{}, duration time.Duration) (oldDuration time.Duration, err error) {
|
||||
@@ -98,8 +151,15 @@ func (a AdapterElasticsearch) GetExpire(ctx context.Context, key interface{}) (t
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) Remove(ctx context.Context, keys ...interface{}) (lastValue *gvar.Var, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
//获取keys最后一个
|
||||
lastKey := keys[len(keys)-1]
|
||||
lastValue, _ = a.Get(ctx, lastKey)
|
||||
|
||||
for k := range keys {
|
||||
key := gconv.String(k)
|
||||
a.client.Delete(a.name, key).Do(ctx)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (a AdapterElasticsearch) Clear(ctx context.Context) error {
|
||||
@@ -112,8 +172,17 @@ func (a AdapterElasticsearch) Close(ctx context.Context) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewAdapterElasticsearch(addresses []string) gcache.Adapter {
|
||||
return &AdapterElasticsearch{
|
||||
Addresses: addresses,
|
||||
func NewAdapterElasticsearch(name string) gcache.Adapter {
|
||||
|
||||
if adapterElasticsearchClient == nil {
|
||||
_cfg, _ := g.Cfg().Get(gctx.New(), "elasticsearch")
|
||||
var cfg elasticsearch.Config
|
||||
_cfg.Scan(&cfg)
|
||||
es, _ := elasticsearch.NewTypedClient(cfg)
|
||||
adapterElasticsearchClient = &AdapterElasticsearch{
|
||||
client: es,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
return adapterElasticsearchClient
|
||||
}
|
||||
|
||||
@@ -16,17 +16,6 @@ type AdapterFile struct {
|
||||
}
|
||||
|
||||
func (a AdapterFile) Set(ctx context.Context, key interface{}, value interface{}, duration time.Duration) error {
|
||||
//defer a.handleLruKey(ctx, key)
|
||||
//expireTime := a.getInternalExpire(duration)
|
||||
//a.data.Set(key, memoryDataItem{
|
||||
// a: value,
|
||||
// a: expireTime,
|
||||
//})
|
||||
//c.eventList.PushBack(&adapterMemoryEvent{
|
||||
// k: key,
|
||||
// e: expireTime,
|
||||
//})
|
||||
|
||||
arr := strings.Split(":", gconv.String(key))
|
||||
fileName := path.Join(arr...)
|
||||
return gfile.PutBytes(fileName, gconv.Bytes(value))
|
||||
|
||||
@@ -19,7 +19,6 @@ func NewAdapterRedis() gcache.Adapter {
|
||||
redisObj, _ := gredis.New(cfg)
|
||||
//adapterRedisClient = gcache.NewAdapterRedis(g.Redis("default"))
|
||||
adapterRedisClient = gcache.NewAdapterRedis(redisObj)
|
||||
|
||||
adapterRedisCache.SetAdapter(adapterRedisClient)
|
||||
}
|
||||
return adapterRedisCache
|
||||
|
||||
@@ -5,96 +5,102 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/update"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gctx"
|
||||
)
|
||||
|
||||
var (
|
||||
es *elasticsearch.TypedClient
|
||||
)
|
||||
|
||||
type elastic struct {
|
||||
type Elastic struct {
|
||||
client *elasticsearch.TypedClient
|
||||
name string
|
||||
}
|
||||
|
||||
func New(name ...string) *elastic {
|
||||
// ES 配置
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: []string{
|
||||
"http://ay.cname.com:9200",
|
||||
},
|
||||
}
|
||||
func NewV1(name string) *Elastic {
|
||||
var cfg elasticsearch.Config
|
||||
_cfg := g.Cfg().MustGetWithEnv(gctx.New(), "elasticsearch")
|
||||
_cfg.Scan(&cfg)
|
||||
if es == nil {
|
||||
var err error
|
||||
es, err = elasticsearch.NewTypedClient(cfg)
|
||||
if err != nil {
|
||||
fmt.Printf("elasticsearch.NewTypedClient failed, err:%v\n", err)
|
||||
return &elastic{}
|
||||
return &Elastic{}
|
||||
}
|
||||
}
|
||||
return &elastic{
|
||||
return &Elastic{
|
||||
client: es,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
// createIndex 创建索引
|
||||
func (s *elastic) CreateIndex(name string) {
|
||||
resp, err := s.client.Indices.
|
||||
Create(name).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
fmt.Printf("create index failed, err:%v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("index:%#v\n", resp.Index)
|
||||
}
|
||||
|
||||
// indexDocument 索引文档
|
||||
func (s *elastic) IndexDocument(name string, key string, data interface{}) {
|
||||
//// Create 创建索引
|
||||
//func (s *Elastic) Create(ctx context.Context) {
|
||||
// resp, err := s.client.Indices.
|
||||
// Create(s.name).Do(ctx)
|
||||
// if err != nil {
|
||||
// fmt.Printf("create index failed, err:%v\n", err)
|
||||
// return
|
||||
// }
|
||||
// fmt.Printf("index:%#v\n", resp.Index)
|
||||
//}
|
||||
|
||||
// Set 索引文档
|
||||
func (s *Elastic) Set(ctx context.Context, key string, data interface{}) (err error) {
|
||||
// 添加文档
|
||||
resp, err := s.client.Index(name).
|
||||
Id(key).
|
||||
Document(data).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
fmt.Printf("indexing document failed, err:%v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("result:%#v\n", resp.Result)
|
||||
}
|
||||
|
||||
// getDocument 获取文档
|
||||
func (s *elastic) GetDocument(name string, id string) (res json.RawMessage) {
|
||||
resp, err := s.client.Get(name, id).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
fmt.Printf("get document by id failed, err:%v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("fileds:%s\n", resp.Source_)
|
||||
res = resp.Source_
|
||||
_, err = s.client.Index(s.name).Id(key).Document(data).Do(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
// updateDocument 更新文档
|
||||
func (s *elastic) UpdateDocument(name string, key string, data interface{}) {
|
||||
|
||||
resp, err := s.client.Update(name, key).
|
||||
Doc(data). // 使用结构体变量更新
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
fmt.Printf("update document failed, err:%v\n", err)
|
||||
return
|
||||
// SetBulk 批量添加文档
|
||||
func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
|
||||
var save *bulk.Request
|
||||
save = &bulk.Request{
|
||||
data,
|
||||
}
|
||||
fmt.Printf("result:%v\n", resp.Result)
|
||||
s.client.Bulk().Index(s.name).Request(save).Do(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
// deleteDocument 删除 document
|
||||
func (s *elastic) DeleteDocument(name string, key string) {
|
||||
resp, err := s.client.Delete(name, key).
|
||||
Do(context.Background())
|
||||
// Get 获取文档
|
||||
func (s *Elastic) Get(ctx context.Context, id string) (res json.RawMessage, err error) {
|
||||
get, err := s.client.Get(s.name, id).Do(ctx)
|
||||
if err != nil {
|
||||
fmt.Printf("delete document failed, err:%v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("result:%v\n", resp.Result)
|
||||
res = get.Source_
|
||||
return
|
||||
}
|
||||
|
||||
// Update 更新文档
|
||||
func (s *Elastic) Update(ctx context.Context, key string, data interface{}) (res *update.Response, err error) {
|
||||
res, err = s.client.Update(s.name, key).Doc(data).Do(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
// Delete 删除 document
|
||||
func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response, err error) {
|
||||
res, err = s.client.Delete(s.name, key).Do(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Select 查询
|
||||
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
||||
res, err = s.client.Search(). //Index("my_index").
|
||||
Request(&search.Request{
|
||||
Query: &types.Query{
|
||||
MatchAll: &types.MatchAllQuery{},
|
||||
},
|
||||
}).Do(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
v1 "github.com/ayflying/utility_go/api/pkg/v1"
|
||||
"github.com/ayflying/utility_go/pkg/aycache"
|
||||
"github.com/ayflying/utility_go/pkg/config"
|
||||
"github.com/ayflying/utility_go/pkg/elasticsearch"
|
||||
"github.com/ayflying/utility_go/pkg/notice"
|
||||
"github.com/ayflying/utility_go/pkg/rank"
|
||||
"github.com/ayflying/utility_go/pkg/s3"
|
||||
@@ -38,3 +39,7 @@ func Websocket() *websocket.SocketV1 {
|
||||
func Config() *config.Cfg {
|
||||
return config.NewV1()
|
||||
}
|
||||
|
||||
func Elastic(name string) *elasticsearch.Elastic {
|
||||
return elasticsearch.NewV1(name)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user