增加计划任务的打断机制

This commit is contained in:
ayflying
2025-04-22 12:15:53 +08:00
parent d6bfe1c2fb
commit 569937c67f
5 changed files with 75 additions and 38 deletions

View File

@@ -1,6 +1,7 @@
package boot package boot
import ( import (
"context"
v1 "github.com/ayflying/utility_go/api/system/v1" v1 "github.com/ayflying/utility_go/api/system/v1"
"github.com/ayflying/utility_go/service" "github.com/ayflying/utility_go/service"
"github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gctx"
@@ -15,15 +16,14 @@ func Boot() (err error) {
err = service.SystemCron().StartCron() err = service.SystemCron().StartCron()
//用户活动持久化 //用户活动持久化
service.SystemCron().AddCron(v1.CronType_DAILY, func() error { service.SystemCron().AddCronV2(v1.CronType_DAILY, func(ctx context.Context) error {
return service.GameAct().Saves() return service.GameAct().Saves(ctx)
}) })
//初始化自启动方法 //初始化自启动方法
for _, v := range _func { for _, v := range _func {
v() v()
} }
return nil return nil
} }

View File

@@ -1,6 +1,7 @@
package gameAct package gameAct
import ( import (
"context"
"fmt" "fmt"
"github.com/ayflying/utility_go/internal/model/do" "github.com/ayflying/utility_go/internal/model/do"
"github.com/ayflying/utility_go/internal/model/entity" "github.com/ayflying/utility_go/internal/model/entity"
@@ -105,16 +106,16 @@ func (s *sGameAct) Set(uid int64, actId int, data interface{}) (err error) {
return return
} }
func (s *sGameAct) Saves() (err error) { func (s *sGameAct) Saves(ctx context.Context) (err error) {
//遍历执行 //遍历执行
ActList.Iterator(func(i interface{}) bool { ActList.Iterator(func(i interface{}) bool {
err = s.Save(i.(int)) err = s.Save(ctx, i.(int))
return true return true
}) })
return return
} }
func (s *sGameAct) Save(actId int) (err error) { func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
cacheKey := fmt.Sprintf("act:%v:*", actId) cacheKey := fmt.Sprintf("act:%v:*", actId)
//获取当前用户的key值 //获取当前用户的key值
@@ -149,8 +150,8 @@ func (s *sGameAct) Save(actId int) (err error) {
} }
//如果有活跃,跳过持久化 //如果有活跃,跳过持久化
if getBool, _ := pkg.Cache("redis").Contains(ctx, if getBool, _ := pkg.Cache("redis").
fmt.Sprintf("act:update:%d", uid)); getBool { Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
continue continue
} }

View File

@@ -22,42 +22,44 @@ var (
// 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。 // 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。
type sSystemCron struct { type sSystemCron struct {
//互斥锁 //互斥锁
Lock sync.Mutex Lock sync.Mutex
taskChan chan func() error taskChan chan func(context.Context) error
TaskTimeout time.Duration
// 每秒执行的任务 // 每秒执行的任务
SecondlyTask []func() error SecondlyTask []func(context.Context) error
// 每分钟执行的任务 // 每分钟执行的任务
MinutelyTask []func() error MinutelyTask []func(context.Context) error
// 每小时执行的任务 // 每小时执行的任务
HourlyTask []func() error HourlyTask []func(context.Context) error
// 每天执行的任务 // 每天执行的任务
DailyTask []func() error DailyTask []func(context.Context) error
// 每周执行的任务 // 每周执行的任务
WeeklyTask []func() error WeeklyTask []func(context.Context) error
// 每月执行的任务 // 每月执行的任务
MonthlyTask []func() error MonthlyTask []func(context.Context) error
// 每年执行的任务 // 每年执行的任务
YearlyTask []func() error YearlyTask []func(context.Context) error
// 每周一执行的任务 // 每周一执行的任务
MondayTask []func() error MondayTask []func(context.Context) error
// 每周二执行的任务 // 每周二执行的任务
TuesdayTask []func() error TuesdayTask []func(context.Context) error
// 每周三执行的任务 // 每周三执行的任务
WednesdayTask []func() error WednesdayTask []func(context.Context) error
// 每周四执行的任务 // 每周四执行的任务
ThursdayTask []func() error ThursdayTask []func(context.Context) error
// 每周五执行的任务 // 每周五执行的任务
FridayTask []func() error FridayTask []func(context.Context) error
// 每周六执行的任务 // 每周六执行的任务
SaturdayTask []func() error SaturdayTask []func(context.Context) error
// 每周日执行的任务 // 每周日执行的任务
SundayTask []func() error SundayTask []func(context.Context) error
} }
func New() *sSystemCron { func New() *sSystemCron {
return &sSystemCron{ return &sSystemCron{
taskChan: make(chan func() error, 1), taskChan: make(chan func(context.Context) error, 2),
TaskTimeout: time.Minute * 30,
} }
} }
@@ -68,10 +70,26 @@ func init() {
// AddCron 添加一个定时任务到相应的调度列表中。 // AddCron 添加一个定时任务到相应的调度列表中。
// //
// @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。 // @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。
// 确保自定义任务正确处理上下文取消信号,即可充分发挥超时打断功能。
// @receiver s: sSystemCron的实例代表一个调度系统。 // @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 // @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。 // @param _func: 要添加的任务函数该函数执行时应该返回一个error。
// deprecated: 弃用,请使用 AddCronV2
func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) { func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
//转换为带上下文的,提供打断
var _func2 = func(ctx context.Context) error {
return _func()
}
s.AddCronV2(typ, _func2)
}
// AddCronV2 添加一个定时任务到相应的调度列表中。
//
// @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。
// @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。
func (s *sSystemCron) AddCronV2(typ v1.CronType, _func func(context.Context) error) {
//加锁 //加锁
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
@@ -240,7 +258,7 @@ func (s *sSystemCron) dailyTask() {
// 每周任务 // 每周任务
func (s *sSystemCron) weeklyTask(day int) { func (s *sSystemCron) weeklyTask(day int) {
var arr []func() error var arr []func(context.Context) error
switch day { switch day {
case 1: case 1:
arr = s.MondayTask arr = s.MondayTask
@@ -287,7 +305,7 @@ func (s *sSystemCron) yearlyTask() {
} }
// AddFuncChan 添加方法到通道 // AddFuncChan 添加方法到通道
func (s *sSystemCron) AddFuncChan(list []func() error) { func (s *sSystemCron) AddFuncChan(list []func(context.Context) error) {
for _, v := range list { for _, v := range list {
s.taskChan <- v s.taskChan <- v
} }
@@ -297,30 +315,35 @@ func (s *sSystemCron) AddFuncChan(list []func() error) {
func (s *sSystemCron) RunFuncChan() { func (s *sSystemCron) RunFuncChan() {
go func() { go func() {
for task := range s.taskChan { for task := range s.taskChan {
ctx := gctx.New() //ctx := gctx.New()
func() { func() {
//超时释放资源
ctx, cancel := context.WithTimeout(context.Background(), s.TaskTimeout)
defer cancel()
// 使用匿名函数包裹来捕获 panic // 使用匿名函数包裹来捕获 panic
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
g.Log().Errorf(ctx, "执行函数时发生 panic: %v", r) g.Log().Errorf(gctx.New(), "执行函数时发生 panic: %v", r)
} }
}() }()
var taskErr error
done := make(chan error) done := make(chan error)
go func() { go func() {
done <- task() done <- task(ctx)
}() }()
//err := task() //err := task()
//if err != nil { //if err != nil {
// g.Log().Error(ctx, err) // g.Log().Error(ctx, err)
//} //}
select { select {
case taskErr = <-done: case taskErr := <-done:
if taskErr != nil { if taskErr != nil {
g.Log().Error(ctx, taskErr) // 使用新上下文记录错误
g.Log().Error(gctx.New(), taskErr)
} }
case <-time.After(time.Minute * 10): case <-ctx.Done(): // 监听上下文取消(包括超时)
g.Log().Errorf(ctx, "task timeout:%v", task) g.Log().Errorf(gctx.New(), "task timeout:%v", ctx.Err())
} }
}() }()
} }

View File

@@ -6,6 +6,8 @@
package service package service
import ( import (
"context"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
) )
@@ -29,8 +31,8 @@ type (
// @param data interface{}: 要存储的活动信息数据。 // @param data interface{}: 要存储的活动信息数据。
// @return err error: 返回错误信息如果操作成功则返回nil。 // @return err error: 返回错误信息如果操作成功则返回nil。
Set(uid int64, actId int, data interface{}) (err error) Set(uid int64, actId int, data interface{}) (err error)
Saves() (err error) Saves(ctx context.Context) (err error)
Save(actId int) (err error) Save(ctx context.Context, actId int) (err error)
// 清空GetRedDot缓存 // 清空GetRedDot缓存
RefreshGetRedDotCache(uid int64) RefreshGetRedDotCache(uid int64)
Del(uid int64, actId int) Del(uid int64, actId int)

View File

@@ -6,6 +6,8 @@
package service package service
import ( import (
"context"
v1 "github.com/ayflying/utility_go/api/system/v1" v1 "github.com/ayflying/utility_go/api/system/v1"
"github.com/gogf/gf/v2/net/gclient" "github.com/gogf/gf/v2/net/gclient"
) )
@@ -24,10 +26,19 @@ type (
// AddCron 添加一个定时任务到相应的调度列表中。 // AddCron 添加一个定时任务到相应的调度列表中。
// //
// @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。 // @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。
// 确保自定义任务正确处理上下文取消信号,即可充分发挥超时打断功能。
// @receiver s: sSystemCron的实例代表一个调度系统。 // @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。 // @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。 // @param _func: 要添加的任务函数该函数执行时应该返回一个error。
// deprecated: 弃用,请使用 AddCronV2
AddCron(typ v1.CronType, _func func() error) AddCron(typ v1.CronType, _func func() error)
// AddCronV2 添加一个定时任务到相应的调度列表中。
//
// @Description: 根据指定的类型将函数添加到不同的任务列表中,以供后续执行。
// @receiver s: sSystemCron的实例代表一个调度系统。
// @param typ: 任务的类型,决定该任务将被添加到哪个列表中。对应不同的时间间隔。
// @param _func: 要添加的任务函数该函数执行时应该返回一个error。
AddCronV2(typ v1.CronType, _func func(context.Context) error)
// StartCron 开始计划任务执行 // StartCron 开始计划任务执行
// //
// @Description: // @Description:
@@ -35,7 +46,7 @@ type (
// @return err // @return err
StartCron() (err error) StartCron() (err error)
// AddFuncChan 添加方法到通道 // AddFuncChan 添加方法到通道
AddFuncChan(list []func() error) AddFuncChan(list []func(context.Context) error)
// RunFuncChan 统一执行方法 // RunFuncChan 统一执行方法
RunFuncChan() RunFuncChan()
// RunFunc 统一执行方法 // RunFunc 统一执行方法