diff --git a/drivers/db/elasticsearch/elasticsearch.go b/drivers/db/elasticsearch/elasticsearch.go new file mode 100644 index 0000000..27a8595 --- /dev/null +++ b/drivers/db/elasticsearch/elasticsearch.go @@ -0,0 +1,83 @@ +package elasticsearch + +import ( + "github.com/elastic/go-elasticsearch/v8" +) + +var ( + es *elasticsearch.TypedClient +) + +type elastic struct { + client *elasticsearch.TypedClient +} + +//func (d *Driver) Insert(ctx context.Context, table string, data interface{}, batch ...int) (res sql.Result, err error) { +// +// return +//} +// +//// createIndex 创建索引 +//func (d *Driver) CreateIndex(name string) { +// +// resp, err := d.client.Indices. +// Create(name). +// Do(context.Background()) +// if err != nil { +// fmt.Printf("create index failed, err:%v\n", err) +// return +// } +// fmt.Printf("index:%#v\n", resp.Index) +//} +// +//// indexDocument 索引文档 +//func (d *Driver) IndexDocument(name string, key string, data interface{}) { +// +// // 添加文档 +// resp, err := d.client.Index(name). +// Id(key). +// Document(data). +// Do(context.Background()) +// if err != nil { +// fmt.Printf("indexing document failed, err:%v\n", err) +// return +// } +// fmt.Printf("result:%#v\n", resp.Result) +//} +// +//// getDocument 获取文档 +//func (d *Driver) GetDocument(name string, id string) (res json.RawMessage) { +// resp, err := d.client.Get(name, id). +// Do(context.Background()) +// if err != nil { +// fmt.Printf("get document by id failed, err:%v\n", err) +// return +// } +// fmt.Printf("fileds:%d\n", resp.Source_) +// res = resp.Source_ +// return +//} +// +//// updateDocument 更新文档 +//func (d *Driver) UpdateDocument(name string, key string, data interface{}) { +// +// resp, err := d.client.Update(name, key). +// Doc(data). // 使用结构体变量更新 +// Do(context.Background()) +// if err != nil { +// fmt.Printf("update document failed, err:%v\n", err) +// return +// } +// fmt.Printf("result:%v\n", resp.Result) +//} +// +//// deleteDocument 删除 document +//func (d *Driver) DeleteDocument(name string, key string) { +// resp, err := d.client.Delete(name, key). +// Do(context.Background()) +// if err != nil { +// fmt.Printf("delete document failed, err:%v\n", err) +// return +// } +// fmt.Printf("result:%v\n", resp.Result) +//} diff --git a/drivers/db/elasticsearch/es_open.go b/drivers/db/elasticsearch/es_open.go new file mode 100644 index 0000000..b8c1f1d --- /dev/null +++ b/drivers/db/elasticsearch/es_open.go @@ -0,0 +1,39 @@ +package elasticsearch + +import ( + "database/sql" + "github.com/elastic/go-elasticsearch/v8" + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" +) + +func (d *Driver) Open(config *gdb.ConfigNode) (db *sql.DB, err error) { + var ( + source string + underlyingDriverName = "elasticsearch" + ) + source = config.Host + + cfg := elasticsearch.Config{ + Addresses: []string{ + config.Host, + }, + } + + es, err = elasticsearch.NewTypedClient(cfg) + //if err != nil { + // fmt.Printf("elasticsearch.NewTypedClient failed, err:%v\n", err) + // return + //} + + if db, err = sql.Open(underlyingDriverName, source); err != nil { + err = gerror.WrapCodef( + gcode.CodeDbOperationError, err, + `sql.Open failed for driver "%s" by source "%s"`, underlyingDriverName, source, + ) + return nil, err + } + + return +} diff --git a/drivers/db/elasticsearch/load.go b/drivers/db/elasticsearch/load.go new file mode 100644 index 0000000..dce2ba4 --- /dev/null +++ b/drivers/db/elasticsearch/load.go @@ -0,0 +1,47 @@ +package elasticsearch + +import ( + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/frame/g" +) + +// Driver is the driver for mysql database. +type Driver struct { + *gdb.Core +} + +const ( + quoteChar = "`" +) + +func init() { + var ( + err error + driverObj = New() + driverNames = g.SliceStr{"es", "elasticsearch"} + ) + for _, driverName := range driverNames { + if err = gdb.Register(driverName, driverObj); err != nil { + panic(err) + } + } +} + +// New create and returns a driver that implements gdb.Driver, which supports operations for MySQL. +func New() gdb.Driver { + return &Driver{} +} + +// New creates and returns a database object for mysql. +// It implements the interface of gdb.Driver for extra database driver installation. +func (d *Driver) New(core *gdb.Core, node *gdb.ConfigNode) (res gdb.DB, err error) { + res = &Driver{ + Core: core, + } + return +} + +// GetChars returns the security char for this type of database. +func (d *Driver) GetChars() (charLeft string, charRight string) { + return quoteChar, quoteChar +} diff --git a/drivers/db/found/found.go b/drivers/db/found/found.go new file mode 100644 index 0000000..565f78a --- /dev/null +++ b/drivers/db/found/found.go @@ -0,0 +1,53 @@ +package found + +import ( + "database/sql" + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/frame/g" +) + +// Driver is the driver for mysql database. +type Driver struct { + *gdb.Core +} + +func (d *Driver) Open(config *gdb.ConfigNode) (*sql.DB, error) { + //TODO implement me + panic("implement me") +} + +const ( + quoteChar = "`" +) + +func init() { + var ( + err error + driverObj = New() + driverNames = g.SliceStr{"es", "found"} + ) + for _, driverName := range driverNames { + if err = gdb.Register(driverName, driverObj); err != nil { + panic(err) + } + } +} + +// New create and returns a driver that implements gdb.Driver, which supports operations for MySQL. +func New() gdb.Driver { + return &Driver{} +} + +// New creates and returns a database object for mysql. +// It implements the interface of gdb.Driver for extra database driver installation. +func (d *Driver) New(core *gdb.Core, node *gdb.ConfigNode) (res gdb.DB, err error) { + res = &Driver{ + Core: core, + } + return +} + +// GetChars returns the security char for this type of database. +func (d *Driver) GetChars() (charLeft string, charRight string) { + return quoteChar, quoteChar +} diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..84c3587 --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,189 @@ +package config + +import ( + "fmt" + "github.com/apolloconfig/agollo/v4" + apolloConfig "github.com/apolloconfig/agollo/v4/env/config" + "github.com/apolloconfig/agollo/v4/storage" + "github.com/gogf/gf/contrib/config/apollo/v2" + "github.com/gogf/gf/v2/container/gvar" + "github.com/gogf/gf/v2/encoding/gjson" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gfile" + "github.com/gogf/gf/v2/os/gres" + "github.com/gogf/gf/v2/text/gstr" + "sync" +) + +var ( + //ApolloCfg *apolloConfig.AppConfig + ApolloCfg *apollo.Config + ApolloListener []string + Item2Obj = map[string]Load{} +) + +// load接口定义了Load方法,用于加载数据 +type Load interface { + Load(cfg ...string) +} + +func NewV1() *Cfg { + return &Cfg{} +} + +type Cfg struct { + Lock sync.Mutex +} + +func (c *Cfg) GetDbFile(name string) (res *g.Var, err error) { + get2, err := g.Model("game_config"). + Where("name", name).Master().Value("data") + err = get2.Scan(&res) + if res == nil { + res = &gvar.Var{} + } + return +} + +func (c *Cfg) GetFile(filename string, obj ...Load) (jsonObj *gjson.Json, err error) { + pathStr := "manifest/game/" + filePath := pathStr + filename + ".json" + //err := gres.Load(pathStr + filename) + + //载入静态资源到文件对象 + err = gres.Load(filePath) + var bytes []byte + + if gfile.IsFile(filePath) { + bytes = gfile.GetBytes(filePath) + } else { + bytes = gres.GetContent(filePath) + } + + jsonObj, err = gjson.DecodeToJson(bytes) + //g.Dump(filePath, jsonObj) + return +} + +// getUrlFile 获取远程配置 +func (c *Cfg) GetUrlFile(name string) (jsonObj *gjson.Json, err error) { + urlStr := fmt.Sprintf("http://sdf.sdfs.sdf/%s.json", name) + getUrl, err := g.Client().Discovery(nil).Get(nil, urlStr) + bytes := getUrl.ReadAll() + jsonObj, err = gjson.DecodeToJson(bytes) + return +} + +// 获取阿波罗 +//func (c *Cfg) GetApollo(name string, obj Load) (jsonObj *gjson.Json, err error) { +// jsonObj, err = c.GetApolloV2(name, obj) +// return +// +// //c.Lock.Lock() +// //defer c.Lock.Unlock() +// // +// //Item2Obj[name+".json"] = obj +// //var cfg = apolloConfig.AppConfig{ +// // AppID: ApolloCfg.AppID, +// // Cluster: ApolloCfg.Cluster, +// // IP: ApolloCfg.IP, +// // NamespaceName: name + ".json", +// // Secret: ApolloCfg.Secret, +// // IsBackupConfig: ApolloCfg.IsBackupConfig, +// // BackupConfigPath: ApolloCfg.BackupConfigPath, +// // SyncServerTimeout: 60, +// // MustStart: true, +// //} +// ////cfg.NamespaceName = name + ".json" +// // +// //client, err := agollo.StartWithConfig(func() (*apolloConfig.AppConfig, error) { +// // return ApolloCfg, nil +// //}) +// //if client == nil { +// // return +// //} +// //var getStr string +// //var getApollo *storage.Config +// //for range 5 { +// // getApollo = client.GetConfig(cfg.NamespaceName) +// // if getApollo != nil { +// // break +// // } +// // time.Sleep(time.Second * 5) +// //} +// // +// //if getApollo != nil { +// // getStr = getApollo.GetValue("content") +// // if getStr != "" { +// // //写入配置 +// // gfile.PutContents(path.Join("manifest", "game", name+".json"), getStr) +// // } +// //} else { +// // jsonObj, err = c.GetFile(name) +// //} +// //jsonObj, err = gjson.DecodeToJson(getStr) +// ////首次运行加入监听器 +// //if !gstr.InArray(ApolloListener, name) { +// // c2 := &CustomChangeListener{} +// // client.AddChangeListener(c2) +// // ApolloListener = append(ApolloListener, name) +// //} +// //return +//} + +func (c *Cfg) GetApollo(name string, obj Load) (jsonObj *gjson.Json, err error) { + Item2Obj[name+".json"] = obj + + // 接入阿波罗配置 + ApolloCfg.NamespaceName = name + ".json" + adapter, err := apollo.New(nil, *ApolloCfg) + if err != nil { + g.Log().Fatalf(nil, `%+v`, err) + } + // Change the adapter of default configuration instance. + g.Cfg(name).SetAdapter(adapter) + + //首次运行加入监听器 + if !gstr.InArray(ApolloListener, name+".json") { + //放置监听器 + client, _ := agollo.StartWithConfig(func() (*apolloConfig.AppConfig, error) { + return &apolloConfig.AppConfig{ + AppID: ApolloCfg.AppID, + Cluster: ApolloCfg.Cluster, + NamespaceName: ApolloCfg.NamespaceName, + IP: ApolloCfg.IP, + IsBackupConfig: ApolloCfg.IsBackupConfig, + BackupConfigPath: ApolloCfg.BackupConfigPath, + Secret: ApolloCfg.Secret, + SyncServerTimeout: ApolloCfg.SyncServerTimeout, + MustStart: ApolloCfg.MustStart, + }, nil + }) + c2 := &CustomChangeListener{} + client.AddChangeListener(c2) + ApolloListener = append(ApolloListener, name+".json") + } + + cfg, err := g.Cfg(name).Get(nil, "content") + cfg.Scan(&jsonObj) + return +} + +// 阿波罗监听器 +type CustomChangeListener struct { + wg sync.WaitGroup +} + +func (c *CustomChangeListener) OnChange(changeEvent *storage.ChangeEvent) { + g.Log().Debugf(nil, "当前Namespace变化了:%v", changeEvent.Namespace) + filename := changeEvent.Namespace + if obj, ok := Item2Obj[filename]; ok { + //重载配置文件 + obj.Load(changeEvent.Changes["content"].NewValue.(string)) + } +} + +func (c *CustomChangeListener) OnNewestChange(event *storage.FullChangeEvent) { + //write your code here + +} diff --git a/pkg/elasticsearch/elasticsearch.go b/pkg/elasticsearch/elasticsearch.go index a167198..4377cfe 100644 --- a/pkg/elasticsearch/elasticsearch.go +++ b/pkg/elasticsearch/elasticsearch.go @@ -33,7 +33,6 @@ func New(name ...string) *elastic { return &elastic{ client: es, } - } // createIndex 创建索引 diff --git a/pkg/pkg.go b/pkg/pkg.go index f707885..46fe0a6 100644 --- a/pkg/pkg.go +++ b/pkg/pkg.go @@ -3,6 +3,7 @@ package pkg import ( v1 "github.com/ayflying/utility_go/api/pgk/v1" "github.com/ayflying/utility_go/pkg/aycache" + "github.com/ayflying/utility_go/pkg/config" "github.com/ayflying/utility_go/pkg/notice" "github.com/ayflying/utility_go/pkg/rank" "github.com/ayflying/utility_go/pkg/s3" @@ -33,3 +34,7 @@ func Rank() *rank.Mod { func Websocket() *websocket.SocketV1 { return websocket.NewV1() } + +func Config() *config.Cfg { + return config.NewV1() +}