diff --git a/pkg/elasticsearch/elasticsearch.go b/pkg/elasticsearch/elasticsearch.go index 175a250..020eaa6 100644 --- a/pkg/elasticsearch/elasticsearch.go +++ b/pkg/elasticsearch/elasticsearch.go @@ -3,7 +3,9 @@ package elasticsearch import ( "context" "encoding/json" + "errors" "fmt" + "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk" "github.com/elastic/go-elasticsearch/v8/typedapi/core/delete" @@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) { save = append(save, v) } //save = data - _, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx) + response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx) + if err2 != nil { + err = err2 + return + } + //需要接收返回信息,判断是否全部执行成功 + if response.Errors { //未全部完成 + //是否需要删除已成功导入的部分数据 + for _, item := range response.Items { + for _, v := range item { + if v.Error != nil { //失败 + g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason) + + } else { + //删除已导入成功的数据 + _, err = s.Delete(ctx, *v.Id_) + if err != nil { + g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err) + } + } + } + } + return errors.New("部分数据导入失败") + } return } @@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response, // Select 查询 func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) { res, err = s.client.Search(). //Index("my_index"). - Request(&search.Request{ + Request(&search.Request{ Query: &types.Query{ MatchAll: &types.MatchAllQuery{}, },