beego/task/task.go
Ming Deng 0bd2df91a1
Resolve conflicts among master branch and develop branch (#5286)
* feature extend readthrough for cache module (#5116)

* feature 增加readthrough

* feature: add write though for cache mode (#5117)

* feature: add writethough for cache mode

* feature add singleflight cache (#5119)

* build(deps): bump go.opentelemetry.io/otel/trace from 1.8.0 to 1.11.2

Bumps [go.opentelemetry.io/otel/trace](https://github.com/open-telemetry/opentelemetry-go) from 1.8.0 to 1.11.2.
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.8.0...v1.11.2)

---
updated-dependencies:
- dependency-name: go.opentelemetry.io/otel/trace
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix 5129: must set formatter after init the logger

* remove beego.vip

* build(deps): bump actions/stale from 5 to 7

Bumps [actions/stale](https://github.com/actions/stale) from 5 to 7.
- [Release notes](https://github.com/actions/stale/releases)
- [Changelog](https://github.com/actions/stale/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/stale/compare/v5...v7)

---
updated-dependencies:
- dependency-name: actions/stale
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix 5079: only log msg when the channel is not closed (#5132)

* optimize test

* upgrade otel dependencies to v1.11.2

* format code

* Bloom filter cache (#5126)

* feature: add bloom filter cache

* feature upload remove all temp file

* bugfix Controller SaveToFile remove all temp file

* rft: motify BeeLogger signalChan (#5139)

* add non-block write log in asynchronous mode (#5150)

* add non-block write log in asynchronous mode

---------

Co-authored-by: chenhaokun <chenhaokun@itiger.com>

* fix the docsite URL (#5173)

* Unified gopkg.in/yaml version to v2 (#5169)

* Unified gopkg.in/yaml version to v2 and go mod tidy

* update CHANGELOG

* bugfix: protect field access with lock to avoid possible data race (#5211)

* fix some comments (#5194)

Signed-off-by: cui fliter <imcusg@gmail.com>

* build(deps): bump github.com/prometheus/client_golang (#5213)

Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.14.0 to 1.15.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.14.0...v1.15.1)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* build(deps): bump go.etcd.io/etcd/client/v3 from 3.5.4 to 3.5.9 (#5209)

Bumps [go.etcd.io/etcd/client/v3](https://github.com/etcd-io/etcd) from 3.5.4 to 3.5.9.
- [Release notes](https://github.com/etcd-io/etcd/releases)
- [Commits](https://github.com/etcd-io/etcd/compare/v3.5.4...v3.5.9)

---
updated-dependencies:
- dependency-name: go.etcd.io/etcd/client/v3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* cache: fix typo and optimize the naming

* Release 2.1.0 change log

* bugfix: beegoAppConfig String and Strings function has bug

* httplib: fix unstable test, do not use httplib.org

* chore: pkg imported more than once

* chore: fmt modify

* chore: Use github.com/go-kit/log

* chore: unnecessary use of fmt.Sprintf

* fix: golangci-lint error

* orm: refactor ORM introducing internal/models pkg

* remove adapter package

* build(deps): bump github.com/bits-and-blooms/bloom/v3

Bumps [github.com/bits-and-blooms/bloom/v3](https://github.com/bits-and-blooms/bloom) from 3.3.1 to 3.5.0.
- [Release notes](https://github.com/bits-and-blooms/bloom/releases)
- [Commits](https://github.com/bits-and-blooms/bloom/compare/v3.3.1...v3.5.0)

---
updated-dependencies:
- dependency-name: github.com/bits-and-blooms/bloom/v3
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* feat: add write-delete cache mode

* fix: unnecessary assignment to the blank identifier

* fix: add change into .CHANGELOG file

* build(deps): bump golang.org/x/sync from 0.1.0 to 0.3.0

Bumps [golang.org/x/sync](https://github.com/golang/sync) from 0.1.0 to 0.3.0.
- [Commits](https://github.com/golang/sync/compare/v0.1.0...v0.3.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sync
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* build(deps): bump golang.org/x/crypto

Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.0.0-20220315160706-3147a52a75dd to 0.10.0.
- [Commits](https://github.com/golang/crypto/commits/v0.10.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* remove golang--lint-ci

* Beego web.Run() runs the server twice

* fix 5255: Check the rows.Err() if rows.Next() is false

* closes 5254: %COL% should be a common placeholder

* build(deps): bump github.com/prometheus/client_golang

Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.15.1 to 1.16.0.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.15.1...v1.16.0)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix: use of ioutil package (#5261)

* fix ioutil.NopCloser

* fix ioutil.ReadAll

* fix ioutil.ReadFile

* fix ioutil.WriteFile

* run goimports -w -format-only ./

* update CHANGELOG.md

* feature: add write-double-delete cache mode (#5263)

* cache/redis: support skipEmptyPrefix option (#5264)

* fix: refactor InsertValue method (#5267)

* fix: refactor insertValue method and add the test

* fix: exec goimports and add Licence file header

* fix: modify construct method of dbBase

* fix: add modify record into CHANGELOG

* fix: modify InsertOrUpdate method (#5269)

* fix: modify InsertOrUpdate method, Remove the isMulti variable and its associated code

* fix: Delete unnecessary judgment branches

* fix: add modify record into CHANGELOG

* cache/redis: use redisConfig to receive incoming JSON (previously using a map) (#5268)

* refactor cache/redis: Use redisConfig to receive incoming JSON (previously using a map).

* refactor cache/redis: Use the string type to receive JSON parameters.

---------

Co-authored-by: Tan <tanqianheng@gmail.com>

* fix: refactor Delete method (#5271)

* fix: refactor Delete method and add test

* fix: add modify record into CHANGELOG

* fix: refactor update sql (#5274)

* fix: refactor UpdateSQL method and add test

* fix: add modify record into CHANGELOG

* fix: modify url in the CHANGELOG

* fix: modify pr url in the CHANGELOG

* Fix setPK function for table without primary key (#5276)

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: cui fliter <imcusg@gmail.com>
Co-authored-by: Stone-afk <73482944+Stone-afk@users.noreply.github.com>
Co-authored-by: hookokoko <hooko@tju.edu.cn>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: hookokoko <648646891@qq.com>
Co-authored-by: Stone-afk <1711865140@qq.com>
Co-authored-by: chenhaokun <chenhaokun@itiger.com>
Co-authored-by: Xuing <admin@xuing.cn>
Co-authored-by: cui fliter <imcusg@gmail.com>
Co-authored-by: guoguangwu <guoguangwu@magic-shield.com>
Co-authored-by: uzziah <uzziahlin@gmail.com>
Co-authored-by: Hanjiang Yu <delacroix.yu@gmail.com>
Co-authored-by: Kota <mdryzk64smsh@gmail.com>
Co-authored-by: Uzziah <120019273+uzziahlin@users.noreply.github.com>
Co-authored-by: Handkerchiefs-t <59816423+Handkerchiefs-t@users.noreply.github.com>
Co-authored-by: Tan <tanqianheng@gmail.com>
Co-authored-by: mlgd <mlgd17@gmail.com>
2023-07-31 23:00:02 +08:00

833 lines
19 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2014 beego Author. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"context"
"log"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
)
// bounds provides a range of acceptable values (plus a map of name to value).
type bounds struct {
min, max uint
names map[string]uint
}
type taskManager struct {
adminTaskList map[string]Tasker
taskLock sync.RWMutex
stop chan bool
changed chan bool
started bool
wait sync.WaitGroup
}
func newTaskManager() *taskManager {
return &taskManager{
adminTaskList: make(map[string]Tasker),
taskLock: sync.RWMutex{},
stop: make(chan bool),
changed: make(chan bool),
started: false,
}
}
// The bounds for each field.
var (
globalTaskManager *taskManager
seconds = bounds{0, 59, nil}
minutes = bounds{0, 59, nil}
hours = bounds{0, 23, nil}
days = bounds{1, 31, nil}
months = bounds{1, 12, map[string]uint{
"jan": 1,
"feb": 2,
"mar": 3,
"apr": 4,
"may": 5,
"jun": 6,
"jul": 7,
"aug": 8,
"sep": 9,
"oct": 10,
"nov": 11,
"dec": 12,
}}
weeks = bounds{0, 6, map[string]uint{
"sun": 0,
"mon": 1,
"tue": 2,
"wed": 3,
"thu": 4,
"fri": 5,
"sat": 6,
}}
)
const (
// Set the top bit if a star was included in the expression.
starBit = 1 << 63
)
// Schedule time taks schedule
type Schedule struct {
Second uint64
Minute uint64
Hour uint64
Day uint64
Month uint64
Week uint64
}
// TaskFunc task func type
type TaskFunc func(ctx context.Context) error
// Tasker task interface
type Tasker interface {
GetSpec(ctx context.Context) string
GetStatus(ctx context.Context) string
Run(ctx context.Context) error
SetNext(context.Context, time.Time)
GetNext(ctx context.Context) time.Time
SetPrev(context.Context, time.Time)
GetPrev(ctx context.Context) time.Time
GetTimeout(ctx context.Context) time.Duration
}
// task error
type taskerr struct {
t time.Time
errinfo string
}
// Task task struct
// It's not a thread-safe structure.
// Only nearest errors will be saved in ErrList
type Task struct {
Taskname string
Spec *Schedule
SpecStr string
DoFunc TaskFunc
Prev time.Time
Next time.Time
Timeout time.Duration // timeout duration
Errlist []*taskerr // like errtime:errinfo
ErrLimit int // max length for the errlist, 0 stand for no limit
errCnt int // records the error count during the execution
}
// NewTask add new task with name, time and func
func NewTask(tname string, spec string, f TaskFunc, opts ...Option) *Task {
task := &Task{
Taskname: tname,
DoFunc: f,
// Make configurable
ErrLimit: 100,
SpecStr: spec,
// we only store the pointer, so it won't use too many space
Errlist: make([]*taskerr, 100, 100),
}
for _, opt := range opts {
opt.apply(task)
}
task.SetCron(spec)
return task
}
// GetSpec get spec string
func (t *Task) GetSpec(context.Context) string {
return t.SpecStr
}
// GetStatus get current task status
func (t *Task) GetStatus(context.Context) string {
var str string
for _, v := range t.Errlist {
if v == nil {
continue
}
str += v.t.String() + ":" + v.errinfo + "<br>"
}
return str
}
// Run run all tasks
func (t *Task) Run(ctx context.Context) error {
err := t.DoFunc(ctx)
if err != nil {
index := t.errCnt % t.ErrLimit
t.Errlist[index] = &taskerr{t: t.Next, errinfo: err.Error()}
t.errCnt++
}
return err
}
// SetNext set next time for this task
func (t *Task) SetNext(ctx context.Context, now time.Time) {
t.Next = t.Spec.Next(now)
}
// GetNext get the next call time of this task
func (t *Task) GetNext(context.Context) time.Time {
return t.Next
}
// SetPrev set prev time of this task
func (t *Task) SetPrev(ctx context.Context, now time.Time) {
t.Prev = now
}
// GetPrev get prev time of this task
func (t *Task) GetPrev(context.Context) time.Time {
return t.Prev
}
// GetTimeout get timeout duration of this task
func (t *Task) GetTimeout(context.Context) time.Duration {
return t.Timeout
}
// Option interface
type Option interface {
apply(*Task)
}
// optionFunc return a function to set task element
type optionFunc func(*Task)
// apply option to task
func (f optionFunc) apply(t *Task) {
f(t)
}
// TimeoutOption return an option to set timeout duration for task
func TimeoutOption(timeout time.Duration) Option {
return optionFunc(func(t *Task) {
t.Timeout = timeout
})
}
// six columns mean
// second0-59
// minute0-59
// hour1-23
// day1-31
// month1-12
// week0-60 means Sunday
// SetCron some signals
//
// * any time
// ,  separate signal
//
//    duration
//
// /n : do as n times of time duration
//
// ///////////////////////////////////////////////////////
//
// 0/30 * * * * * every 30s
// 0 43 21 * * * 21:43
// 0 15 05 * * *    05:15
// 0 0 17 * * * 17:00
// 0 0 17 * * 1 17:00 in every Monday
// 0 0,10 17 * * 0,2,3 17:00 and 17:10 in every Sunday, Tuesday and Wednesday
// 0 0-10 17 1 * * 17:00 to 17:10 in 1 min duration each time on the first day of month
// 0 0 0 1,15 * 1 0:00 on the 1st day and 15th day of month
// 0 42 4 1 * *     4:42 on the 1st day of month
// 0 0 21 * * 1-6   21:00 from Monday to Saturday
// 0 0,10,20,30,40,50 * * * *  every 10 min duration
// 0 */10 * * * *        every 10 min duration
// 0 * 1 * * *         1:00 to 1:59 in 1 min duration each time
// 0 0 1 * * *         1:00
// 0 0 */1 * * *        0 min of hour in 1 hour duration
// 0 0 * * * *         0 min of hour in 1 hour duration
// 0 2 8-20/3 * * *       8:02, 11:02, 14:02, 17:02, 20:02
// 0 30 5 1,15 * *       5:30 on the 1st day and 15th day of month
func (t *Task) SetCron(spec string) {
t.Spec = t.parse(spec)
}
func (t *Task) parse(spec string) *Schedule {
if len(spec) > 0 && spec[0] == '@' {
return t.parseSpec(spec)
}
// Split on whitespace. We require 5 or 6 fields.
// (second) (minute) (hour) (day of month) (month) (day of week, optional)
fields := strings.Fields(spec)
if len(fields) != 5 && len(fields) != 6 {
log.Panicf("Expected 5 or 6 fields, found %d: %s", len(fields), spec)
}
// If a sixth field is not provided (DayOfWeek), then it is equivalent to star.
if len(fields) == 5 {
fields = append(fields, "*")
}
schedule := &Schedule{
Second: getField(fields[0], seconds),
Minute: getField(fields[1], minutes),
Hour: getField(fields[2], hours),
Day: getField(fields[3], days),
Month: getField(fields[4], months),
Week: getField(fields[5], weeks),
}
return schedule
}
func (t *Task) parseSpec(spec string) *Schedule {
switch spec {
case "@yearly", "@annually":
return &Schedule{
Second: 1 << seconds.min,
Minute: 1 << minutes.min,
Hour: 1 << hours.min,
Day: 1 << days.min,
Month: 1 << months.min,
Week: all(weeks),
}
case "@monthly":
return &Schedule{
Second: 1 << seconds.min,
Minute: 1 << minutes.min,
Hour: 1 << hours.min,
Day: 1 << days.min,
Month: all(months),
Week: all(weeks),
}
case "@weekly":
return &Schedule{
Second: 1 << seconds.min,
Minute: 1 << minutes.min,
Hour: 1 << hours.min,
Day: all(days),
Month: all(months),
Week: 1 << weeks.min,
}
case "@daily", "@midnight":
return &Schedule{
Second: 1 << seconds.min,
Minute: 1 << minutes.min,
Hour: 1 << hours.min,
Day: all(days),
Month: all(months),
Week: all(weeks),
}
case "@hourly":
return &Schedule{
Second: 1 << seconds.min,
Minute: 1 << minutes.min,
Hour: all(hours),
Day: all(days),
Month: all(months),
Week: all(weeks),
}
}
log.Panicf("Unrecognized descriptor: %s", spec)
return nil
}
// Next set schedule to next time
func (s *Schedule) Next(t time.Time) time.Time {
// Start at the earliest possible time (the upcoming second).
t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)
// This flag indicates whether a field has been incremented.
added := false
// If no time is found within five years, return zero.
yearLimit := t.Year() + 5
WRAP:
if t.Year() > yearLimit {
return time.Time{}
}
// Find the first applicable month.
// If it's this month, then do nothing.
for 1<<uint(t.Month())&s.Month == 0 {
// If we have to add a month, reset the other parts to 0.
if !added {
added = true
// Otherwise, set the date at the beginning (since the current time is irrelevant).
t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location())
}
t = t.AddDate(0, 1, 0)
// Wrapped around.
if t.Month() == time.January {
goto WRAP
}
}
// Now get a day in that month.
for !dayMatches(s, t) {
if !added {
added = true
t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
t = t.AddDate(0, 0, 1)
if t.Day() == 1 {
goto WRAP
}
}
for 1<<uint(t.Hour())&s.Hour == 0 {
if !added {
added = true
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
}
t = t.Add(1 * time.Hour)
if t.Hour() == 0 {
goto WRAP
}
}
for 1<<uint(t.Minute())&s.Minute == 0 {
if !added {
added = true
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
}
t = t.Add(1 * time.Minute)
if t.Minute() == 0 {
goto WRAP
}
}
for 1<<uint(t.Second())&s.Second == 0 {
if !added {
added = true
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), 0, t.Location())
}
t = t.Add(1 * time.Second)
if t.Second() == 0 {
goto WRAP
}
}
return t
}
func dayMatches(s *Schedule, t time.Time) bool {
var (
domMatch = 1<<uint(t.Day())&s.Day > 0
dowMatch = 1<<uint(t.Weekday())&s.Week > 0
)
if s.Day&starBit > 0 || s.Week&starBit > 0 {
return domMatch && dowMatch
}
return domMatch || dowMatch
}
// StartTask start all tasks
func StartTask() {
globalTaskManager.StartTask()
}
// StopTask stop all tasks
func StopTask() {
globalTaskManager.StopTask()
}
// AddTask add task with name
func AddTask(taskName string, t Tasker) {
globalTaskManager.AddTask(taskName, t)
}
// DeleteTask delete task with name
func DeleteTask(taskName string) {
globalTaskManager.DeleteTask(taskName)
}
// ClearTask clear all tasks
func ClearTask() {
globalTaskManager.ClearTask()
}
// GetAllTasks get all tasks
func GetAllTasks() []Tasker {
return globalTaskManager.GetAllTasks()
}
// GracefulShutdown wait all task done
func GracefulShutdown() <-chan struct{} {
return globalTaskManager.GracefulShutdown()
}
// StartTask start all tasks
func (m *taskManager) StartTask() {
m.taskLock.Lock()
defer m.taskLock.Unlock()
if m.started {
// If already started no need to start another goroutine.
return
}
m.started = true
registerCommands()
go m.run()
}
func (m *taskManager) run() {
now := time.Now().Local()
// first run the tasks, so set all tasks next run time.
m.setTasksStartTime(now)
for {
// we only use RLock here because NewMapSorter copy the reference, do not change any thing
// here, we sort all task and get first task running time (effective).
m.taskLock.RLock()
sortList := NewMapSorter(m.adminTaskList)
m.taskLock.RUnlock()
sortList.Sort()
var effective time.Time
if len(m.adminTaskList) == 0 || sortList.Vals[0].GetNext(context.Background()).IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
effective = now.AddDate(10, 0, 0)
} else {
effective = sortList.Vals[0].GetNext(context.Background())
}
select {
case now = <-time.After(effective.Sub(now)): // wait for effective time
m.runNextTasks(sortList, effective)
continue
case <-m.changed: // tasks have been changed, set all tasks run again now
now = time.Now().Local()
m.setTasksStartTime(now)
continue
case <-m.stop: // manager is stopped, and mark manager is stopped
m.markManagerStop()
return
}
}
}
// setTasksStartTime is set all tasks next running time
func (m *taskManager) setTasksStartTime(now time.Time) {
m.taskLock.Lock()
for _, task := range m.adminTaskList {
task.SetNext(context.Background(), now)
}
m.taskLock.Unlock()
}
// markManagerStop it sets manager to be stopped
func (m *taskManager) markManagerStop() {
m.taskLock.Lock()
if m.started {
m.started = false
}
m.taskLock.Unlock()
}
// runNextTasks it runs next task which next run time is equal to effective
func (m *taskManager) runNextTasks(sortList *MapSorter, effective time.Time) {
// Run every entry whose next time was this effective time.
i := 0
for _, e := range sortList.Vals {
i++
if e.GetNext(context.Background()) != effective {
break
}
// check if timeout is on, if yes passing the timeout context
ctx := context.Background()
m.wait.Add(1)
if duration := e.GetTimeout(ctx); duration != 0 {
go func(e Tasker) {
defer m.wait.Done()
ctx, cancelFunc := context.WithTimeout(ctx, duration)
defer cancelFunc()
err := e.Run(ctx)
if err != nil {
log.Printf("tasker.run err: %s\n", err.Error())
}
}(e)
} else {
go func(e Tasker) {
defer m.wait.Done()
err := e.Run(ctx)
if err != nil {
log.Printf("tasker.run err: %s\n", err.Error())
}
}(e)
}
e.SetPrev(context.Background(), e.GetNext(context.Background()))
e.SetNext(context.Background(), effective)
}
}
// StopTask stop all tasks
func (m *taskManager) StopTask() {
go func() {
m.stop <- true
}()
}
// GracefulShutdown wait all task done
func (m *taskManager) GracefulShutdown() <-chan struct{} {
done := make(chan struct{})
go func() {
m.stop <- true
m.wait.Wait()
close(done)
}()
return done
}
// AddTask add task with name
func (m *taskManager) AddTask(taskname string, t Tasker) {
isChanged := false
m.taskLock.Lock()
t.SetNext(nil, time.Now().Local())
m.adminTaskList[taskname] = t
if m.started {
isChanged = true
}
m.taskLock.Unlock()
if isChanged {
go func() {
m.changed <- true
}()
}
}
// DeleteTask delete task with name
func (m *taskManager) DeleteTask(taskname string) {
isChanged := false
m.taskLock.Lock()
delete(m.adminTaskList, taskname)
if m.started {
isChanged = true
}
m.taskLock.Unlock()
if isChanged {
go func() {
m.changed <- true
}()
}
}
// ClearTask clear all tasks
func (m *taskManager) ClearTask() {
isChanged := false
m.taskLock.Lock()
m.adminTaskList = make(map[string]Tasker)
if m.started {
isChanged = true
}
m.taskLock.Unlock()
if isChanged {
go func() {
m.changed <- true
}()
}
}
// GetAllTasks get all tasks
func (m *taskManager) GetAllTasks() []Tasker {
m.taskLock.RLock()
l := make([]Tasker, 0, len(m.adminTaskList))
for _, t := range m.adminTaskList {
l = append(l, t)
}
m.taskLock.RUnlock()
return l
}
// MapSorter sort map for tasker
type MapSorter struct {
Keys []string
Vals []Tasker
}
// NewMapSorter create new tasker map
func NewMapSorter(m map[string]Tasker) *MapSorter {
ms := &MapSorter{
Keys: make([]string, 0, len(m)),
Vals: make([]Tasker, 0, len(m)),
}
for k, v := range m {
ms.Keys = append(ms.Keys, k)
ms.Vals = append(ms.Vals, v)
}
return ms
}
// Sort sort tasker map
func (ms *MapSorter) Sort() {
sort.Sort(ms)
}
func (ms *MapSorter) Len() int { return len(ms.Keys) }
func (ms *MapSorter) Less(i, j int) bool {
if ms.Vals[i].GetNext(context.Background()).IsZero() {
return false
}
if ms.Vals[j].GetNext(context.Background()).IsZero() {
return true
}
return ms.Vals[i].GetNext(context.Background()).Before(ms.Vals[j].GetNext(context.Background()))
}
func (ms *MapSorter) Swap(i, j int) {
ms.Vals[i], ms.Vals[j] = ms.Vals[j], ms.Vals[i]
ms.Keys[i], ms.Keys[j] = ms.Keys[j], ms.Keys[i]
}
func getField(field string, r bounds) uint64 {
// list = range {"," range}
var bits uint64
ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })
for _, expr := range ranges {
bits |= getRange(expr, r)
}
return bits
}
// getRange returns the bits indicated by the given expression:
//
// number | number "-" number [ "/" number ]
func getRange(expr string, r bounds) uint64 {
var (
start, end, step uint
rangeAndStep = strings.Split(expr, "/")
lowAndHigh = strings.Split(rangeAndStep[0], "-")
singleDigit = len(lowAndHigh) == 1
)
var extrastar uint64
if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" {
start = r.min
end = r.max
extrastar = starBit
} else {
start = parseIntOrName(lowAndHigh[0], r.names)
switch len(lowAndHigh) {
case 1:
end = start
case 2:
end = parseIntOrName(lowAndHigh[1], r.names)
default:
log.Panicf("Too many hyphens: %s", expr)
}
}
switch len(rangeAndStep) {
case 1:
step = 1
case 2:
step = mustParseInt(rangeAndStep[1])
// Special handling: "N/step" means "N-max/step".
if singleDigit {
end = r.max
}
default:
log.Panicf("Too many slashes: %s", expr)
}
if start < r.min {
log.Panicf("Beginning of range (%d) below minimum (%d): %s", start, r.min, expr)
}
if end > r.max {
log.Panicf("End of range (%d) above maximum (%d): %s", end, r.max, expr)
}
if start > end {
log.Panicf("Beginning of range (%d) beyond end of range (%d): %s", start, end, expr)
}
return getBits(start, end, step) | extrastar
}
// parseIntOrName returns the (possibly-named) integer contained in expr.
func parseIntOrName(expr string, names map[string]uint) uint {
if names != nil {
if namedInt, ok := names[strings.ToLower(expr)]; ok {
return namedInt
}
}
return mustParseInt(expr)
}
// mustParseInt parses the given expression as an int or panics.
func mustParseInt(expr string) uint {
num, err := strconv.Atoi(expr)
if err != nil {
log.Panicf("Failed to parse int from %s: %s", expr, err)
}
if num < 0 {
log.Panicf("Negative number (%d) not allowed: %s", num, expr)
}
return uint(num)
}
// getBits sets all bits in the range [min, max], modulo the given step size.
func getBits(min, max, step uint) uint64 {
var bits uint64
// If step is 1, use shifts.
if step == 1 {
return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min)
}
// Else, use a simple loop.
for i := min; i <= max; i += step {
bits |= 1 << i
}
return bits
}
// all returns all bits within the given bounds. (plus the star bit)
func all(r bounds) uint64 {
return getBits(r.min, r.max, 1) | starBit
}
func init() {
globalTaskManager = newTaskManager()
}