计划任务修改,act与kv使用协程方式执行,不影响其他任务

This commit is contained in:
ayflying
2025-09-01 18:12:58 +08:00
parent 95539038c0
commit 50cfc23ad2
6 changed files with 31 additions and 12 deletions

View File

@@ -5,6 +5,7 @@ import (
v1 "github.com/ayflying/utility_go/api/system/v1" v1 "github.com/ayflying/utility_go/api/system/v1"
"github.com/ayflying/utility_go/service" "github.com/ayflying/utility_go/service"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gctx"
) )
@@ -19,9 +20,14 @@ func Boot() (err error) {
//用户活动持久化每小时执行一次 //用户活动持久化每小时执行一次
service.SystemCron().AddCronV2(v1.CronType_HOUR, func(ctx context.Context) error { service.SystemCron().AddCronV2(v1.CronType_HOUR, func(ctx context.Context) error {
err = service.GameKv().SavesV1() go func() {
err = service.GameAct().Saves(ctx) err = service.GameKv().SavesV1(ctx)
return err err = service.GameAct().Saves(ctx)
if err != nil {
g.Log().Error(ctx, err)
}
}()
return nil
}, true) }, true)
//初始化自启动方法 //初始化自启动方法

View File

@@ -212,7 +212,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
//批量写入数据库 //批量写入数据库
updateCount := 0 updateCount := 0
if len(delKey) > 0 { if len(delKey) > 200 {
for _, v := range update { for _, v := range update {
v.UpdatedAt = gtime.Now() v.UpdatedAt = gtime.Now()
updateRes, err2 := g.Model(Name).Where(do.GameAct{ updateRes, err2 := g.Model(Name).Where(do.GameAct{
@@ -235,7 +235,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
var count int64 var count int64
if len(add) > 0 { if len(add) > 0 {
dbRes, err2 := g.Model(Name).Batch(50).Data(add).Save() dbRes, err2 := g.Model(Name).Data(add).Save()
add = make([]*entity.GameAct, 0) add = make([]*entity.GameAct, 0)
err = err2 err = err2
if err != nil { if err != nil {
@@ -243,7 +243,15 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
return return
} }
count, _ = dbRes.RowsAffected() count, _ = dbRes.RowsAffected()
g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count) if count == 0 {
g.Log().Error(ctx, "当前 %v 写入数据库: %v 条", actId, count)
for _, vTemp := range add {
g.Log().Debugf(ctx, "当前act%vadd写入数据: %v,内容:%v", vTemp.ActId, vTemp.Uid, vTemp.Action)
}
return
}
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
} }
for _, v := range delKey { for _, v := range delKey {

View File

@@ -1,6 +1,7 @@
package gameKv package gameKv
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
@@ -39,7 +40,7 @@ func init() {
// @Description: 保存用户KV数据列表。 // @Description: 保存用户KV数据列表。
// @receiver s: sGameKv的实例。 // @receiver s: sGameKv的实例。
// @return err: 错误信息如果操作成功则为nil。 // @return err: 错误信息如果操作成功则为nil。
func (s *sGameKv) SavesV1() (err error) { func (s *sGameKv) SavesV1(ctx context.Context) (err error) {
// 最大允许执行时间 // 最大允许执行时间
RunTimeMax = gtime.Now().Add(time.Minute * 30) RunTimeMax = gtime.Now().Add(time.Minute * 30)
g.Log().Debug(ctx, "开始执行游戏kv数据保存") g.Log().Debug(ctx, "开始执行游戏kv数据保存")

View File

@@ -60,7 +60,7 @@ type sSystemCron struct {
func New() *sSystemCron { func New() *sSystemCron {
return &sSystemCron{ return &sSystemCron{
taskChan: make(chan func(context.Context) error, 2), taskChan: make(chan func(context.Context) error, 2),
TaskTimeout: time.Minute * 30, TaskTimeout: time.Minute * 60,
} }
} }
@@ -333,7 +333,7 @@ func (s *sSystemCron) RunFuncChan() {
//ctx := gctx.New() //ctx := gctx.New()
func() { func() {
//超时释放资源 //超时释放资源
ctx, cancel := context.WithTimeout(context.Background(), s.TaskTimeout) ctx, cancel := context.WithTimeout(gctx.New(), s.TaskTimeout)
defer cancel() defer cancel()
// 使用匿名函数包裹来捕获 panic // 使用匿名函数包裹来捕获 panic

View File

@@ -5,6 +5,10 @@
package service package service
import (
"context"
)
type ( type (
IGameKv interface { IGameKv interface {
// SavesV1 方法 // SavesV1 方法
@@ -12,7 +16,7 @@ type (
// @Description: 保存用户KV数据列表。 // @Description: 保存用户KV数据列表。
// @receiver s: sGameKv的实例。 // @receiver s: sGameKv的实例。
// @return err: 错误信息如果操作成功则为nil。 // @return err: 错误信息如果操作成功则为nil。
SavesV1() (err error) SavesV1(ctx context.Context) (err error)
} }
) )

View File

@@ -38,8 +38,8 @@ type (
// @receiver s: sSystemCron的实例代表一个调度系统。 // @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 // @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。 // @param _func: 要添加的任务函数该函数执行时应该返回一个error。
// @param unique: 是否只在唯一服务器上执行 // @param _onlyMain: 是否只在服务器上执行一次,true 唯一执行false 全局执行不判断唯一
AddCronV2(typ v1.CronType, _func func(context.Context) error, unique ...bool) AddCronV2(typ v1.CronType, _func func(context.Context) error, _onlyMain ...bool)
// StartCron 开始计划任务执行 // StartCron 开始计划任务执行
// //
// @Description: // @Description: