refactor code and fix bug

This commit is contained in:
Jason li 2021-01-14 15:37:36 +08:00
parent 6464b500f1
commit 194de55058
2 changed files with 66 additions and 46 deletions

View File

@ -487,14 +487,12 @@ func (m *taskManager) StartTask() {
func (m *taskManager) run() { func (m *taskManager) run() {
now := time.Now().Local() now := time.Now().Local()
m.taskLock.Lock() // first run the tasks, so set all tasks next run time.
for _, t := range m.adminTaskList { m.setTasksStartTime(now)
t.SetNext(nil, now)
}
m.taskLock.Unlock()
for { for {
// we only use RLock here because NewMapSorter copy the reference, do not change any thing // we only use RLock here because NewMapSorter copy the reference, do not change any thing
// here, we sort all task and get first task running time (effective).
m.taskLock.RLock() m.taskLock.RLock()
sortList := NewMapSorter(m.adminTaskList) sortList := NewMapSorter(m.adminTaskList)
m.taskLock.RUnlock() m.taskLock.RUnlock()
@ -507,10 +505,46 @@ func (m *taskManager) run() {
} else { } else {
effective = sortList.Vals[0].GetNext(context.Background()) effective = sortList.Vals[0].GetNext(context.Background())
} }
select { select {
case now = <-time.After(effective.Sub(now)): case now = <-time.After(effective.Sub(now)): // wait for effective time
runNextTasks(sortList, effective)
continue
case <-m.changed: // tasks have been changed, set all tasks run again now
now = time.Now().Local()
m.setTasksStartTime(now)
continue
case <-m.stop: // manager is stopped, and mark manager is stopped
m.markManagerStop()
return
}
}
}
// setTasksStartTime is set all tasks next running time
func (m *taskManager) setTasksStartTime(now time.Time) {
m.taskLock.Lock()
for _, task := range m.adminTaskList {
task.SetNext(nil, now)
}
m.taskLock.Unlock()
}
// markManagerStop it sets manager to be stopped
func (m *taskManager) markManagerStop() {
m.taskLock.Lock()
if m.started {
m.started = false
}
m.taskLock.Unlock()
}
// runNextTasks it runs next task which next run time is equal to effective
func runNextTasks(sortList *MapSorter, effective time.Time) *MapSorter {
// Run every entry whose next time was this effective time. // Run every entry whose next time was this effective time.
var i = 0
for _, e := range sortList.Vals { for _, e := range sortList.Vals {
i++
if e.GetNext(context.Background()) != effective { if e.GetNext(context.Background()) != effective {
break break
} }
@ -518,11 +552,11 @@ func (m *taskManager) run() {
// check if timeout is on, if yes passing the timeout context // check if timeout is on, if yes passing the timeout context
ctx := context.Background() ctx := context.Background()
if duration := e.GetTimeout(ctx); duration != 0 { if duration := e.GetTimeout(ctx); duration != 0 {
go func(e Tasker) {
ctx, cancelFunc := context.WithTimeout(ctx, duration) ctx, cancelFunc := context.WithTimeout(ctx, duration)
go func() {
defer cancelFunc() defer cancelFunc()
e.Run(ctx) e.Run(ctx)
}() }(e)
} else { } else {
go e.Run(ctx) go e.Run(ctx)
} }
@ -530,24 +564,7 @@ func (m *taskManager) run() {
e.SetPrev(context.Background(), e.GetNext(context.Background())) e.SetPrev(context.Background(), e.GetNext(context.Background()))
e.SetNext(nil, effective) e.SetNext(nil, effective)
} }
continue return sortList
case <-m.changed:
now = time.Now().Local()
m.taskLock.Lock()
for _, t := range m.adminTaskList {
t.SetNext(nil, now)
}
m.taskLock.Unlock()
continue
case <-m.stop:
m.taskLock.Lock()
if m.started {
m.started = false
}
m.taskLock.Unlock()
return
}
}
} }
// StopTask stop all tasks // StopTask stop all tasks

View File

@ -95,15 +95,17 @@ func TestTimeout(t *testing.T) {
defer m.ClearTask() defer m.ClearTask()
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(2) wg.Add(2)
once1, once2 := sync.Once{}, sync.Once{}
tk1 := NewTask("tk1", "0/10 * * ? * *", tk1 := NewTask("tk1", "0/10 * * ? * *",
func(ctx context.Context) error { func(ctx context.Context) error {
fmt.Println("tk1 start")
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
select { select {
case <-ctx.Done(): case <-ctx.Done():
once1.Do(func() {
fmt.Println("tk1 done") fmt.Println("tk1 done")
wg.Done() wg.Done()
})
return errors.New("timeout") return errors.New("timeout")
default: default:
} }
@ -111,16 +113,17 @@ func TestTimeout(t *testing.T) {
}, TimeoutOption(3*time.Second), }, TimeoutOption(3*time.Second),
) )
tk2 := NewTask("tk2", "0/10 * * ? * *", tk2 := NewTask("tk2", "0/11 * * ? * *",
func(ctx context.Context) error { func(ctx context.Context) error {
fmt.Println("tk2 start")
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return errors.New("timeout") return errors.New("timeout")
default: default:
once2.Do(func() {
fmt.Println("tk2 done") fmt.Println("tk2 done")
wg.Done() wg.Done()
})
} }
return nil return nil
}, },