188 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			188 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2014 beego Author. All Rights Reserved.
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//      http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package task
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/stretchr/testify/assert"
 | 
						|
)
 | 
						|
 | 
						|
func TestParse(t *testing.T) {
 | 
						|
	m := newTaskManager()
 | 
						|
	defer m.ClearTask()
 | 
						|
	tk := NewTask("taska", "0/30 * * * * *", func(ctx context.Context) error {
 | 
						|
		fmt.Println("hello world")
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
	err := tk.Run(nil)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
	assert.Equal(t, "0/30 * * * * *", tk.GetSpec(context.Background()))
 | 
						|
	m.AddTask("taska", tk)
 | 
						|
	m.StartTask()
 | 
						|
	time.Sleep(3 * time.Second)
 | 
						|
	assert.True(t, len(tk.GetStatus(context.Background())) == 0)
 | 
						|
	m.StopTask()
 | 
						|
}
 | 
						|
 | 
						|
func TestModifyTaskListAfterRunning(t *testing.T) {
 | 
						|
	m := newTaskManager()
 | 
						|
	defer m.ClearTask()
 | 
						|
	tk := NewTask("taskb", "0/30 * * * * *", func(ctx context.Context) error {
 | 
						|
		fmt.Println("hello world")
 | 
						|
		return nil
 | 
						|
	})
 | 
						|
	err := tk.Run(nil)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
	m.AddTask("taskb", tk)
 | 
						|
	m.StartTask()
 | 
						|
	go func() {
 | 
						|
		m.DeleteTask("taskb")
 | 
						|
	}()
 | 
						|
	go func() {
 | 
						|
		m.AddTask("taskb1", tk)
 | 
						|
	}()
 | 
						|
 | 
						|
	time.Sleep(3 * time.Second)
 | 
						|
	m.StopTask()
 | 
						|
}
 | 
						|
 | 
						|
func TestSpec(t *testing.T) {
 | 
						|
	m := newTaskManager()
 | 
						|
	defer m.ClearTask()
 | 
						|
	wg := &sync.WaitGroup{}
 | 
						|
	wg.Add(2)
 | 
						|
	tk1 := NewTask("tk1", "0 12 * * * *", func(ctx context.Context) error { fmt.Println("tk1"); return nil })
 | 
						|
	tk2 := NewTask("tk2", "0,10,20 * * * * *", func(ctx context.Context) error { fmt.Println("tk2"); wg.Done(); return nil })
 | 
						|
	tk3 := NewTask("tk3", "0 10 * * * *", func(ctx context.Context) error { fmt.Println("tk3"); wg.Done(); return nil })
 | 
						|
 | 
						|
	m.AddTask("tk1", tk1)
 | 
						|
	m.AddTask("tk2", tk2)
 | 
						|
	m.AddTask("tk3", tk3)
 | 
						|
	m.StartTask()
 | 
						|
	defer m.StopTask()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-time.After(200 * time.Second):
 | 
						|
		t.FailNow()
 | 
						|
	case <-wait(wg):
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestTimeout(t *testing.T) {
 | 
						|
	m := newTaskManager()
 | 
						|
	defer m.ClearTask()
 | 
						|
	wg := &sync.WaitGroup{}
 | 
						|
	wg.Add(2)
 | 
						|
	once1, once2 := sync.Once{}, sync.Once{}
 | 
						|
 | 
						|
	tk1 := NewTask("tk1", "0/10 * * ? * *",
 | 
						|
		func(ctx context.Context) error {
 | 
						|
			time.Sleep(4 * time.Second)
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				once1.Do(func() {
 | 
						|
					fmt.Println("tk1 done")
 | 
						|
					wg.Done()
 | 
						|
				})
 | 
						|
				return errors.New("timeout")
 | 
						|
			default:
 | 
						|
			}
 | 
						|
			return nil
 | 
						|
		}, TimeoutOption(3*time.Second),
 | 
						|
	)
 | 
						|
 | 
						|
	tk2 := NewTask("tk2", "0/11 * * ? * *",
 | 
						|
		func(ctx context.Context) error {
 | 
						|
			time.Sleep(4 * time.Second)
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				return errors.New("timeout")
 | 
						|
			default:
 | 
						|
				once2.Do(func() {
 | 
						|
					fmt.Println("tk2 done")
 | 
						|
					wg.Done()
 | 
						|
				})
 | 
						|
			}
 | 
						|
			return nil
 | 
						|
		},
 | 
						|
	)
 | 
						|
 | 
						|
	m.AddTask("tk1", tk1)
 | 
						|
	m.AddTask("tk2", tk2)
 | 
						|
	m.StartTask()
 | 
						|
	defer m.StopTask()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-time.After(19 * time.Second):
 | 
						|
		t.Error("TestTimeout failed")
 | 
						|
	case <-wait(wg):
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func TestTask_Run(t *testing.T) {
 | 
						|
	cnt := -1
 | 
						|
	task := func(ctx context.Context) error {
 | 
						|
		cnt++
 | 
						|
		fmt.Printf("Hello, world! %d \n", cnt)
 | 
						|
		return errors.New(fmt.Sprintf("Hello, world! %d", cnt))
 | 
						|
	}
 | 
						|
	tk := NewTask("taska", "0/30 * * * * *", task)
 | 
						|
	for i := 0; i < 200; i++ {
 | 
						|
		e := tk.Run(nil)
 | 
						|
		assert.NotNil(t, e)
 | 
						|
	}
 | 
						|
 | 
						|
	l := tk.Errlist
 | 
						|
	assert.Equal(t, 100, len(l))
 | 
						|
	assert.Equal(t, "Hello, world! 100", l[0].errinfo)
 | 
						|
	assert.Equal(t, "Hello, world! 101", l[1].errinfo)
 | 
						|
}
 | 
						|
 | 
						|
func TestCrudTask(t *testing.T) {
 | 
						|
	m := newTaskManager()
 | 
						|
	m.AddTask("my-task1", NewTask("my-task1", "0/30 * * * * *", func(ctx context.Context) error {
 | 
						|
		return nil
 | 
						|
	}))
 | 
						|
 | 
						|
	m.AddTask("my-task2", NewTask("my-task2", "0/30 * * * * *", func(ctx context.Context) error {
 | 
						|
		return nil
 | 
						|
	}))
 | 
						|
 | 
						|
	m.DeleteTask("my-task1")
 | 
						|
	assert.Equal(t, 1, len(m.adminTaskList))
 | 
						|
 | 
						|
	m.ClearTask()
 | 
						|
	assert.Equal(t, 0, len(m.adminTaskList))
 | 
						|
}
 | 
						|
 | 
						|
func wait(wg *sync.WaitGroup) chan bool {
 | 
						|
	ch := make(chan bool)
 | 
						|
	go func() {
 | 
						|
		wg.Wait()
 | 
						|
		ch <- true
 | 
						|
	}()
 | 
						|
	return ch
 | 
						|
}
 |