From d6bfe1c2fbb034cb2c8f6de8478a52a4a942800b Mon Sep 17 00:00:00 2001 From: ayflying Date: Tue, 22 Apr 2025 11:15:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E5=88=92=E4=BB=BB=E5=8A=A1=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E5=99=A8=E4=BF=AE=E6=94=B9=EF=BC=8C=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E9=80=9A=E9=81=93=E9=A1=BA=E5=BA=8F=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/logic/systemCron/listener.go | 5 +- internal/logic/systemCron/systemCron.go | 206 ++++++++++++++---------- service/system_cron.go | 7 + 3 files changed, 131 insertions(+), 87 deletions(-) diff --git a/internal/logic/systemCron/listener.go b/internal/logic/systemCron/listener.go index 87b66c7..7cba9d9 100644 --- a/internal/logic/systemCron/listener.go +++ b/internal/logic/systemCron/listener.go @@ -7,6 +7,7 @@ import ( "github.com/ayflying/utility_go/pkg/notice" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gclient" + "github.com/gogf/gf/v2/os/gctx" ) type Status struct { @@ -22,10 +23,10 @@ func (s *sSystemCron) Guardian(DingTalkWebHook string) { Name string Address string } - cfg, _ := g.Cfg().Get(ctx, "serverList") + cfg, _ := g.Cfg().Get(gctx.New(), "serverList") cfg.Scan(&list) for _, v := range list { - get, err := g.Client().Discovery(nil).Get(ctx, v.Address+"/callback/status") + get, err := g.Client().Discovery(nil).Get(gctx.New(), v.Address+"/callback/status") defer get.Close() if err != nil { diff --git a/internal/logic/systemCron/systemCron.go b/internal/logic/systemCron/systemCron.go index 1d7324d..93bfe8c 100644 --- a/internal/logic/systemCron/systemCron.go +++ b/internal/logic/systemCron/systemCron.go @@ -14,7 +14,7 @@ import ( ) var ( - ctx = gctx.New() + //ctx = gctx.New() startTime *gtime.Time ) @@ -22,7 +22,8 @@ var ( // 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。 type sSystemCron struct { //互斥锁 - Lock sync.Mutex + Lock sync.Mutex + taskChan chan func() error // 每秒执行的任务 SecondlyTask []func() error @@ -55,7 +56,9 @@ type sSystemCron struct { } func New() *sSystemCron { - return &sSystemCron{} + return &sSystemCron{ + taskChan: make(chan func() error, 1), + } } func init() { @@ -72,6 +75,9 @@ func (s *sSystemCron) AddCron(typ v1.CronType, _func func() error) { //加锁 s.Lock.Lock() defer s.Lock.Unlock() + // + //ctx := gctx.New() + //newFunc := func() switch typ { case v1.CronType_SECOND: @@ -118,139 +124,122 @@ func (s *sSystemCron) StartCron() (err error) { } startTime = gtime.Now() - g.Log().Debug(ctx, "启动计划任务定时器详情") + g.Log().Debug(gctx.New(), "启动计划任务定时器详情") //每秒任务 - gtimer.SetInterval(ctx, time.Second, func(ctx context.Context) { + gtimer.SetInterval(gctx.New(), time.Second, func(ctx context.Context) { //g.Log().Debug(ctx, "每秒定时器") - err = s.secondlyTask() + s.secondlyTask() }) //每分钟任务 - _, err = gcron.AddSingleton(ctx, "0 * * * * *", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 * * * * *", func(ctx context.Context) { //g.Log().Debug(ctx, "每分钟定时器") - err = s.minutelyTask() + s.minutelyTask() }) //每小时任务 - _, err = gcron.AddSingleton(ctx, "0 0 * * * *", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 * * * *", func(ctx context.Context) { g.Log().Debug(ctx, "每小时定时器") - err = s.hourlyTask() + s.hourlyTask() }) //每天任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * *", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * *", func(ctx context.Context) { g.Log().Debug(ctx, "每日定时器") - err = s.dailyTask() + s.dailyTask() }) //每周任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 1", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 1", func(ctx context.Context) { g.Log().Debug(ctx, "每周一定时器") - err = s.weeklyTask(1) + s.weeklyTask(1) }) - //每周二任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 2", func(ctx context.Context) { + //每周二的任务 + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 2", func(ctx context.Context) { g.Log().Debug(ctx, "每周二定时器") - err = s.weeklyTask(2) + s.weeklyTask(2) }) //周三任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 3", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 3", func(ctx context.Context) { g.Log().Debug(ctx, "周三定时器") - err = s.weeklyTask(3) + s.weeklyTask(3) }) //周四任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 4", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 4", func(ctx context.Context) { g.Log().Debug(ctx, "周四定时器") - err = s.weeklyTask(4) + s.weeklyTask(4) }) //周五任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 5", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 5", func(ctx context.Context) { g.Log().Debug(ctx, "周五定时器") - err = s.fridayTask() + s.weeklyTask(5) }) //周六任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 6", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 6", func(ctx context.Context) { g.Log().Debug(ctx, "周六定时器") - err = s.weeklyTask(6) + s.weeklyTask(6) }) //周日任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 * * 0", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 * * 0", func(ctx context.Context) { g.Log().Debug(ctx, "周日定时器") - err = s.weeklyTask(7) + s.weeklyTask(7) }) //每月任务 - _, err = gcron.AddSingleton(ctx, "0 0 0 1 * *", func(ctx context.Context) { + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 1 * *", func(ctx context.Context) { g.Log().Debug(ctx, "每月定时器") - err = s.monthlyTask() + s.monthlyTask() }) - _, err = gcron.AddSingleton(ctx, "0 0 0 1 1 *", func(ctx context.Context) { + //每年任务 + _, err = gcron.AddSingleton(gctx.New(), "0 0 0 1 1 *", func(ctx context.Context) { g.Log().Debug(ctx, "每年定时器") - err = s.monthlyTask() + s.yearlyTask() }) + //统一执行方法 + s.RunFuncChan() return } // 每妙任务 -func (s *sSystemCron) secondlyTask() (err error) { +func (s *sSystemCron) secondlyTask() { if len(s.SecondlyTask) == 0 { return } - for _, _func := range s.SecondlyTask { - err = _func() - if err != nil { - g.Log().Error(ctx, err) - } - } + s.AddFuncChan(s.SecondlyTask) return } // 每分钟任务 -func (s *sSystemCron) minutelyTask() (err error) { +func (s *sSystemCron) minutelyTask() { if len(s.MinutelyTask) == 0 { return } - for _, _func := range s.MinutelyTask { - err = _func() - if err != nil { - g.Log().Error(ctx, err) - } - } + s.AddFuncChan(s.MinutelyTask) return } // 每小时任务 -func (s *sSystemCron) hourlyTask() (err error) { +func (s *sSystemCron) hourlyTask() { if len(s.HourlyTask) == 0 { return } - for _, _func := range s.HourlyTask { - err = _func() - if err != nil { - g.Log().Error(ctx, err) - } - } + s.AddFuncChan(s.HourlyTask) return } // 每天任务 -func (s *sSystemCron) dailyTask() (err error) { +func (s *sSystemCron) dailyTask() { if len(s.DailyTask) == 0 { return } - for _, _func := range s.DailyTask { - err = _func() - if err != nil { - g.Log().Error(ctx, err) - } - } + s.AddFuncChan(s.DailyTask) return } // 每周任务 -func (s *sSystemCron) weeklyTask(day int) (err error) { +func (s *sSystemCron) weeklyTask(day int) { var arr []func() error switch day { case 1: @@ -275,39 +264,86 @@ func (s *sSystemCron) weeklyTask(day int) (err error) { if len(arr) == 0 { return } - for _, _func := range arr { - err = _func() - if err != nil { - g.Log().Error(ctx, err) - } - } - return -} - -// 周五任务 -func (s *sSystemCron) fridayTask() (err error) { - if len(s.FridayTask) == 0 { - return - } - for _, _func := range s.FridayTask { - err = _func() - if err != nil { - g.Log().Error(ctx, err) - } - } + s.AddFuncChan(arr) return } // 每月任务 -func (s *sSystemCron) monthlyTask() (err error) { +func (s *sSystemCron) monthlyTask() { if len(s.MonthlyTask) == 0 { return } - for _, _func := range s.MonthlyTask { - err = _func() - if err != nil { - g.Log().Error(ctx, err) + s.AddFuncChan(s.MonthlyTask) + return +} + +//每年任务 +func (s *sSystemCron) yearlyTask() { + if len(s.YearlyTask) == 0 { + return + } + s.AddFuncChan(s.YearlyTask) + +} + +// AddFuncChan 添加方法到通道 +func (s *sSystemCron) AddFuncChan(list []func() error) { + for _, v := range list { + s.taskChan <- v + } +} + +// RunFuncChan 统一执行方法 +func (s *sSystemCron) RunFuncChan() { + go func() { + for task := range s.taskChan { + ctx := gctx.New() + func() { + // 使用匿名函数包裹来捕获 panic + defer func() { + if r := recover(); r != nil { + g.Log().Errorf(ctx, "执行函数时发生 panic: %v", r) + } + }() + var taskErr error + done := make(chan error) + go func() { + done <- task() + }() + //err := task() + //if err != nil { + // g.Log().Error(ctx, err) + //} + select { + case taskErr = <-done: + if taskErr != nil { + g.Log().Error(ctx, taskErr) + } + case <-time.After(time.Minute * 10): + g.Log().Errorf(ctx, "task timeout:%v", task) + } + }() } + }() +} + +// RunFunc 统一执行方法 +// deprecated: 弃用,会造成周期任务并发执行,to service.SystemCron().AddFuncChan +func (s *sSystemCron) RunFunc(list []func() error) { + for _, _func := range list { + ctx := gctx.New() + func() { + // 使用匿名函数包裹来捕获 panic + defer func() { + if r := recover(); r != nil { + g.Log().Errorf(ctx, "执行函数时发生 panic: %v", r) + } + }() + err := _func() + if err != nil { + g.Log().Error(ctx, err) + } + }() } return } diff --git a/service/system_cron.go b/service/system_cron.go index 41ffb58..b852c8d 100644 --- a/service/system_cron.go +++ b/service/system_cron.go @@ -34,6 +34,13 @@ type ( // @receiver s // @return err StartCron() (err error) + // AddFuncChan 添加方法到通道 + AddFuncChan(list []func() error) + // RunFuncChan 统一执行方法 + RunFuncChan() + // RunFunc 统一执行方法 + // deprecated: 弃用,会造成周期任务并发执行,to service.SystemCron().AddFuncChan + RunFunc(list []func() error) } )