From 8190e9f6b7feefa95a5ca0b881d23f2c86ef4c51 Mon Sep 17 00:00:00 2001 From: liaoyulong Date: Thu, 21 Aug 2025 10:56:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E9=87=8F=E5=AF=BC=E5=85=A5elk?= =?UTF-8?q?=E5=90=8E=E6=8E=A5=E6=94=B6=E8=BF=94=E5=9B=9E=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E5=88=A4=E6=96=AD=E6=98=AF=E5=90=A6=E5=85=A8=E9=83=A8=E5=AF=BC?= =?UTF-8?q?=E5=85=A5=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/elasticsearch/elasticsearch.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/elasticsearch/elasticsearch.go b/pkg/elasticsearch/elasticsearch.go index 175a250..020eaa6 100644 --- a/pkg/elasticsearch/elasticsearch.go +++ b/pkg/elasticsearch/elasticsearch.go @@ -3,7 +3,9 @@ package elasticsearch import ( "context" "encoding/json" + "errors" "fmt" + "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk" "github.com/elastic/go-elasticsearch/v8/typedapi/core/delete" @@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) { save = append(save, v) } //save = data - _, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx) + response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx) + if err2 != nil { + err = err2 + return + } + //需要接收返回信息,判断是否全部执行成功 + if response.Errors { //未全部完成 + //是否需要删除已成功导入的部分数据 + for _, item := range response.Items { + for _, v := range item { + if v.Error != nil { //失败 + g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason) + + } else { + //删除已导入成功的数据 + _, err = s.Delete(ctx, *v.Id_) + if err != nil { + g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err) + } + } + } + } + return errors.New("部分数据导入失败") + } return } @@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response, // Select 查询 func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) { res, err = s.client.Search(). //Index("my_index"). - Request(&search.Request{ + Request(&search.Request{ Query: &types.Query{ MatchAll: &types.MatchAllQuery{}, },