计划任务定时器修改,改为通道顺序执行

This commit is contained in:
ayflying
2025-04-22 11:15:51 +08:00
parent 16da554a60
commit d6bfe1c2fb
3 changed files with 131 additions and 87 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/ayflying/utility_go/pkg/notice" "github.com/ayflying/utility_go/pkg/notice"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient" "github.com/gogf/gf/v2/net/gclient"
"github.com/gogf/gf/v2/os/gctx"
) )
type Status struct { type Status struct {
@@ -22,10 +23,10 @@ func (s *sSystemCron) Guardian(DingTalkWebHook string) {
Name string Name string
Address string Address string
} }
cfg, _ := g.Cfg().Get(ctx, "serverList") cfg, _ := g.Cfg().Get(gctx.New(), "serverList")
cfg.Scan(&list) cfg.Scan(&list)
for _, v := range list { for _, v := range list {
get, err := g.Client().Discovery(nil).Get(ctx, v.Address+"/callback/status") get, err := g.Client().Discovery(nil).Get(gctx.New(), v.Address+"/callback/status")
defer get.Close() defer get.Close()
if err != nil { if err != nil {

View File

@@ -14,7 +14,7 @@ import (
) )
var ( var (
ctx = gctx.New() //ctx = gctx.New()
startTime *gtime.Time startTime *gtime.Time
) )
@@ -22,7 +22,8 @@ var (
// 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。 // 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。
type sSystemCron struct { type sSystemCron struct {
//互斥锁 //互斥锁
Lock sync.Mutex Lock sync.Mutex
taskChan chan func() error
// 每秒执行的任务 // 每秒执行的任务
SecondlyTask []func() error SecondlyTask []func() error
@@ -55,7 +56,9 @@ type sSystemCron struct {
} }
func New() *sSystemCron { func New() *sSystemCron {
return &sSystemCron{} return &sSystemCron{
taskChan: make(chan func() error, 1),
}
} }
func init() { func init() {
@@ -72,6 +75,9 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) {
//加锁 //加锁
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
//
//ctx := gctx.New()
//newFunc := func()
switch typ { switch typ {
case v1.CronType_SECOND: case v1.CronType_SECOND:
@@ -118,139 +124,122 @@ func (s *sSystemCron) StartCron() (err error) {
} }
startTime = gtime.Now() startTime = gtime.Now()
g.Log().Debug(ctx, "启动计划任务定时器详情") g.Log().Debug(gctx.New(), "启动计划任务定时器详情")
//每秒任务 //每秒任务
gtimer.SetInterval(ctx, time.Second, func(ctx context.Context) { gtimer.SetInterval(gctx.New(), time.Second, func(ctx context.Context) {
//g.Log().Debug(ctx, "每秒定时器") //g.Log().Debug(ctx, "每秒定时器")
err = s.secondlyTask() s.secondlyTask()
}) })
//每分钟任务 //每分钟任务
_, err = gcron.AddSingleton(ctx, "0 * * * * *", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 * * * * *", func(ctx context.Context) {
//g.Log().Debug(ctx, "每分钟定时器") //g.Log().Debug(ctx, "每分钟定时器")
err = s.minutelyTask() s.minutelyTask()
}) })
//每小时任务 //每小时任务
_, err = gcron.AddSingleton(ctx, "0 0 * * * *", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "每小时定时器") g.Log().Debug(ctx, "每小时定时器")
err = s.hourlyTask() s.hourlyTask()
}) })
//每天任务 //每天任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * *", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "每日定时器") g.Log().Debug(ctx, "每日定时器")
err = s.dailyTask() s.dailyTask()
}) })
//每周任务 //每周任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 1", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 1", func(ctx context.Context) {
g.Log().Debug(ctx, "每周一定时器") g.Log().Debug(ctx, "每周一定时器")
err = s.weeklyTask(1) s.weeklyTask(1)
}) })
//每周二任务 //每周二任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 2", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 2", func(ctx context.Context) {
g.Log().Debug(ctx, "每周二定时器") g.Log().Debug(ctx, "每周二定时器")
err = s.weeklyTask(2) s.weeklyTask(2)
}) })
//周三任务 //周三任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 3", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 3", func(ctx context.Context) {
g.Log().Debug(ctx, "周三定时器") g.Log().Debug(ctx, "周三定时器")
err = s.weeklyTask(3) s.weeklyTask(3)
}) })
//周四任务 //周四任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 4", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 4", func(ctx context.Context) {
g.Log().Debug(ctx, "周四定时器") g.Log().Debug(ctx, "周四定时器")
err = s.weeklyTask(4) s.weeklyTask(4)
}) })
//周五任务 //周五任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 5", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 5", func(ctx context.Context) {
g.Log().Debug(ctx, "周五定时器") g.Log().Debug(ctx, "周五定时器")
err = s.fridayTask() s.weeklyTask(5)
}) })
//周六任务 //周六任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 6", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 6", func(ctx context.Context) {
g.Log().Debug(ctx, "周六定时器") g.Log().Debug(ctx, "周六定时器")
err = s.weeklyTask(6) s.weeklyTask(6)
}) })
//周日任务 //周日任务
_, err = gcron.AddSingleton(ctx, "0 0 0 * * 0", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 0", func(ctx context.Context) {
g.Log().Debug(ctx, "周日定时器") g.Log().Debug(ctx, "周日定时器")
err = s.weeklyTask(7) s.weeklyTask(7)
}) })
//每月任务 //每月任务
_, err = gcron.AddSingleton(ctx, "0 0 0 1 * *", func(ctx context.Context) { _, err = gcron.AddSingleton(gctx.New(), "0 0 0 1 * *", func(ctx context.Context) {
g.Log().Debug(ctx, "每月定时器") g.Log().Debug(ctx, "每月定时器")
err = s.monthlyTask() s.monthlyTask()
}) })
_, err = gcron.AddSingleton(ctx, "0 0 0 1 1 *", func(ctx context.Context) { //每年任务
_, err = gcron.AddSingleton(gctx.New(), "0 0 0 1 1 *", func(ctx context.Context) {
g.Log().Debug(ctx, "每年定时器") g.Log().Debug(ctx, "每年定时器")
err = s.monthlyTask() s.yearlyTask()
}) })
//统一执行方法
s.RunFuncChan()
return return
} }
// 每妙任务 // 每妙任务
func (s *sSystemCron) secondlyTask() (err error) { func (s *sSystemCron) secondlyTask() {
if len(s.SecondlyTask) == 0 { if len(s.SecondlyTask) == 0 {
return return
} }
for _, _func := range s.SecondlyTask { s.AddFuncChan(s.SecondlyTask)
err = _func()
if err != nil {
g.Log().Error(ctx, err)
}
}
return return
} }
// 每分钟任务 // 每分钟任务
func (s *sSystemCron) minutelyTask() (err error) { func (s *sSystemCron) minutelyTask() {
if len(s.MinutelyTask) == 0 { if len(s.MinutelyTask) == 0 {
return return
} }
for _, _func := range s.MinutelyTask { s.AddFuncChan(s.MinutelyTask)
err = _func()
if err != nil {
g.Log().Error(ctx, err)
}
}
return return
} }
// 每小时任务 // 每小时任务
func (s *sSystemCron) hourlyTask() (err error) { func (s *sSystemCron) hourlyTask() {
if len(s.HourlyTask) == 0 { if len(s.HourlyTask) == 0 {
return return
} }
for _, _func := range s.HourlyTask { s.AddFuncChan(s.HourlyTask)
err = _func()
if err != nil {
g.Log().Error(ctx, err)
}
}
return return
} }
// 每天任务 // 每天任务
func (s *sSystemCron) dailyTask() (err error) { func (s *sSystemCron) dailyTask() {
if len(s.DailyTask) == 0 { if len(s.DailyTask) == 0 {
return return
} }
for _, _func := range s.DailyTask { s.AddFuncChan(s.DailyTask)
err = _func()
if err != nil {
g.Log().Error(ctx, err)
}
}
return return
} }
// 每周任务 // 每周任务
func (s *sSystemCron) weeklyTask(day int) (err error) { func (s *sSystemCron) weeklyTask(day int) {
var arr []func() error var arr []func() error
switch day { switch day {
case 1: case 1:
@@ -275,39 +264,86 @@ func (s *sSystemCron) weeklyTask(day int) (err error) {
if len(arr) == 0 { if len(arr) == 0 {
return return
} }
for _, _func := range arr { s.AddFuncChan(arr)
err = _func()
if err != nil {
g.Log().Error(ctx, err)
}
}
return
}
// 周五任务
func (s *sSystemCron) fridayTask() (err error) {
if len(s.FridayTask) == 0 {
return
}
for _, _func := range s.FridayTask {
err = _func()
if err != nil {
g.Log().Error(ctx, err)
}
}
return return
} }
// 每月任务 // 每月任务
func (s *sSystemCron) monthlyTask() (err error) { func (s *sSystemCron) monthlyTask() {
if len(s.MonthlyTask) == 0 { if len(s.MonthlyTask) == 0 {
return return
} }
for _, _func := range s.MonthlyTask { s.AddFuncChan(s.MonthlyTask)
err = _func() return
if err != nil { }
g.Log().Error(ctx, err)
//每年任务
func (s *sSystemCron) yearlyTask() {
if len(s.YearlyTask) == 0 {
return
}
s.AddFuncChan(s.YearlyTask)
}
// AddFuncChan 添加方法到通道
func (s *sSystemCron) AddFuncChan(list []func() error) {
for _, v := range list {
s.taskChan <- v
}
}
// RunFuncChan 统一执行方法
func (s *sSystemCron) RunFuncChan() {
go func() {
for task := range s.taskChan {
ctx := gctx.New()
func() {
// 使用匿名函数包裹来捕获 panic
defer func() {
if r := recover(); r != nil {
g.Log().Errorf(ctx, "执行函数时发生 panic: %v", r)
}
}()
var taskErr error
done := make(chan error)
go func() {
done <- task()
}()
//err := task()
//if err != nil {
// g.Log().Error(ctx, err)
//}
select {
case taskErr = <-done:
if taskErr != nil {
g.Log().Error(ctx, taskErr)
}
case <-time.After(time.Minute * 10):
g.Log().Errorf(ctx, "task timeout:%v", task)
}
}()
} }
}()
}
// RunFunc 统一执行方法
// deprecated: 弃用会造成周期任务并发执行to service.SystemCron().AddFuncChan
func (s *sSystemCron) RunFunc(list []func() error) {
for _, _func := range list {
ctx := gctx.New()
func() {
// 使用匿名函数包裹来捕获 panic
defer func() {
if r := recover(); r != nil {
g.Log().Errorf(ctx, "执行函数时发生 panic: %v", r)
}
}()
err := _func()
if err != nil {
g.Log().Error(ctx, err)
}
}()
} }
return return
} }

View File

@@ -34,6 +34,13 @@ type (
// @receiver s // @receiver s
// @return err // @return err
StartCron() (err error) StartCron() (err error)
// AddFuncChan 添加方法到通道
AddFuncChan(list []func() error)
// RunFuncChan 统一执行方法
RunFuncChan()
// RunFunc 统一执行方法
// deprecated: 弃用会造成周期任务并发执行to service.SystemCron().AddFuncChan
RunFunc(list []func() error)
} }
) )