From b219123dedc9d410cd760bf1f411f77ad3d003be Mon Sep 17 00:00:00 2001 From: ayflying Date: Mon, 20 Jan 2025 18:47:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=20elasticsearch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/update.go | 12 +++- elasticsearch/elasticsearch.go | 101 +++++++++++++++++++++++++++++++++ go.mod | 8 ++- go.sum | 10 ++++ s3/s3.go | 8 +-- 5 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 elasticsearch/elasticsearch.go diff --git a/cmd/update.go b/cmd/update.go index 57e16e4..bc99307 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -75,13 +75,23 @@ var ( url, err := s3Mod.GetFileUrl(getFileName.String(), bucketName) g.Log().Debugf(ctx, "下载地址:%v", url) + client := g.Client() + client.SetTimeout(time.Minute) + client.SetDiscovery(nil) + //循环服务器,推送更新 for _, v := range list { address := v.Address g.Log().Debugf(ctx, "准备同步服务器:%v,url=%v", v.Name, address+"/callback/update") - get, err := g.Client().Discovery(nil).Timeout(time.Minute*1).Post(ctx, address+"/callback/update", &UpdateReq{ + get, err := client.Post(ctx, address+"/callback/update", &UpdateReq{ FileUrl: url.String(), }) + if err != nil { + get, err = client.Proxy("http://127.0.0.1:10809"). + Post(ctx, address+"/callback/update", &UpdateReq{ + FileUrl: url.String(), + }) + } if err != nil { g.Log().Error(ctx, err) } diff --git a/elasticsearch/elasticsearch.go b/elasticsearch/elasticsearch.go new file mode 100644 index 0000000..a167198 --- /dev/null +++ b/elasticsearch/elasticsearch.go @@ -0,0 +1,101 @@ +package elasticsearch + +import ( + "context" + "encoding/json" + "fmt" + "github.com/elastic/go-elasticsearch/v8" +) + +var ( + es *elasticsearch.TypedClient +) + +type elastic struct { + client *elasticsearch.TypedClient +} + +func New(name ...string) *elastic { + // ES 配置 + cfg := elasticsearch.Config{ + Addresses: []string{ + "http://ay.cname.com:9200", + }, + } + if es == nil { + var err error + es, err = elasticsearch.NewTypedClient(cfg) + if err != nil { + fmt.Printf("elasticsearch.NewTypedClient failed, err:%v\n", err) + return &elastic{} + } + } + return &elastic{ + client: es, + } + +} + +// createIndex 创建索引 +func (s *elastic) CreateIndex(name string) { + resp, err := s.client.Indices. + Create(name). + Do(context.Background()) + if err != nil { + fmt.Printf("create index failed, err:%v\n", err) + return + } + fmt.Printf("index:%#v\n", resp.Index) +} + +// indexDocument 索引文档 +func (s *elastic) IndexDocument(name string, key string, data interface{}) { + + // 添加文档 + resp, err := s.client.Index(name). + Id(key). + Document(data). + Do(context.Background()) + if err != nil { + fmt.Printf("indexing document failed, err:%v\n", err) + return + } + fmt.Printf("result:%#v\n", resp.Result) +} + +// getDocument 获取文档 +func (s *elastic) GetDocument(name string, id string) (res json.RawMessage) { + resp, err := s.client.Get(name, id). + Do(context.Background()) + if err != nil { + fmt.Printf("get document by id failed, err:%v\n", err) + return + } + fmt.Printf("fileds:%s\n", resp.Source_) + res = resp.Source_ + return +} + +// updateDocument 更新文档 +func (s *elastic) UpdateDocument(name string, key string, data interface{}) { + + resp, err := s.client.Update(name, key). + Doc(data). // 使用结构体变量更新 + Do(context.Background()) + if err != nil { + fmt.Printf("update document failed, err:%v\n", err) + return + } + fmt.Printf("result:%v\n", resp.Result) +} + +// deleteDocument 删除 document +func (s *elastic) DeleteDocument(name string, key string) { + resp, err := s.client.Delete(name, key). + Do(context.Background()) + if err != nil { + fmt.Printf("delete document failed, err:%v\n", err) + return + } + fmt.Printf("result:%v\n", resp.Result) +} diff --git a/go.mod b/go.mod index 4b19cf7..6c9ec56 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ require ( github.com/BurntSushi/toml v1.4.0 // indirect github.com/clbanning/mxj/v2 v2.7.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect + github.com/elastic/go-elasticsearch/v8 v8.17.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/fatih/color v1.18.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -50,10 +52,10 @@ require ( github.com/subosito/gotenv v1.2.0 // indirect github.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirect github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect golang.org/x/crypto v0.30.0 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect diff --git a/go.sum b/go.sum index b308c81..a6cab1c 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= +github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.17.0 h1:e9cWksE/Fr7urDRmGPGp47Nsp4/mvNOrU8As1l2HQQ0= +github.com/elastic/go-elasticsearch/v8 v8.17.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -326,12 +330,18 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= diff --git a/s3/s3.go b/s3/s3.go index 65c713f..e10176a 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -274,14 +274,14 @@ func (s *Mod) CopyObject(bucketName string, dstStr string, srcStr string) (err e // 原始文件 var dst = minio.CopyDestOptions{ - Bucket: dstStr, - Object: bucketName, + Bucket: bucketName, + Object: dstStr, } // 新文件 var src = minio.CopySrcOptions{ - Bucket: srcStr, - Object: bucketName, + Bucket: bucketName, + Object: srcStr, } _, err = s.client.CopyObject(ctx, dst, src)