批量导入elk后接收返回信息判断是否全部导入成功

This commit is contained in:
liaoyulong
2025-08-21 10:56:51 +08:00
parent 27435b57b7
commit a737e03dfd

View File

@@ -3,7 +3,9 @@ package elasticsearch
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk" "github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete" "github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
save = append(save, v) save = append(save, v)
} }
//save = data //save = data
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx) response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
if err2 != nil {
err = err2
return
}
//需要接收返回信息,判断是否全部执行成功
if response.Errors { //未全部完成
//是否需要删除已成功导入的部分数据
for _, item := range response.Items {
for _, v := range item {
if v.Error != nil { //失败
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
} else {
//删除已导入成功的数据
_, err = s.Delete(ctx, *v.Id_)
if err != nil {
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
}
}
}
}
return errors.New("部分数据导入失败")
}
return return
} }
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
// Select 查询 // Select 查询
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) { func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
res, err = s.client.Search(). //Index("my_index"). res, err = s.client.Search(). //Index("my_index").
Request(&search.Request{ Request(&search.Request{
Query: &types.Query{ Query: &types.Query{
MatchAll: &types.MatchAllQuery{}, MatchAll: &types.MatchAllQuery{},
}, },