Merge pull request #4635 from optimistic9527/develop

task manager graceful shutdown support
This commit is contained in:
Ming Deng 2021-05-27 14:09:34 +08:00 committed by GitHub
commit 2f174db197
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 2 deletions

View File

@ -49,6 +49,7 @@
- Optimize AddAutoPrefix: only register one router in case-insensitive mode. [4582](https://github.com/beego/beego/pull/4582)
- Init exceptMethod by using reflection. [4583](https://github.com/beego/beego/pull/4583)
- Deprecated BeeMap and replace all usage with `sync.map` [4616](https://github.com/beego/beego/pull/4616)
- TaskManager support graceful shutdown [4635](https://github.com/beego/beego/pull/4635)
## Fix Sonar

View File

@ -37,6 +37,7 @@ type taskManager struct {
stop chan bool
changed chan bool
started bool
wait sync.WaitGroup
}
func newTaskManager() *taskManager {
@ -471,6 +472,11 @@ func ClearTask() {
globalTaskManager.ClearTask()
}
// GracefulShutdown wait all task done
func GracefulShutdown() <-chan struct{} {
return globalTaskManager.GracefulShutdown()
}
// StartTask start all tasks
func (m *taskManager) StartTask() {
m.taskLock.Lock()
@ -508,7 +514,7 @@ func (m *taskManager) run() {
select {
case now = <-time.After(effective.Sub(now)): // wait for effective time
runNextTasks(sortList, effective)
m.runNextTasks(sortList, effective)
continue
case <-m.changed: // tasks have been changed, set all tasks run again now
now = time.Now().Local()
@ -540,7 +546,7 @@ func (m *taskManager) markManagerStop() {
}
// runNextTasks it runs next task which next run time is equal to effective
func runNextTasks(sortList *MapSorter, effective time.Time) {
func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) {
// Run every entry whose next time was this effective time.
var i = 0
for _, e := range sortList.Vals {
@ -551,8 +557,10 @@ func runNextTasks(sortList *MapSorter, effective time.Time) {
// check if timeout is on, if yes passing the timeout context
ctx := context.Background()
m.wait.Add(1)
if duration := e.GetTimeout(ctx); duration != 0 {
go func(e Tasker) {
defer m.wait.Done()
ctx, cancelFunc := context.WithTimeout(ctx, duration)
defer cancelFunc()
err := e.Run(ctx)
@ -562,6 +570,7 @@ func runNextTasks(sortList *MapSorter, effective time.Time) {
}(e)
} else {
go func(e Tasker) {
defer m.wait.Done()
err := e.Run(ctx)
if err != nil {
log.Printf("tasker.run err: %s\n", err.Error())
@ -581,6 +590,17 @@ func (m *taskManager) StopTask() {
}()
}
// GracefulShutdown wait all task done
func (m *taskManager) GracefulShutdown() <-chan struct{} {
done := make(chan struct{})
go func() {
m.stop <- true
m.wait.Wait()
close(done)
}()
return done
}
// AddTask add task with name
func (m *taskManager) AddTask(taskname string, t Tasker) {
isChanged := false

View File

@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
@ -177,6 +178,26 @@ func TestCrudTask(t *testing.T) {
assert.Equal(t, 0, len(m.adminTaskList))
}
func TestGracefulShutdown(t *testing.T) {
m := newTaskManager()
defer m.ClearTask()
waitDone := atomic.Value{}
waitDone.Store(false)
tk := NewTask("everySecond", "* * * * * *", func(ctx context.Context) error {
fmt.Println("hello world")
time.Sleep(2 * time.Second)
waitDone.Store(true)
return nil
})
m.AddTask("taska", tk)
m.StartTask()
time.Sleep(1 * time.Second)
shutdown := m.GracefulShutdown()
assert.False(t, waitDone.Load().(bool))
<-shutdown
assert.True(t, waitDone.Load().(bool))
}
func wait(wg *sync.WaitGroup) chan bool {
ch := make(chan bool)
go func() {