From 9327e2b026b85144e68727115db821df6276fd58 Mon Sep 17 00:00:00 2001 From: guoxingyong Date: Wed, 26 May 2021 09:56:44 +0800 Subject: [PATCH 1/5] task manager graceful shutdown support https://github.com/beego/beego/issues/4631 --- task/task.go | 20 ++++++++++++++++++-- task/task_test.go | 21 +++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index 00e67c4b..29b570e9 100644 --- a/task/task.go +++ b/task/task.go @@ -37,6 +37,7 @@ type taskManager struct { stop chan bool changed chan bool started bool + wait sync.WaitGroup } func newTaskManager() *taskManager { @@ -508,7 +509,7 @@ func (m *taskManager) run() { select { case now = <-time.After(effective.Sub(now)): // wait for effective time - runNextTasks(sortList, effective) + m.runNextTasks(sortList, effective) continue case <-m.changed: // tasks have been changed, set all tasks run again now now = time.Now().Local() @@ -540,7 +541,7 @@ func (m *taskManager) markManagerStop() { } // runNextTasks it runs next task which next run time is equal to effective -func runNextTasks(sortList *MapSorter, effective time.Time) { +func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) { // Run every entry whose next time was this effective time. var i = 0 for _, e := range sortList.Vals { @@ -553,19 +554,23 @@ func runNextTasks(sortList *MapSorter, effective time.Time) { ctx := context.Background() if duration := e.GetTimeout(ctx); duration != 0 { go func(e Tasker) { + m.wait.Add(1) ctx, cancelFunc := context.WithTimeout(ctx, duration) defer cancelFunc() err := e.Run(ctx) if err != nil { log.Printf("tasker.run err: %s\n", err.Error()) } + m.wait.Done() }(e) } else { go func(e Tasker) { + m.wait.Add(1) err := e.Run(ctx) if err != nil { log.Printf("tasker.run err: %s\n", err.Error()) } + m.wait.Done() }(e) } @@ -581,6 +586,17 @@ func (m *taskManager) StopTask() { }() } +// StopTask stop all tasks +func (m *taskManager) GracefulShutdown() <-chan struct{} { + done := make(chan struct{}, 0) + go func() { + m.stop <- true + m.wait.Wait() + close(done) + }() + return done +} + // AddTask add task with name func (m *taskManager) AddTask(taskname string, t Tasker) { isChanged := false diff --git a/task/task_test.go b/task/task_test.go index 1078aa01..8d274e8f 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -177,6 +178,26 @@ func TestCrudTask(t *testing.T) { assert.Equal(t, 0, len(m.adminTaskList)) } +func TestGracefulShutdown(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() + waitDone := atomic.Value{} + waitDone.Store(false) + tk := NewTask("everySecond", "* * * * * *", func(ctx context.Context) error { + fmt.Println("hello world") + time.Sleep(2 * time.Second) + waitDone.Store(true) + return nil + }) + m.AddTask("taska", tk) + m.StartTask() + time.Sleep(1 * time.Second) + shutdown := m.GracefulShutdown() + assert.False(t, waitDone.Load().(bool)) + <-shutdown + assert.True(t, waitDone.Load().(bool)) +} + func wait(wg *sync.WaitGroup) chan bool { ch := make(chan bool) go func() { From 93b73ddb34bdf9371b080f1649c5658aa5492bbf Mon Sep 17 00:00:00 2001 From: guoxingyong Date: Wed, 26 May 2021 10:19:03 +0800 Subject: [PATCH 2/5] task manager graceful shutdown support --- task/task.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/task/task.go b/task/task.go index 29b570e9..b4478cc4 100644 --- a/task/task.go +++ b/task/task.go @@ -472,6 +472,11 @@ func ClearTask() { globalTaskManager.ClearTask() } +// GracefulShutdown wait all task done +func GracefulShutdown() <-chan struct{} { + return globalTaskManager.GracefulShutdown() +} + // StartTask start all tasks func (m *taskManager) StartTask() { m.taskLock.Lock() @@ -586,7 +591,7 @@ func (m *taskManager) StopTask() { }() } -// StopTask stop all tasks +// GracefulShutdown wait all task done func (m *taskManager) GracefulShutdown() <-chan struct{} { done := make(chan struct{}, 0) go func() { From 8a193c5004475d7ea595120d5937ad4931ef2470 Mon Sep 17 00:00:00 2001 From: guoxingyong Date: Wed, 26 May 2021 10:34:11 +0800 Subject: [PATCH 3/5] task manager graceful shutdown support --- task/task.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task/task.go b/task/task.go index b4478cc4..fd3c6e28 100644 --- a/task/task.go +++ b/task/task.go @@ -560,22 +560,22 @@ func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) { if duration := e.GetTimeout(ctx); duration != 0 { go func(e Tasker) { m.wait.Add(1) + defer m.wait.Done() ctx, cancelFunc := context.WithTimeout(ctx, duration) defer cancelFunc() err := e.Run(ctx) if err != nil { log.Printf("tasker.run err: %s\n", err.Error()) } - m.wait.Done() }(e) } else { go func(e Tasker) { m.wait.Add(1) + defer m.wait.Done() err := e.Run(ctx) if err != nil { log.Printf("tasker.run err: %s\n", err.Error()) } - m.wait.Done() }(e) } From 7419ad952db027e4fa4d1449560ec3270b57bab6 Mon Sep 17 00:00:00 2001 From: guoxingyong Date: Wed, 26 May 2021 10:37:16 +0800 Subject: [PATCH 4/5] task manager graceful shutdown support --- task/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task/task.go b/task/task.go index fd3c6e28..8576566c 100644 --- a/task/task.go +++ b/task/task.go @@ -593,7 +593,7 @@ func (m *taskManager) StopTask() { // GracefulShutdown wait all task done func (m *taskManager) GracefulShutdown() <-chan struct{} { - done := make(chan struct{}, 0) + done := make(chan struct{}) go func() { m.stop <- true m.wait.Wait() From 003434cad6f2c47e50eb791ce1e62c8bdac29905 Mon Sep 17 00:00:00 2001 From: guoxingyong Date: Wed, 26 May 2021 13:37:09 +0800 Subject: [PATCH 5/5] fix DeepSource and CHANGELOG.md --- CHANGELOG.md | 1 + task/task.go | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97fa7111..94a511c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ - Optimize AddAutoPrefix: only register one router in case-insensitive mode. [4582](https://github.com/beego/beego/pull/4582) - Init exceptMethod by using reflection. [4583](https://github.com/beego/beego/pull/4583) - Deprecated BeeMap and replace all usage with `sync.map` [4616](https://github.com/beego/beego/pull/4616) +- TaskManager support graceful shutdown [4635](https://github.com/beego/beego/pull/4635) ## Fix Sonar diff --git a/task/task.go b/task/task.go index 8576566c..2bec0cc7 100644 --- a/task/task.go +++ b/task/task.go @@ -557,9 +557,9 @@ func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) { // check if timeout is on, if yes passing the timeout context ctx := context.Background() + m.wait.Add(1) if duration := e.GetTimeout(ctx); duration != 0 { go func(e Tasker) { - m.wait.Add(1) defer m.wait.Done() ctx, cancelFunc := context.WithTimeout(ctx, duration) defer cancelFunc() @@ -570,7 +570,6 @@ func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) { }(e) } else { go func(e Tasker) { - m.wait.Add(1) defer m.wait.Done() err := e.Run(ctx) if err != nil {