Merge pull request #4441 from changsongl/feature/timeout-support-for-task
finish timeout option for task #4430
This commit is contained in:
commit
422498af8d
@ -11,3 +11,4 @@
|
|||||||
- Support session Filter chain. [4404](https://github.com/beego/beego/pull/4404)
|
- Support session Filter chain. [4404](https://github.com/beego/beego/pull/4404)
|
||||||
- Feature issue #4402 finish router get example. [4416](https://github.com/beego/beego/pull/4416)
|
- Feature issue #4402 finish router get example. [4416](https://github.com/beego/beego/pull/4416)
|
||||||
- Implement context.Context support and deprecate `QueryM2MWithCtx` and `QueryTableWithCtx` [4424](https://github.com/beego/beego/pull/4424)
|
- Implement context.Context support and deprecate `QueryM2MWithCtx` and `QueryTableWithCtx` [4424](https://github.com/beego/beego/pull/4424)
|
||||||
|
- Finish timeout option for tasks #4441 [4441](https://github.com/beego/beego/pull/4441)
|
||||||
@ -289,3 +289,7 @@ func (o *oldToNewAdapter) SetPrev(ctx context.Context, t time.Time) {
|
|||||||
func (o *oldToNewAdapter) GetPrev(ctx context.Context) time.Time {
|
func (o *oldToNewAdapter) GetPrev(ctx context.Context) time.Time {
|
||||||
return o.delegate.GetPrev()
|
return o.delegate.GetPrev()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *oldToNewAdapter) GetTimeout(ctx context.Context) time.Duration {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|||||||
@ -55,6 +55,10 @@ func (c *countTask) GetPrev(ctx context.Context) time.Time {
|
|||||||
return time.Now()
|
return time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *countTask) GetTimeout(ctx context.Context) time.Duration {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
func TestRunTaskCommand_Execute(t *testing.T) {
|
func TestRunTaskCommand_Execute(t *testing.T) {
|
||||||
task := &countTask{}
|
task := &countTask{}
|
||||||
AddTask("count", task)
|
AddTask("count", task)
|
||||||
|
|||||||
130
task/task.go
130
task/task.go
@ -109,6 +109,7 @@ type Tasker interface {
|
|||||||
GetNext(ctx context.Context) time.Time
|
GetNext(ctx context.Context) time.Time
|
||||||
SetPrev(context.Context, time.Time)
|
SetPrev(context.Context, time.Time)
|
||||||
GetPrev(ctx context.Context) time.Time
|
GetPrev(ctx context.Context) time.Time
|
||||||
|
GetTimeout(ctx context.Context) time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// task error
|
// task error
|
||||||
@ -127,13 +128,14 @@ type Task struct {
|
|||||||
DoFunc TaskFunc
|
DoFunc TaskFunc
|
||||||
Prev time.Time
|
Prev time.Time
|
||||||
Next time.Time
|
Next time.Time
|
||||||
Errlist []*taskerr // like errtime:errinfo
|
Timeout time.Duration // timeout duration
|
||||||
ErrLimit int // max length for the errlist, 0 stand for no limit
|
Errlist []*taskerr // like errtime:errinfo
|
||||||
errCnt int // records the error count during the execution
|
ErrLimit int // max length for the errlist, 0 stand for no limit
|
||||||
|
errCnt int // records the error count during the execution
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTask add new task with name, time and func
|
// NewTask add new task with name, time and func
|
||||||
func NewTask(tname string, spec string, f TaskFunc) *Task {
|
func NewTask(tname string, spec string, f TaskFunc, opts ...Option) *Task {
|
||||||
|
|
||||||
task := &Task{
|
task := &Task{
|
||||||
Taskname: tname,
|
Taskname: tname,
|
||||||
@ -144,6 +146,11 @@ func NewTask(tname string, spec string, f TaskFunc) *Task {
|
|||||||
// we only store the pointer, so it won't use too many space
|
// we only store the pointer, so it won't use too many space
|
||||||
Errlist: make([]*taskerr, 100, 100),
|
Errlist: make([]*taskerr, 100, 100),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt.apply(task)
|
||||||
|
}
|
||||||
|
|
||||||
task.SetCron(spec)
|
task.SetCron(spec)
|
||||||
return task
|
return task
|
||||||
}
|
}
|
||||||
@ -196,6 +203,31 @@ func (t *Task) GetPrev(context.Context) time.Time {
|
|||||||
return t.Prev
|
return t.Prev
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetTimeout get timeout duration of this task
|
||||||
|
func (t *Task) GetTimeout(context.Context) time.Duration {
|
||||||
|
return t.Timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option interface
|
||||||
|
type Option interface {
|
||||||
|
apply(*Task)
|
||||||
|
}
|
||||||
|
|
||||||
|
// optionFunc return a function to set task element
|
||||||
|
type optionFunc func(*Task)
|
||||||
|
|
||||||
|
// apply option to task
|
||||||
|
func (f optionFunc) apply(t *Task) {
|
||||||
|
f(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TimeoutOption return a option to set timeout duration for task
|
||||||
|
func TimeoutOption(timeout time.Duration) Option {
|
||||||
|
return optionFunc(func(t *Task) {
|
||||||
|
t.Timeout = timeout
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// six columns mean:
|
// six columns mean:
|
||||||
// second:0-59
|
// second:0-59
|
||||||
// minute:0-59
|
// minute:0-59
|
||||||
@ -455,14 +487,12 @@ func (m *taskManager) StartTask() {
|
|||||||
|
|
||||||
func (m *taskManager) run() {
|
func (m *taskManager) run() {
|
||||||
now := time.Now().Local()
|
now := time.Now().Local()
|
||||||
m.taskLock.Lock()
|
// first run the tasks, so set all tasks next run time.
|
||||||
for _, t := range m.adminTaskList {
|
m.setTasksStartTime(now)
|
||||||
t.SetNext(nil, now)
|
|
||||||
}
|
|
||||||
m.taskLock.Unlock()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// we only use RLock here because NewMapSorter copy the reference, do not change any thing
|
// we only use RLock here because NewMapSorter copy the reference, do not change any thing
|
||||||
|
// here, we sort all task and get first task running time (effective).
|
||||||
m.taskLock.RLock()
|
m.taskLock.RLock()
|
||||||
sortList := NewMapSorter(m.adminTaskList)
|
sortList := NewMapSorter(m.adminTaskList)
|
||||||
m.taskLock.RUnlock()
|
m.taskLock.RUnlock()
|
||||||
@ -475,37 +505,75 @@ func (m *taskManager) run() {
|
|||||||
} else {
|
} else {
|
||||||
effective = sortList.Vals[0].GetNext(context.Background())
|
effective = sortList.Vals[0].GetNext(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case now = <-time.After(effective.Sub(now)):
|
case now = <-time.After(effective.Sub(now)): // wait for effective time
|
||||||
// Run every entry whose next time was this effective time.
|
runNextTasks(sortList, effective)
|
||||||
for _, e := range sortList.Vals {
|
|
||||||
if e.GetNext(context.Background()) != effective {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
go e.Run(nil)
|
|
||||||
e.SetPrev(context.Background(), e.GetNext(context.Background()))
|
|
||||||
e.SetNext(nil, effective)
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
case <-m.changed:
|
case <-m.changed: // tasks have been changed, set all tasks run again now
|
||||||
now = time.Now().Local()
|
now = time.Now().Local()
|
||||||
m.taskLock.Lock()
|
m.setTasksStartTime(now)
|
||||||
for _, t := range m.adminTaskList {
|
|
||||||
t.SetNext(nil, now)
|
|
||||||
}
|
|
||||||
m.taskLock.Unlock()
|
|
||||||
continue
|
continue
|
||||||
case <-m.stop:
|
case <-m.stop: // manager is stopped, and mark manager is stopped
|
||||||
m.taskLock.Lock()
|
m.markManagerStop()
|
||||||
if m.started {
|
|
||||||
m.started = false
|
|
||||||
}
|
|
||||||
m.taskLock.Unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setTasksStartTime is set all tasks next running time
|
||||||
|
func (m *taskManager) setTasksStartTime(now time.Time) {
|
||||||
|
m.taskLock.Lock()
|
||||||
|
for _, task := range m.adminTaskList {
|
||||||
|
task.SetNext(context.Background(), now)
|
||||||
|
}
|
||||||
|
m.taskLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// markManagerStop it sets manager to be stopped
|
||||||
|
func (m *taskManager) markManagerStop() {
|
||||||
|
m.taskLock.Lock()
|
||||||
|
if m.started {
|
||||||
|
m.started = false
|
||||||
|
}
|
||||||
|
m.taskLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// runNextTasks it runs next task which next run time is equal to effective
|
||||||
|
func runNextTasks(sortList *MapSorter, effective time.Time) {
|
||||||
|
// Run every entry whose next time was this effective time.
|
||||||
|
var i = 0
|
||||||
|
for _, e := range sortList.Vals {
|
||||||
|
i++
|
||||||
|
if e.GetNext(context.Background()) != effective {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if timeout is on, if yes passing the timeout context
|
||||||
|
ctx := context.Background()
|
||||||
|
if duration := e.GetTimeout(ctx); duration != 0 {
|
||||||
|
go func(e Tasker) {
|
||||||
|
ctx, cancelFunc := context.WithTimeout(ctx, duration)
|
||||||
|
defer cancelFunc()
|
||||||
|
err := e.Run(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("tasker.run err: %s\n", err.Error())
|
||||||
|
}
|
||||||
|
}(e)
|
||||||
|
} else {
|
||||||
|
go func(e Tasker) {
|
||||||
|
err := e.Run(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("tasker.run err: %s\n", err.Error())
|
||||||
|
}
|
||||||
|
}(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.SetPrev(context.Background(), e.GetNext(context.Background()))
|
||||||
|
e.SetNext(context.Background(), effective)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// StopTask stop all tasks
|
// StopTask stop all tasks
|
||||||
func (m *taskManager) StopTask() {
|
func (m *taskManager) StopTask() {
|
||||||
go func() {
|
go func() {
|
||||||
|
|||||||
@ -90,6 +90,57 @@ func TestSpec(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTimeout(t *testing.T) {
|
||||||
|
m := newTaskManager()
|
||||||
|
defer m.ClearTask()
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(2)
|
||||||
|
once1, once2 := sync.Once{}, sync.Once{}
|
||||||
|
|
||||||
|
tk1 := NewTask("tk1", "0/10 * * ? * *",
|
||||||
|
func(ctx context.Context) error {
|
||||||
|
time.Sleep(4 * time.Second)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
once1.Do(func() {
|
||||||
|
fmt.Println("tk1 done")
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
return errors.New("timeout")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, TimeoutOption(3*time.Second),
|
||||||
|
)
|
||||||
|
|
||||||
|
tk2 := NewTask("tk2", "0/11 * * ? * *",
|
||||||
|
func(ctx context.Context) error {
|
||||||
|
time.Sleep(4 * time.Second)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errors.New("timeout")
|
||||||
|
default:
|
||||||
|
once2.Do(func() {
|
||||||
|
fmt.Println("tk2 done")
|
||||||
|
wg.Done()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
m.AddTask("tk1", tk1)
|
||||||
|
m.AddTask("tk2", tk2)
|
||||||
|
m.StartTask()
|
||||||
|
defer m.StopTask()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(19 * time.Second):
|
||||||
|
t.Error("TestTimeout failed")
|
||||||
|
case <-wait(wg):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestTask_Run(t *testing.T) {
|
func TestTask_Run(t *testing.T) {
|
||||||
cnt := -1
|
cnt := -1
|
||||||
task := func(ctx context.Context) error {
|
task := func(ctx context.Context) error {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user