finish timeout option for task

This commit is contained in:
Jason li 2021-01-14 11:55:56 +08:00
parent 9860e0a618
commit 686b73891e
4 changed files with 102 additions and 2 deletions

View File

@ -289,3 +289,7 @@ func (o *oldToNewAdapter) SetPrev(ctx context.Context, t time.Time) {
func (o *oldToNewAdapter) GetPrev(ctx context.Context) time.Time { func (o *oldToNewAdapter) GetPrev(ctx context.Context) time.Time {
return o.delegate.GetPrev() return o.delegate.GetPrev()
} }
func (o *oldToNewAdapter) GetTimeout(ctx context.Context) time.Duration {
return 0
}

View File

@ -55,6 +55,10 @@ func (c *countTask) GetPrev(ctx context.Context) time.Time {
return time.Now() return time.Now()
} }
func (c *countTask) GetTimeout(ctx context.Context) time.Duration {
return 0
}
func TestRunTaskCommand_Execute(t *testing.T) { func TestRunTaskCommand_Execute(t *testing.T) {
task := &countTask{} task := &countTask{}
AddTask("count", task) AddTask("count", task)

View File

@ -109,6 +109,7 @@ type Tasker interface {
GetNext(ctx context.Context) time.Time GetNext(ctx context.Context) time.Time
SetPrev(context.Context, time.Time) SetPrev(context.Context, time.Time)
GetPrev(ctx context.Context) time.Time GetPrev(ctx context.Context) time.Time
GetTimeout(ctx context.Context) time.Duration
} }
// task error // task error
@ -127,13 +128,14 @@ type Task struct {
DoFunc TaskFunc DoFunc TaskFunc
Prev time.Time Prev time.Time
Next time.Time Next time.Time
Timeout time.Duration
Errlist []*taskerr // like errtime:errinfo Errlist []*taskerr // like errtime:errinfo
ErrLimit int // max length for the errlist, 0 stand for no limit ErrLimit int // max length for the errlist, 0 stand for no limit
errCnt int // records the error count during the execution errCnt int // records the error count during the execution
} }
// NewTask add new task with name, time and func // NewTask add new task with name, time and func
func NewTask(tname string, spec string, f TaskFunc) *Task { func NewTask(tname string, spec string, f TaskFunc, opts ...Option) *Task {
task := &Task{ task := &Task{
Taskname: tname, Taskname: tname,
@ -144,6 +146,11 @@ func NewTask(tname string, spec string, f TaskFunc) *Task {
// we only store the pointer, so it won't use too many space // we only store the pointer, so it won't use too many space
Errlist: make([]*taskerr, 100, 100), Errlist: make([]*taskerr, 100, 100),
} }
for _, opt := range opts {
opt.apply(task)
}
task.SetCron(spec) task.SetCron(spec)
return task return task
} }
@ -196,6 +203,31 @@ func (t *Task) GetPrev(context.Context) time.Time {
return t.Prev return t.Prev
} }
// GetTimeout get timeout duration of this task
func (t *Task) GetTimeout(context.Context) time.Duration {
return t.Timeout
}
// Option interface
type Option interface {
apply(*Task)
}
// optionFunc return a function to set task element
type optionFunc func(*Task)
// apply option to task
func (f optionFunc) apply(t *Task) {
f(t)
}
// TimeoutOption return a option to set timeout duration for task
func TimeoutOption(timeout time.Duration) Option {
return optionFunc(func(t *Task) {
t.Timeout = timeout
})
}
// six columns mean // six columns mean
// second0-59 // second0-59
// minute0-59 // minute0-59
@ -482,7 +514,19 @@ func (m *taskManager) run() {
if e.GetNext(context.Background()) != effective { if e.GetNext(context.Background()) != effective {
break break
} }
go e.Run(nil)
// check if timeout is on, if yes passing the timeout context
ctx := context.Background()
if duration := e.GetTimeout(ctx); duration != 0 {
ctx, cancelFunc := context.WithTimeout(ctx, duration)
go func() {
defer cancelFunc()
e.Run(ctx)
}()
} else {
go e.Run(ctx)
}
e.SetPrev(context.Background(), e.GetNext(context.Background())) e.SetPrev(context.Background(), e.GetNext(context.Background()))
e.SetNext(nil, effective) e.SetNext(nil, effective)
} }

View File

@ -90,6 +90,54 @@ func TestSpec(t *testing.T) {
} }
} }
func TestTimeout(t *testing.T) {
m := newTaskManager()
defer m.ClearTask()
wg := &sync.WaitGroup{}
wg.Add(2)
tk1 := NewTask("tk1", "0/10 * * ? * *",
func(ctx context.Context) error {
fmt.Println("tk1 start")
time.Sleep(4 * time.Second)
select {
case <-ctx.Done():
fmt.Println("tk1 done")
wg.Done()
return errors.New("timeout")
default:
}
return nil
}, TimeoutOption(3*time.Second),
)
tk2 := NewTask("tk2", "0/10 * * ? * *",
func(ctx context.Context) error {
fmt.Println("tk2 start")
time.Sleep(4 * time.Second)
select {
case <-ctx.Done():
return errors.New("timeout")
default:
fmt.Println("tk2 done")
wg.Done()
}
return nil
},
)
m.AddTask("tk1", tk1)
m.AddTask("tk2", tk2)
m.StartTask()
defer m.StopTask()
select {
case <-time.After(19 * time.Second):
t.Error("TestTimeout failed")
case <-wait(wg):
}
}
func TestTask_Run(t *testing.T) { func TestTask_Run(t *testing.T) {
cnt := -1 cnt := -1
task := func(ctx context.Context) error { task := func(ctx context.Context) error {