From 686b73891e4f35b802599eae7d773145c68bccb2 Mon Sep 17 00:00:00 2001 From: Jason li Date: Thu, 14 Jan 2021 11:55:56 +0800 Subject: [PATCH 1/7] finish timeout option for task --- adapter/toolbox/task.go | 4 +++ task/governor_command_test.go | 4 +++ task/task.go | 48 +++++++++++++++++++++++++++++++++-- task/task_test.go | 48 +++++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/adapter/toolbox/task.go b/adapter/toolbox/task.go index bdd6679f..7b7cd68a 100644 --- a/adapter/toolbox/task.go +++ b/adapter/toolbox/task.go @@ -289,3 +289,7 @@ func (o *oldToNewAdapter) SetPrev(ctx context.Context, t time.Time) { func (o *oldToNewAdapter) GetPrev(ctx context.Context) time.Time { return o.delegate.GetPrev() } + +func (o *oldToNewAdapter) GetTimeout(ctx context.Context) time.Duration { + return 0 +} diff --git a/task/governor_command_test.go b/task/governor_command_test.go index 00ed37f2..c3547cdf 100644 --- a/task/governor_command_test.go +++ b/task/governor_command_test.go @@ -55,6 +55,10 @@ func (c *countTask) GetPrev(ctx context.Context) time.Time { return time.Now() } +func (c *countTask) GetTimeout(ctx context.Context) time.Duration { + return 0 +} + func TestRunTaskCommand_Execute(t *testing.T) { task := &countTask{} AddTask("count", task) diff --git a/task/task.go b/task/task.go index 2ea34f24..d07f0135 100644 --- a/task/task.go +++ b/task/task.go @@ -109,6 +109,7 @@ type Tasker interface { GetNext(ctx context.Context) time.Time SetPrev(context.Context, time.Time) GetPrev(ctx context.Context) time.Time + GetTimeout(ctx context.Context) time.Duration } // task error @@ -127,13 +128,14 @@ type Task struct { DoFunc TaskFunc Prev time.Time Next time.Time + Timeout time.Duration Errlist []*taskerr // like errtime:errinfo ErrLimit int // max length for the errlist, 0 stand for no limit errCnt int // records the error count during the execution } // NewTask add new task with name, time and func -func NewTask(tname string, spec string, f TaskFunc) *Task { +func NewTask(tname string, spec string, f TaskFunc, opts ...Option) *Task { task := &Task{ Taskname: tname, @@ -144,6 +146,11 @@ func NewTask(tname string, spec string, f TaskFunc) *Task { // we only store the pointer, so it won't use too many space Errlist: make([]*taskerr, 100, 100), } + + for _, opt := range opts { + opt.apply(task) + } + task.SetCron(spec) return task } @@ -196,6 +203,31 @@ func (t *Task) GetPrev(context.Context) time.Time { return t.Prev } +// GetTimeout get timeout duration of this task +func (t *Task) GetTimeout(context.Context) time.Duration { + return t.Timeout +} + +// Option interface +type Option interface { + apply(*Task) +} + +// optionFunc return a function to set task element +type optionFunc func(*Task) + +// apply option to task +func (f optionFunc) apply(t *Task) { + f(t) +} + +// TimeoutOption return a option to set timeout duration for task +func TimeoutOption(timeout time.Duration) Option { + return optionFunc(func(t *Task) { + t.Timeout = timeout + }) +} + // six columns mean: // second:0-59 // minute:0-59 @@ -482,7 +514,19 @@ func (m *taskManager) run() { if e.GetNext(context.Background()) != effective { break } - go e.Run(nil) + + // check if timeout is on, if yes passing the timeout context + ctx := context.Background() + if duration := e.GetTimeout(ctx); duration != 0 { + ctx, cancelFunc := context.WithTimeout(ctx, duration) + go func() { + defer cancelFunc() + e.Run(ctx) + }() + } else { + go e.Run(ctx) + } + e.SetPrev(context.Background(), e.GetNext(context.Background())) e.SetNext(nil, effective) } diff --git a/task/task_test.go b/task/task_test.go index c87757ef..d36c3994 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -90,6 +90,54 @@ func TestSpec(t *testing.T) { } } +func TestTimeout(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() + wg := &sync.WaitGroup{} + wg.Add(2) + + tk1 := NewTask("tk1", "0/10 * * ? * *", + func(ctx context.Context) error { + fmt.Println("tk1 start") + time.Sleep(4 * time.Second) + select { + case <-ctx.Done(): + fmt.Println("tk1 done") + wg.Done() + return errors.New("timeout") + default: + } + return nil + }, TimeoutOption(3*time.Second), + ) + + tk2 := NewTask("tk2", "0/10 * * ? * *", + func(ctx context.Context) error { + fmt.Println("tk2 start") + time.Sleep(4 * time.Second) + select { + case <-ctx.Done(): + return errors.New("timeout") + default: + fmt.Println("tk2 done") + wg.Done() + } + return nil + }, + ) + + m.AddTask("tk1", tk1) + m.AddTask("tk2", tk2) + m.StartTask() + defer m.StopTask() + + select { + case <-time.After(19 * time.Second): + t.Error("TestTimeout failed") + case <-wait(wg): + } +} + func TestTask_Run(t *testing.T) { cnt := -1 task := func(ctx context.Context) error { From f4a829fbf6a342bff89023537fcc00fd29d39383 Mon Sep 17 00:00:00 2001 From: Jason li Date: Thu, 14 Jan 2021 11:59:02 +0800 Subject: [PATCH 2/7] add comment --- task/task.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/task/task.go b/task/task.go index d07f0135..282eb47d 100644 --- a/task/task.go +++ b/task/task.go @@ -128,10 +128,10 @@ type Task struct { DoFunc TaskFunc Prev time.Time Next time.Time - Timeout time.Duration - Errlist []*taskerr // like errtime:errinfo - ErrLimit int // max length for the errlist, 0 stand for no limit - errCnt int // records the error count during the execution + Timeout time.Duration // timeout duration + Errlist []*taskerr // like errtime:errinfo + ErrLimit int // max length for the errlist, 0 stand for no limit + errCnt int // records the error count during the execution } // NewTask add new task with name, time and func From 6464b500f12a022d1fac16ab0eae8333a18e881c Mon Sep 17 00:00:00 2001 From: Jason li Date: Thu, 14 Jan 2021 12:01:55 +0800 Subject: [PATCH 3/7] add change log --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f79a8f6f..0e2612b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,3 +11,4 @@ - Support session Filter chain. [4404](https://github.com/beego/beego/pull/4404) - Feature issue #4402 finish router get example. [4416](https://github.com/beego/beego/pull/4416) - Implement context.Context support and deprecate `QueryM2MWithCtx` and `QueryTableWithCtx` [4424](https://github.com/beego/beego/pull/4424) +- Finish timeout option for tasks #4441 [4441](https://github.com/beego/beego/pull/4441) \ No newline at end of file From 194de5505831456861370be5964a3d53608503e0 Mon Sep 17 00:00:00 2001 From: Jason li Date: Thu, 14 Jan 2021 15:37:36 +0800 Subject: [PATCH 4/7] refactor code and fix bug --- task/task.go | 95 ++++++++++++++++++++++++++++------------------- task/task_test.go | 17 +++++---- 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/task/task.go b/task/task.go index 282eb47d..9233cff5 100644 --- a/task/task.go +++ b/task/task.go @@ -487,14 +487,12 @@ func (m *taskManager) StartTask() { func (m *taskManager) run() { now := time.Now().Local() - m.taskLock.Lock() - for _, t := range m.adminTaskList { - t.SetNext(nil, now) - } - m.taskLock.Unlock() + // first run the tasks, so set all tasks next run time. + m.setTasksStartTime(now) for { // we only use RLock here because NewMapSorter copy the reference, do not change any thing + // here, we sort all task and get first task running time (effective). m.taskLock.RLock() sortList := NewMapSorter(m.adminTaskList) m.taskLock.RUnlock() @@ -507,49 +505,68 @@ func (m *taskManager) run() { } else { effective = sortList.Vals[0].GetNext(context.Background()) } + select { - case now = <-time.After(effective.Sub(now)): - // Run every entry whose next time was this effective time. - for _, e := range sortList.Vals { - if e.GetNext(context.Background()) != effective { - break - } - - // check if timeout is on, if yes passing the timeout context - ctx := context.Background() - if duration := e.GetTimeout(ctx); duration != 0 { - ctx, cancelFunc := context.WithTimeout(ctx, duration) - go func() { - defer cancelFunc() - e.Run(ctx) - }() - } else { - go e.Run(ctx) - } - - e.SetPrev(context.Background(), e.GetNext(context.Background())) - e.SetNext(nil, effective) - } + case now = <-time.After(effective.Sub(now)): // wait for effective time + runNextTasks(sortList, effective) continue - case <-m.changed: + case <-m.changed: // tasks have been changed, set all tasks run again now now = time.Now().Local() - m.taskLock.Lock() - for _, t := range m.adminTaskList { - t.SetNext(nil, now) - } - m.taskLock.Unlock() + m.setTasksStartTime(now) continue - case <-m.stop: - m.taskLock.Lock() - if m.started { - m.started = false - } - m.taskLock.Unlock() + case <-m.stop: // manager is stopped, and mark manager is stopped + m.markManagerStop() return } } } +// setTasksStartTime is set all tasks next running time +func (m *taskManager) setTasksStartTime(now time.Time) { + m.taskLock.Lock() + for _, task := range m.adminTaskList { + task.SetNext(nil, now) + } + m.taskLock.Unlock() +} + +// markManagerStop it sets manager to be stopped +func (m *taskManager) markManagerStop() { + m.taskLock.Lock() + if m.started { + m.started = false + } + m.taskLock.Unlock() +} + +// runNextTasks it runs next task which next run time is equal to effective +func runNextTasks(sortList *MapSorter, effective time.Time) *MapSorter { + // Run every entry whose next time was this effective time. + var i = 0 + for _, e := range sortList.Vals { + i++ + if e.GetNext(context.Background()) != effective { + break + } + + // check if timeout is on, if yes passing the timeout context + ctx := context.Background() + if duration := e.GetTimeout(ctx); duration != 0 { + go func(e Tasker) { + ctx, cancelFunc := context.WithTimeout(ctx, duration) + defer cancelFunc() + e.Run(ctx) + }(e) + } else { + go e.Run(ctx) + } + + e.SetPrev(context.Background(), e.GetNext(context.Background())) + e.SetNext(nil, effective) + } + return sortList +} + // StopTask stop all tasks func (m *taskManager) StopTask() { go func() { diff --git a/task/task_test.go b/task/task_test.go index d36c3994..1078aa01 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -95,15 +95,17 @@ func TestTimeout(t *testing.T) { defer m.ClearTask() wg := &sync.WaitGroup{} wg.Add(2) + once1, once2 := sync.Once{}, sync.Once{} tk1 := NewTask("tk1", "0/10 * * ? * *", func(ctx context.Context) error { - fmt.Println("tk1 start") time.Sleep(4 * time.Second) select { case <-ctx.Done(): - fmt.Println("tk1 done") - wg.Done() + once1.Do(func() { + fmt.Println("tk1 done") + wg.Done() + }) return errors.New("timeout") default: } @@ -111,16 +113,17 @@ func TestTimeout(t *testing.T) { }, TimeoutOption(3*time.Second), ) - tk2 := NewTask("tk2", "0/10 * * ? * *", + tk2 := NewTask("tk2", "0/11 * * ? * *", func(ctx context.Context) error { - fmt.Println("tk2 start") time.Sleep(4 * time.Second) select { case <-ctx.Done(): return errors.New("timeout") default: - fmt.Println("tk2 done") - wg.Done() + once2.Do(func() { + fmt.Println("tk2 done") + wg.Done() + }) } return nil }, From 9e729668b1e184d4a793bea86465623326649d78 Mon Sep 17 00:00:00 2001 From: Jason li Date: Thu, 14 Jan 2021 15:39:16 +0800 Subject: [PATCH 5/7] delete return sortlist --- task/task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index 9233cff5..2904ed97 100644 --- a/task/task.go +++ b/task/task.go @@ -540,7 +540,7 @@ func (m *taskManager) markManagerStop() { } // runNextTasks it runs next task which next run time is equal to effective -func runNextTasks(sortList *MapSorter, effective time.Time) *MapSorter { +func runNextTasks(sortList *MapSorter, effective time.Time) { // Run every entry whose next time was this effective time. var i = 0 for _, e := range sortList.Vals { @@ -564,7 +564,6 @@ func runNextTasks(sortList *MapSorter, effective time.Time) *MapSorter { e.SetPrev(context.Background(), e.GetNext(context.Background())) e.SetNext(nil, effective) } - return sortList } // StopTask stop all tasks From 9516caa32a037c87aeec8bbfc875aad05633ecd9 Mon Sep 17 00:00:00 2001 From: Jason li Date: Thu, 14 Jan 2021 15:42:37 +0800 Subject: [PATCH 6/7] fix context nil lint --- task/task.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index 2904ed97..acf42ae4 100644 --- a/task/task.go +++ b/task/task.go @@ -525,7 +525,7 @@ func (m *taskManager) run() { func (m *taskManager) setTasksStartTime(now time.Time) { m.taskLock.Lock() for _, task := range m.adminTaskList { - task.SetNext(nil, now) + task.SetNext(context.Background(), now) } m.taskLock.Unlock() } @@ -562,7 +562,7 @@ func runNextTasks(sortList *MapSorter, effective time.Time) { } e.SetPrev(context.Background(), e.GetNext(context.Background())) - e.SetNext(nil, effective) + e.SetNext(context.Background(), effective) } } From 09f349f716b2e94a2ac4a724ca4f90c09ea1b628 Mon Sep 17 00:00:00 2001 From: Jason li Date: Fri, 15 Jan 2021 11:35:10 +0800 Subject: [PATCH 7/7] add err check for task running --- task/task.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index acf42ae4..00e67c4b 100644 --- a/task/task.go +++ b/task/task.go @@ -555,10 +555,18 @@ func runNextTasks(sortList *MapSorter, effective time.Time) { go func(e Tasker) { ctx, cancelFunc := context.WithTimeout(ctx, duration) defer cancelFunc() - e.Run(ctx) + err := e.Run(ctx) + if err != nil { + log.Printf("tasker.run err: %s\n", err.Error()) + } }(e) } else { - go e.Run(ctx) + go func(e Tasker) { + err := e.Run(ctx) + if err != nil { + log.Printf("tasker.run err: %s\n", err.Error()) + } + }(e) } e.SetPrev(context.Background(), e.GetNext(context.Background()))