package elasticsearch import ( "context" "encoding/json" "errors" "fmt" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/typedapi/core/bulk" "github.com/elastic/go-elasticsearch/v8/typedapi/core/delete" "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" "github.com/elastic/go-elasticsearch/v8/typedapi/core/update" "github.com/elastic/go-elasticsearch/v8/typedapi/types" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/util/gconv" ) var ( es *elasticsearch.TypedClient ) type Elastic struct { client *elasticsearch.TypedClient name string } type elkBulk struct { Index struct { Index string `json:"_index"` Id string `json:"_id"` } `json:"index"` } func NewV1(name string) *Elastic { var cfg elasticsearch.Config _cfg := g.Cfg().MustGetWithEnv(gctx.New(), "elasticsearch") _cfg.Scan(&cfg) if es == nil { var err error es, err = elasticsearch.NewTypedClient(cfg) if err != nil { fmt.Printf("elasticsearch.NewTypedClient failed, err:%v\n", err) return &Elastic{} } } return &Elastic{ client: es, name: name, } } //// Create 创建索引 //func (s *Elastic) Create(ctx context.Context) { // resp, err := s.client.Indices. // Create(s.name).Do(ctx) // if err != nil { // fmt.Printf("create index failed, err:%v\n", err) // return // } // fmt.Printf("index:%#v\n", resp.Index) //} // Set 添加文档索引文档 func (s *Elastic) Set(ctx context.Context, key string, data interface{}) (err error) { // 添加文档 _, err = s.client.Index(s.name).Id(key).Document(data).Do(ctx) return } // SetBulk 批量添加文档 func (s *Elastic) SetBulk(ctx context.Context, data []any) (err error) { var save bulk.Request save = make(bulk.Request, 0) for _, v := range data { val := gconv.Map(v) var saveIndex = elkBulk{} saveIndex.Index.Index = s.name if _, ok := val["uuid"]; ok { saveIndex.Index.Id = val["uuid"].(string) } save = append(save, saveIndex) save = append(save, v) } //save = data response, err2 := s.client.Bulk().Index(s.name).Request(&save).Do(ctx) if err2 != nil { err = err2 return } //需要接收返回信息,判断是否全部执行成功 if response.Errors { //未全部完成 //是否需要删除已成功导入的部分数据 for _, item := range response.Items { for _, v := range item { if v.Error != nil { //失败 g.Log().Errorf(ctx, "导入数据出错 err: %v", *v.Error.Reason) } else { //删除已导入成功的数据 _, err = s.Delete(ctx, *v.Id_) if err != nil { g.Log().Errorf(ctx, "删除数据错误, err:%v\n", err) } } } } return errors.New("部分数据导入失败") } return } // Get 获取文档 func (s *Elastic) Get(ctx context.Context, id string) (res json.RawMessage, err error) { get, err := s.client.Get(s.name, id).Do(ctx) if err != nil { return } res = get.Source_ return } // Update 更新文档 func (s *Elastic) Update(ctx context.Context, key string, data interface{}) (res *update.Response, err error) { res, err = s.client.Update(s.name, key).Doc(data).Do(ctx) return } // Delete 删除 document func (s *Elastic) Delete(ctx context.Context, key string) (res *delete.Response, err error) { res, err = s.client.Delete(s.name, key).Do(ctx) if err != nil { return } return } // Select 查询 func (s *Elastic) Select(ctx context.Context, query *types.MatchAllQuery) (res *search.Response, err error) { res, err = s.client.Search(). //Index("my_index"). Request(&search.Request{ Query: &types.Query{ MatchAll: &types.MatchAllQuery{}, }, }).Do(ctx) return }