Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8190e9f6b7 |
@@ -3,7 +3,9 @@ package elasticsearch
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk"
|
||||
"github.com/elastic/go-elasticsearch/v8/typedapi/core/delete"
|
||||
@@ -82,7 +84,30 @@ func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) {
|
||||
save = append(save, v)
|
||||
}
|
||||
//save = data
|
||||
_, err = s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||
response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx)
|
||||
if err2 != nil {
|
||||
err = err2
|
||||
return
|
||||
}
|
||||
//需要接收返回信息,判断是否全部执行成功
|
||||
if response.Errors { //未全部完成
|
||||
//是否需要删除已成功导入的部分数据
|
||||
for _, item := range response.Items {
|
||||
for _, v := range item {
|
||||
if v.Error != nil { //失败
|
||||
g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason)
|
||||
|
||||
} else {
|
||||
//删除已导入成功的数据
|
||||
_, err = s.Delete(ctx, *v.Id_)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.New("部分数据导入失败")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -114,7 +139,7 @@ func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response,
|
||||
// Select 查询
|
||||
func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) {
|
||||
res, err = s.client.Search(). //Index("my_index").
|
||||
Request(&search.Request{
|
||||
Request(&search.Request{
|
||||
Query: &types.Query{
|
||||
MatchAll: &types.MatchAllQuery{},
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user