任务通道信息携带周期信息并打印定时任务执行进度 观察堵塞情况
This commit is contained in:
@@ -2,10 +2,12 @@ package systemCron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ayflying/utility_go/api/system/v1"
|
||||
v1 "github.com/ayflying/utility_go/api/system/v1"
|
||||
"github.com/ayflying/utility_go/service"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gcron"
|
||||
@@ -19,12 +21,18 @@ var (
|
||||
startTime *gtime.Time
|
||||
)
|
||||
|
||||
type TaskWithParams struct {
|
||||
// TaskFunc 原有任务函数,保持签名不变
|
||||
TaskFunc func(context.Context) error
|
||||
CronType v1.CronType
|
||||
}
|
||||
|
||||
// sSystemCron 结构体定义了系统定时任务的秒计时器。
|
||||
// 它包含了不同时间周期的任务,如秒、分钟、小时、天、周、月、年以及特定的工作日任务。
|
||||
type sSystemCron struct {
|
||||
//互斥锁
|
||||
Lock sync.Mutex
|
||||
taskChan chan func(context.Context) error
|
||||
taskChan chan TaskWithParams
|
||||
TaskTimeout time.Duration
|
||||
|
||||
// 每秒执行的任务
|
||||
@@ -59,7 +67,7 @@ type sSystemCron struct {
|
||||
|
||||
func New() *sSystemCron {
|
||||
return &sSystemCron{
|
||||
taskChan: make(chan func(context.Context) error, 2),
|
||||
taskChan: make(chan TaskWithParams, 2),
|
||||
TaskTimeout: time.Minute * 60,
|
||||
}
|
||||
}
|
||||
@@ -240,7 +248,7 @@ func (s *sSystemCron) secondlyTask() {
|
||||
if len(s.SecondlyTask) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(s.SecondlyTask)
|
||||
s.AddFuncChan(s.SecondlyTask, v1.CronType_SECOND)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -249,7 +257,7 @@ func (s *sSystemCron) minutelyTask() {
|
||||
if len(s.MinutelyTask) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(s.MinutelyTask)
|
||||
s.AddFuncChan(s.MinutelyTask, v1.CronType_MINUTE)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -258,7 +266,7 @@ func (s *sSystemCron) hourlyTask() {
|
||||
if len(s.HourlyTask) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(s.HourlyTask)
|
||||
s.AddFuncChan(s.HourlyTask, v1.CronType_HOUR)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -267,37 +275,46 @@ func (s *sSystemCron) dailyTask() {
|
||||
if len(s.DailyTask) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(s.DailyTask)
|
||||
s.AddFuncChan(s.DailyTask, v1.CronType_DAILY)
|
||||
return
|
||||
}
|
||||
|
||||
// 每周任务
|
||||
func (s *sSystemCron) weeklyTask(day int) {
|
||||
var arr []func(context.Context) error
|
||||
var cronType v1.CronType
|
||||
switch day {
|
||||
case 1:
|
||||
arr = s.MondayTask
|
||||
cronType = v1.CronType_MONDAY
|
||||
case 2:
|
||||
arr = s.TuesdayTask
|
||||
cronType = v1.CronType_TUESDAY
|
||||
case 3:
|
||||
arr = s.WednesdayTask
|
||||
cronType = v1.CronType_WEDNESDAY
|
||||
case 4:
|
||||
arr = s.ThursdayTask
|
||||
cronType = v1.CronType_THURSDAY
|
||||
case 5:
|
||||
arr = s.FridayTask
|
||||
cronType = v1.CronType_FRIDAY
|
||||
case 6:
|
||||
arr = s.SaturdayTask
|
||||
cronType = v1.CronType_SATURDAY
|
||||
case 7:
|
||||
arr = s.SundayTask
|
||||
cronType = v1.CronType_SUNDAY
|
||||
default:
|
||||
arr = s.WeeklyTask
|
||||
cronType = v1.CronType_WEEK
|
||||
return
|
||||
}
|
||||
|
||||
if len(arr) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(arr)
|
||||
s.AddFuncChan(arr, cronType)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -306,7 +323,7 @@ func (s *sSystemCron) monthlyTask() {
|
||||
if len(s.MonthlyTask) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(s.MonthlyTask)
|
||||
s.AddFuncChan(s.MonthlyTask, v1.CronType_MONTH)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -315,14 +332,17 @@ func (s *sSystemCron) yearlyTask() {
|
||||
if len(s.YearlyTask) == 0 {
|
||||
return
|
||||
}
|
||||
s.AddFuncChan(s.YearlyTask)
|
||||
s.AddFuncChan(s.YearlyTask, v1.CronType_YEAR)
|
||||
|
||||
}
|
||||
|
||||
// AddFuncChan 添加方法到通道
|
||||
func (s *sSystemCron) AddFuncChan(list []func(context.Context) error) {
|
||||
func (s *sSystemCron) AddFuncChan(list []func(context.Context) error, cronType v1.CronType) {
|
||||
for _, v := range list {
|
||||
s.taskChan <- v
|
||||
s.taskChan <- TaskWithParams{
|
||||
TaskFunc: v,
|
||||
CronType: cronType,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,10 +362,24 @@ func (s *sSystemCron) RunFuncChan() {
|
||||
g.Log().Errorf(gctx.New(), "执行函数时发生 panic: %v", r)
|
||||
}
|
||||
}()
|
||||
if task.CronType != v1.CronType_MINUTE && task.CronType != v1.CronType_SECOND {
|
||||
if runtime.FuncForPC(reflect.ValueOf(task.TaskFunc).Pointer()) != nil {
|
||||
g.Log().Debugf(gctx.New(), "开始执行任务: %v", runtime.FuncForPC(reflect.ValueOf(task.TaskFunc).Pointer()).Name())
|
||||
} else {
|
||||
g.Log().Debugf(gctx.New(), "开始执行任务: %v", "无法获取函数信息")
|
||||
}
|
||||
}
|
||||
|
||||
done := make(chan error)
|
||||
go func() {
|
||||
done <- task(ctx)
|
||||
done <- task.TaskFunc(ctx)
|
||||
if task.CronType != v1.CronType_MINUTE && task.CronType != v1.CronType_SECOND {
|
||||
if runtime.FuncForPC(reflect.ValueOf(task.TaskFunc).Pointer()) != nil {
|
||||
g.Log().Debugf(gctx.New(), "结束执行任务: %v", runtime.FuncForPC(reflect.ValueOf(task.TaskFunc).Pointer()).Name())
|
||||
} else {
|
||||
g.Log().Debugf(gctx.New(), "结束执行任务: %v", "无法获取函数信息")
|
||||
}
|
||||
}
|
||||
}()
|
||||
//err := task()
|
||||
//if err != nil {
|
||||
|
||||
@@ -47,7 +47,7 @@ type (
|
||||
// @return err
|
||||
StartCron() (err error)
|
||||
// AddFuncChan 添加方法到通道
|
||||
AddFuncChan(list []func(context.Context) error)
|
||||
AddFuncChan(list []func(context.Context) error, cronType v1.CronType)
|
||||
// RunFuncChan 统一执行方法
|
||||
RunFuncChan()
|
||||
// RunFunc 统一执行方法
|
||||
|
||||
Reference in New Issue
Block a user