diff --git a/CHANGELOG.md b/CHANGELOG.md index 87a34e0b..8750999c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ - Optimize AddAutoPrefix: only register one router in case-insensitive mode. [4582](https://github.com/beego/beego/pull/4582) - Init exceptMethod by using reflection. [4583](https://github.com/beego/beego/pull/4583) - Deprecated BeeMap and replace all usage with `sync.map` [4616](https://github.com/beego/beego/pull/4616) +- TaskManager support graceful shutdown [4635](https://github.com/beego/beego/pull/4635) ## Fix Sonar diff --git a/task/task.go b/task/task.go index 00e67c4b..2bec0cc7 100644 --- a/task/task.go +++ b/task/task.go @@ -37,6 +37,7 @@ type taskManager struct { stop chan bool changed chan bool started bool + wait sync.WaitGroup } func newTaskManager() *taskManager { @@ -471,6 +472,11 @@ func ClearTask() { globalTaskManager.ClearTask() } +// GracefulShutdown wait all task done +func GracefulShutdown() <-chan struct{} { + return globalTaskManager.GracefulShutdown() +} + // StartTask start all tasks func (m *taskManager) StartTask() { m.taskLock.Lock() @@ -508,7 +514,7 @@ func (m *taskManager) run() { select { case now = <-time.After(effective.Sub(now)): // wait for effective time - runNextTasks(sortList, effective) + m.runNextTasks(sortList, effective) continue case <-m.changed: // tasks have been changed, set all tasks run again now now = time.Now().Local() @@ -540,7 +546,7 @@ func (m *taskManager) markManagerStop() { } // runNextTasks it runs next task which next run time is equal to effective -func runNextTasks(sortList *MapSorter, effective time.Time) { +func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) { // Run every entry whose next time was this effective time. var i = 0 for _, e := range sortList.Vals { @@ -551,8 +557,10 @@ func runNextTasks(sortList *MapSorter, effective time.Time) { // check if timeout is on, if yes passing the timeout context ctx := context.Background() + m.wait.Add(1) if duration := e.GetTimeout(ctx); duration != 0 { go func(e Tasker) { + defer m.wait.Done() ctx, cancelFunc := context.WithTimeout(ctx, duration) defer cancelFunc() err := e.Run(ctx) @@ -562,6 +570,7 @@ func runNextTasks(sortList *MapSorter, effective time.Time) { }(e) } else { go func(e Tasker) { + defer m.wait.Done() err := e.Run(ctx) if err != nil { log.Printf("tasker.run err: %s\n", err.Error()) @@ -581,6 +590,17 @@ func (m *taskManager) StopTask() { }() } +// GracefulShutdown wait all task done +func (m *taskManager) GracefulShutdown() <-chan struct{} { + done := make(chan struct{}) + go func() { + m.stop <- true + m.wait.Wait() + close(done) + }() + return done +} + // AddTask add task with name func (m *taskManager) AddTask(taskname string, t Tasker) { isChanged := false diff --git a/task/task_test.go b/task/task_test.go index 1078aa01..8d274e8f 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -177,6 +178,26 @@ func TestCrudTask(t *testing.T) { assert.Equal(t, 0, len(m.adminTaskList)) } +func TestGracefulShutdown(t *testing.T) { + m := newTaskManager() + defer m.ClearTask() + waitDone := atomic.Value{} + waitDone.Store(false) + tk := NewTask("everySecond", "* * * * * *", func(ctx context.Context) error { + fmt.Println("hello world") + time.Sleep(2 * time.Second) + waitDone.Store(true) + return nil + }) + m.AddTask("taska", tk) + m.StartTask() + time.Sleep(1 * time.Second) + shutdown := m.GracefulShutdown() + assert.False(t, waitDone.Load().(bool)) + <-shutdown + assert.True(t, waitDone.Load().(bool)) +} + func wait(wg *sync.WaitGroup) chan bool { ch := make(chan bool) go func() {