From 569937c67f0f45e8893eac27c4a060bf0585fdec Mon Sep 17 00:00:00 2001 From: ayflying Date: Tue, 22 Apr 2025 12:15:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=AE=A1=E5=88=92=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E6=89=93=E6=96=AD=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/boot/boot.go | 6 +- internal/logic/gameAct/gameAct.go | 11 ++-- internal/logic/systemCron/systemCron.go | 77 ++++++++++++++++--------- service/game_act.go | 6 +- service/system_cron.go | 13 ++++- 5 files changed, 75 insertions(+), 38 deletions(-) diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 705beb5..70ede1c 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -1,6 +1,7 @@ package boot import ( + "context" v1 "github.com/ayflying/utility_go/api/system/v1" "github.com/ayflying/utility_go/service" "github.com/gogf/gf/v2/os/gctx" @@ -15,15 +16,14 @@ func Boot() (err error) { err = service.SystemCron().StartCron() //用户活动持久化 - service.SystemCron().AddCron(v1.CronType_DAILY, func() error { - return service.GameAct().Saves() + service.SystemCron().AddCronV2(v1.CronType_DAILY, func(ctx context.Context) error { + return service.GameAct().Saves(ctx) }) //初始化自启动方法 for _, v := range _func { v() } - return nil } diff --git a/internal/logic/gameAct/gameAct.go b/internal/logic/gameAct/gameAct.go index f691311..6272ddb 100644 --- a/internal/logic/gameAct/gameAct.go +++ b/internal/logic/gameAct/gameAct.go @@ -1,6 +1,7 @@ package gameAct import ( + "context" "fmt" "github.com/ayflying/utility_go/internal/model/do" "github.com/ayflying/utility_go/internal/model/entity" @@ -105,16 +106,16 @@ func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) { return } -func (s *sGameAct) Saves() (err error) { +func (s *sGameAct) Saves(ctx context.Context) (err error) { //遍历执行 ActList.Iterator(func(i interface{}) bool { - err = s.Save(i.(int)) + err = s.Save(ctx, i.(int)) return true }) return } -func (s *sGameAct) Save(actId int) (err error) { +func (s *sGameAct) Save(ctx context.Context, actId int) (err error) { cacheKey := fmt.Sprintf("act:%v:*", actId) //获取当前用户的key值 @@ -149,8 +150,8 @@ func (s *sGameAct) Save(actId int) (err error) { } //如果有活跃,跳过持久化 - if getBool, _ := pkg.Cache("redis").Contains(ctx, - fmt.Sprintf("act:update:%d", uid)); getBool { + if getBool, _ := pkg.Cache("redis"). + Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool { continue } diff --git a/internal/logic/systemCron/systemCron.go b/internal/logic/systemCron/systemCron.go index 93bfe8c..54ddebb 100644 --- a/internal/logic/systemCron/systemCron.go +++ b/internal/logic/systemCron/systemCron.go @@ -22,42 +22,44 @@ var ( // 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。 type sSystemCron struct { //互斥锁 - Lock sync.Mutex - taskChan chan func() error + Lock sync.Mutex + taskChan chan func(context.Context) error + TaskTimeout time.Duration // 每秒执行的任务 - SecondlyTask []func() error + SecondlyTask []func(context.Context) error // 每分钟执行的任务 - MinutelyTask []func() error + MinutelyTask []func(context.Context) error // 每小时执行的任务 - HourlyTask []func() error + HourlyTask []func(context.Context) error // 每天执行的任务 - DailyTask []func() error + DailyTask []func(context.Context) error // 每周执行的任务 - WeeklyTask []func() error + WeeklyTask []func(context.Context) error // 每月执行的任务 - MonthlyTask []func() error + MonthlyTask []func(context.Context) error // 每年执行的任务 - YearlyTask []func() error + YearlyTask []func(context.Context) error // 每周一执行的任务 - MondayTask []func() error + MondayTask []func(context.Context) error // 每周二执行的任务 - TuesdayTask []func() error + TuesdayTask []func(context.Context) error // 每周三执行的任务 - WednesdayTask []func() error + WednesdayTask []func(context.Context) error // 每周四执行的任务 - ThursdayTask []func() error + ThursdayTask []func(context.Context) error // 每周五执行的任务 - FridayTask []func() error + FridayTask []func(context.Context) error // 每周六执行的任务 - SaturdayTask []func() error + SaturdayTask []func(context.Context) error // 每周日执行的任务 - SundayTask []func() error + SundayTask []func(context.Context) error } func New() *sSystemCron { return &sSystemCron{ - taskChan: make(chan func() error, 1), + taskChan: make(chan func(context.Context) error, 2), + TaskTimeout: time.Minute * 30, } } @@ -68,10 +70,26 @@ func init() { // AddCron 添加一个定时任务到相应的调度列表中。 // // @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。 +// 确保自定义任务正确处理上下文取消信号,即可充分发挥超时打断功能。 // @receiver s: sSystemCron的实例,代表一个调度系统。 // @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 // @param _func: 要添加的任务函数,该函数执行时应该返回一个error。 +// deprecated: 弃用,请使用 AddCronV2 func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) { + //转换为带上下文的,提供打断 + var _func2 = func(ctx context.Context) error { + return _func() + } + s.AddCronV2(typ, _func2) +} + +// AddCronV2 添加一个定时任务到相应的调度列表中。 +// +// @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。 +// @receiver s: sSystemCron的实例,代表一个调度系统。 +// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 +// @param _func: 要添加的任务函数,该函数执行时应该返回一个error。 +func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error) { //加锁 s.Lock.Lock() defer s.Lock.Unlock() @@ -240,7 +258,7 @@ func (s *sSystemCron) dailyTask() { // 每周任务 func (s *sSystemCron) weeklyTask(day int) { - var arr []func() error + var arr []func(context.Context) error switch day { case 1: arr = s.MondayTask @@ -287,7 +305,7 @@ func (s *sSystemCron) yearlyTask() { } // AddFuncChan 添加方法到通道 -func (s *sSystemCron) AddFuncChan(list []func() error) { +func (s *sSystemCron) AddFuncChan(list []func(context.Context) error) { for _, v := range list { s.taskChan <- v } @@ -297,30 +315,35 @@ func (s *sSystemCron) AddFuncChan(list []func() error) { func (s *sSystemCron) RunFuncChan() { go func() { for task := range s.taskChan { - ctx := gctx.New() + //ctx := gctx.New() func() { + //超时释放资源 + ctx, cancel := context.WithTimeout(context.Background(), s.TaskTimeout) + defer cancel() + // 使用匿名函数包裹来捕获 panic defer func() { if r := recover(); r != nil { - g.Log().Errorf(ctx, "执行函数时发生 panic: %v", r) + g.Log().Errorf(gctx.New(), "执行函数时发生 panic: %v", r) } }() - var taskErr error + done := make(chan error) go func() { - done <- task() + done <- task(ctx) }() //err := task() //if err != nil { // g.Log().Error(ctx, err) //} select { - case taskErr = <-done: + case taskErr := <-done: if taskErr != nil { - g.Log().Error(ctx, taskErr) + // 使用新上下文记录错误 + g.Log().Error(gctx.New(), taskErr) } - case <-time.After(time.Minute * 10): - g.Log().Errorf(ctx, "task timeout:%v", task) + case <-ctx.Done(): // 监听上下文取消(包括超时) + g.Log().Errorf(gctx.New(), "task timeout:%v", ctx.Err()) } }() } diff --git a/service/game_act.go b/service/game_act.go index 91049f1..ec787c6 100644 --- a/service/game_act.go +++ b/service/game_act.go @@ -6,6 +6,8 @@ package service import ( + "context" + "github.com/gogf/gf/v2/frame/g" ) @@ -29,8 +31,8 @@ type ( // @param data interface{}: 要存储的活动信息数据。 // @return err error: 返回错误信息,如果操作成功,则返回nil。 Set(uid int64, actId int, data interface{}) (err error) - Saves() (err error) - Save(actId int) (err error) + Saves(ctx context.Context) (err error) + Save(ctx context.Context, actId int) (err error) // 清空GetRedDot缓存 RefreshGetRedDotCache(uid int64) Del(uid int64, actId int) diff --git a/service/system_cron.go b/service/system_cron.go index b852c8d..ca3a3fd 100644 --- a/service/system_cron.go +++ b/service/system_cron.go @@ -6,6 +6,8 @@ package service import ( + "context" + v1 "github.com/ayflying/utility_go/api/system/v1" "github.com/gogf/gf/v2/net/gclient" ) @@ -24,10 +26,19 @@ type ( // AddCron 添加一个定时任务到相应的调度列表中。 // // @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。 + // 确保自定义任务正确处理上下文取消信号,即可充分发挥超时打断功能。 // @receiver s: sSystemCron的实例,代表一个调度系统。 // @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 // @param _func: 要添加的任务函数,该函数执行时应该返回一个error。 + // deprecated: 弃用,请使用 AddCronV2 AddCron(typ v1.CronType, _func func() error) + // AddCronV2 添加一个定时任务到相应的调度列表中。 + // + // @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。 + // @receiver s: sSystemCron的实例,代表一个调度系统。 + // @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 + // @param _func: 要添加的任务函数,该函数执行时应该返回一个error。 + AddCronV2(typ v1.CronType, _func func(context.Context) error) // StartCron 开始计划任务执行 // // @Description: @@ -35,7 +46,7 @@ type ( // @return err StartCron() (err error) // AddFuncChan 添加方法到通道 - AddFuncChan(list []func() error) + AddFuncChan(list []func(context.Context) error) // RunFuncChan 统一执行方法 RunFuncChan() // RunFunc 统一执行方法