diff --git a/task/task.go b/task/task.go index 282eb47d..9233cff5 100644 --- a/task/task.go +++ b/task/task.go @@ -487,14 +487,12 @@ func (m *taskManager) StartTask() { func (m *taskManager) run() { now := time.Now().Local() - m.taskLock.Lock() - for _, t := range m.adminTaskList { - t.SetNext(nil, now) - } - m.taskLock.Unlock() + // first run the tasks, so set all tasks next run time. + m.setTasksStartTime(now) for { // we only use RLock here because NewMapSorter copy the reference, do not change any thing + // here, we sort all task and get first task running time (effective). m.taskLock.RLock() sortList := NewMapSorter(m.adminTaskList) m.taskLock.RUnlock() @@ -507,49 +505,68 @@ func (m *taskManager) run() { } else { effective = sortList.Vals[0].GetNext(context.Background()) } + select { - case now = <-time.After(effective.Sub(now)): - // Run every entry whose next time was this effective time. - for _, e := range sortList.Vals { - if e.GetNext(context.Background()) != effective { - break - } - - // check if timeout is on, if yes passing the timeout context - ctx := context.Background() - if duration := e.GetTimeout(ctx); duration != 0 { - ctx, cancelFunc := context.WithTimeout(ctx, duration) - go func() { - defer cancelFunc() - e.Run(ctx) - }() - } else { - go e.Run(ctx) - } - - e.SetPrev(context.Background(), e.GetNext(context.Background())) - e.SetNext(nil, effective) - } + case now = <-time.After(effective.Sub(now)): // wait for effective time + runNextTasks(sortList, effective) continue - case <-m.changed: + case <-m.changed: // tasks have been changed, set all tasks run again now now = time.Now().Local() - m.taskLock.Lock() - for _, t := range m.adminTaskList { - t.SetNext(nil, now) - } - m.taskLock.Unlock() + m.setTasksStartTime(now) continue - case <-m.stop: - m.taskLock.Lock() - if m.started { - m.started = false - } - m.taskLock.Unlock() + case <-m.stop: // manager is stopped, and mark manager is stopped + m.markManagerStop() return } } } +// setTasksStartTime is set all tasks next running time +func (m *taskManager) setTasksStartTime(now time.Time) { + m.taskLock.Lock() + for _, task := range m.adminTaskList { + task.SetNext(nil, now) + } + m.taskLock.Unlock() +} + +// markManagerStop it sets manager to be stopped +func (m *taskManager) markManagerStop() { + m.taskLock.Lock() + if m.started { + m.started = false + } + m.taskLock.Unlock() +} + +// runNextTasks it runs next task which next run time is equal to effective +func runNextTasks(sortList *MapSorter, effective time.Time) *MapSorter { + // Run every entry whose next time was this effective time. + var i = 0 + for _, e := range sortList.Vals { + i++ + if e.GetNext(context.Background()) != effective { + break + } + + // check if timeout is on, if yes passing the timeout context + ctx := context.Background() + if duration := e.GetTimeout(ctx); duration != 0 { + go func(e Tasker) { + ctx, cancelFunc := context.WithTimeout(ctx, duration) + defer cancelFunc() + e.Run(ctx) + }(e) + } else { + go e.Run(ctx) + } + + e.SetPrev(context.Background(), e.GetNext(context.Background())) + e.SetNext(nil, effective) + } + return sortList +} + // StopTask stop all tasks func (m *taskManager) StopTask() { go func() { diff --git a/task/task_test.go b/task/task_test.go index d36c3994..1078aa01 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -95,15 +95,17 @@ func TestTimeout(t *testing.T) { defer m.ClearTask() wg := &sync.WaitGroup{} wg.Add(2) + once1, once2 := sync.Once{}, sync.Once{} tk1 := NewTask("tk1", "0/10 * * ? * *", func(ctx context.Context) error { - fmt.Println("tk1 start") time.Sleep(4 * time.Second) select { case <-ctx.Done(): - fmt.Println("tk1 done") - wg.Done() + once1.Do(func() { + fmt.Println("tk1 done") + wg.Done() + }) return errors.New("timeout") default: } @@ -111,16 +113,17 @@ func TestTimeout(t *testing.T) { }, TimeoutOption(3*time.Second), ) - tk2 := NewTask("tk2", "0/10 * * ? * *", + tk2 := NewTask("tk2", "0/11 * * ? * *", func(ctx context.Context) error { - fmt.Println("tk2 start") time.Sleep(4 * time.Second) select { case <-ctx.Done(): return errors.New("timeout") default: - fmt.Println("tk2 done") - wg.Done() + once2.Do(func() { + fmt.Println("tk2 done") + wg.Done() + }) } return nil },