Documentation ¶
Overview ¶
A stateful, recoverable DAG task executor with map-reduce support.
Index ¶
- Variables
- func AppendError(err error, tail ...error) error
- func Cast[T any](src any) (T, error)
- func CastTo[T any](src any, dst T) error
- func GetTaskSpecsName(specs []TaskSpec) []string
- func IsRuntimeError(err error) bool
- func Min[T number](a T, xs ...T) T
- func NewRuntimeError(err error) error
- func NewRuntimeErrorf(format string, args ...any) error
- func Remove[T comparable](xs []T, t T) []T
- func RemoveAt[T any](xs []T, is ...int) []T
- func Repeat[T any](s T, n int) []T
- func Sum[T number](xs []T) T
- func ToJSON(obj any) string
- func ToLogField(obj any) any
- func ToPrettyJSON(obj any) string
- type Backoff
- type Consumer
- type Executor
- type FailCounter
- type MapStateful
- type Option
- func WithBackoff(fn Backoff) Option
- func WithCallback(cb TaskCallback) Option
- func WithCleanup(cleanup bool) Option
- func WithExpiration(expiration time.Duration) Option
- func WithRetryCallback(cb RetryCallback) Option
- func WithRetryCount(retry int) Option
- func WithRunOnce(b bool) Option
- func WithSkipPrev(skip bool) Option
- type ParallelStateful
- type PipeStateful
- type Postman
- type RetryCallback
- type Stateful
- func DumpAndLoadTest(s Stateful) (Stateful, error)
- func EmptyTask(sin Stateful) (Stateful, error)
- func Run(name string, fn TaskFunc, args Stateful, opts ...Option) (Stateful, error)
- func RunSpec(spec TaskSpec, opts ...Option) (Stateful, error)
- func RunSpecList(specs []TaskSpec, opts ...Option) (Stateful, error)
- func RunTest(fn TaskFunc, ins Stateful) (outs Stateful, errCount int, rerr error)
- type TaskCallback
- type TaskFunc
- func Map[Input Stateful, Output Stateful](task TaskFuncG[Input, Output]) TaskFunc
- func Parallel(tasks ...any) TaskFunc
- func Pipe(tasks ...any) TaskFunc
- func WrapRemoveError(fn TaskFunc) TaskFunc
- func WrapTaskFunc(fn any) TaskFunc
- func WrapTaskG[Input Stateful, Output Stateful](fn TaskFuncG[Input, Output]) TaskFunc
- func WrapWithCallback(fn TaskFunc, cb func(sout Stateful, err error)) TaskFunc
- type TaskFuncG
- type TaskSpec
- type TaskState
Constants ¶
This section is empty.
Variables ¶
var DefaultBackoff = ExponentialBackoff(1.5, 30*time.Second)
DefaultBackoff is an exponential backoff strategy with a factor of 1.5 and a maximum interval of 30 seconds.
Functions ¶
func AppendError ¶
func GetTaskSpecsName ¶
func IsRuntimeError ¶
func NewRuntimeError ¶
func NewRuntimeErrorf ¶
func Remove ¶
func Remove[T comparable](xs []T, t T) []T
Remove removes the first occurrence of the given value from the slice.
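To make the "first occurrence" behaviour concrete, here is a minimal sketch. The helper `removeFirst` is a hypothetical stand-in written for this example, not the package's implementation. It assumes the input slice should be left untouched, which the documentation above does not state.

```go
package main

import "fmt"

// removeFirst is a hypothetical stand-in for Remove: it returns a copy of xs
// without the first element equal to t. Leaving xs unmodified is an assumption;
// the documentation does not say whether the real Remove mutates its argument.
func removeFirst[T comparable](xs []T, t T) []T {
	for i, x := range xs {
		if x == t {
			out := make([]T, 0, len(xs)-1)
			out = append(out, xs[:i]...)
			return append(out, xs[i+1:]...)
		}
	}
	// No match: return an independent copy so both paths behave the same way.
	return append([]T(nil), xs...)
}

func main() {
	// Only the first "a" is dropped; the later one is kept.
	fmt.Println(removeFirst([]string{"a", "b", "a"}, "a"))
}
```

Later duplicates survive, so a caller who wants every occurrence removed would need to loop or filter instead.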
func ToLogField ¶
func ToPrettyJSON ¶
ToJSON marshals an object to a JSON string, falling back to fmt formatting when marshaling fails.
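The fallback described above can be sketched as follows. The function `toJSON` is a hypothetical illustration. The `%+v` verb used for the fallback is an assumption, since the documentation only says that fmt is used.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSON is a hypothetical sketch of the documented behaviour: try
// json.Marshal first and fall back to fmt formatting if that fails.
// The %+v verb is an assumption, not taken from the package.
func toJSON(obj any) string {
	b, err := json.Marshal(obj)
	if err != nil {
		return fmt.Sprintf("%+v", obj)
	}
	return string(b)
}

func main() {
	// A map marshals normally.
	fmt.Println(toJSON(map[string]int{"a": 1}))
	// Channels cannot be encoded as JSON, so the fmt fallback is used.
	fmt.Println(toJSON(make(chan int)) != "")
}
```

A helper like this never returns an error, which is convenient for logging but hides marshaling failures from the caller.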
Types ¶
type Backoff ¶
Backoff is a function type for calculating retry durations. It takes the number of attempts as input and returns the duration to wait before the next attempt.
func ExponentialBackoff ¶
ExponentialBackoff increases retry duration exponentially
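As a rough illustration of how an exponential strategy with a growth factor and a cap could behave, consider the sketch below. The `backoff` type, the `exponentialBackoff` constructor, and the one-second starting interval are assumptions made for this example. The real ExponentialBackoff takes only a factor and a maximum, and its starting interval is not documented here.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoff mirrors the documented idea of Backoff: attempt count in, wait
// duration out. The exact signature of the real type is not shown on this page.
type backoff func(attempt int) time.Duration

// exponentialBackoff is a hypothetical constructor. The base parameter is an
// assumption; the real function takes only a factor and a maximum.
func exponentialBackoff(base time.Duration, factor float64, limit time.Duration) backoff {
	return func(attempt int) time.Duration {
		if attempt < 1 {
			attempt = 1
		}
		// Grow geometrically: base, base*factor, base*factor^2, ...
		d := float64(base) * math.Pow(factor, float64(attempt-1))
		if d > float64(limit) {
			return limit // cap the wait at the configured maximum
		}
		return time.Duration(d)
	}
}

// defaultLikeBackoff uses the factor and cap quoted for DefaultBackoff,
// with an assumed starting interval of one second.
func defaultLikeBackoff() backoff {
	return exponentialBackoff(time.Second, 1.5, 30*time.Second)
}

func main() {
	b := defaultLikeBackoff()
	for attempt := 1; attempt <= 10; attempt++ {
		fmt.Println(attempt, b(attempt))
	}
}
```

With these assumed values the wait reaches the 30-second cap within ten attempts and stays there for every attempt after that.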
func FixedIntervalBackoff ¶
FixedIntervalBackoff returns a fixed retry duration, primarily used for testing. The interval should be a non-negative duration.
type Executor ¶
type Executor interface {
// Submit adds a new task to the executor's queue.
// 1. This method returns immediately.
// 2. Multiple submissions are executed concurrently.
// 3. WithCleanup waits for prior tasks to complete without affecting subsequent submissions.
Submit(name string, fn TaskFunc, args Stateful, opts ...Option)
// SubmitSpec is similar to Submit but takes a TaskSpec.
SubmitSpec(spec TaskSpec, opts ...Option)
// Cleanup ensures all tasks are executed at least once, without retries, and blocks until all tasks are completed.
Cleanup()
// Close performs cleanup and waits for all tasks to finish.
Close()
}
Executor is an interface that defines methods for submitting and managing tasks.
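The contract that Submit returns immediately while Close waits for completion can be illustrated with a toy executor. This is a hypothetical, in-memory sketch with a simplified task signature. It has no retries, state persistence, or options, and it says nothing about how the real Executor is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// toyExecutor is a hypothetical illustration only. Tasks are plain
// func() error values rather than the package's TaskFunc.
type toyExecutor struct {
	wg sync.WaitGroup
}

// Submit starts the task in its own goroutine and returns immediately.
func (e *toyExecutor) Submit(name string, fn func() error) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		if err := fn(); err != nil {
			fmt.Printf("task %s failed: %v\n", name, err)
		}
	}()
}

// Close blocks until every submitted task has finished.
func (e *toyExecutor) Close() {
	e.wg.Wait()
}

// runDemo submits n tasks and reports how many ran before Close returned.
func runDemo(n int) int {
	var (
		mu   sync.Mutex
		done int
	)
	e := &toyExecutor{}
	for i := 0; i < n; i++ {
		e.Submit(fmt.Sprintf("task-%d", i), func() error {
			mu.Lock()
			defer mu.Unlock()
			done++
			return nil
		})
	}
	e.Close()
	return done
}

func main() {
	fmt.Println(runDemo(5))
}
```

Because submissions run concurrently, any state shared between tasks needs its own synchronization, as the mutex in `runDemo` shows.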
func NewExecutor ¶
type FailCounter ¶
type MapStateful ¶
type MapStateful[Input Stateful, Output Stateful] struct {
Fail map[string]any `json:"__runtime_fail"`
SucceedList []Output `json:"__runtime_list,omitempty"`
SucceedMap map[string]Output `json:"__runtime_map,omitempty"`
// contains filtered or unexported fields
}
func (*MapStateful[Input, Output]) SetOutput ¶
func (ctx *MapStateful[Input, Output]) SetOutput(k string, v Output) error
func (*MapStateful[Input, Output]) Succeed ¶
func (ctx *MapStateful[Input, Output]) Succeed() (Stateful, error)
type Option ¶
type Option func(ro *runOption)
func WithBackoff ¶
func WithCallback ¶
func WithCallback(cb TaskCallback) Option
WithCallback adds a TaskCallback to the runOption's callback list. If the provided callback is nil, it will not be added.
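The nil check described above fits the usual functional-options pattern. The sketch below uses hypothetical lowercase names (`runOption`, `taskCallback`, `withCallback`, `applyOptions`). The real runOption is unexported and its fields are not shown, so the layout here is an assumption.

```go
package main

import "fmt"

// taskCallback is a hypothetical callback type for this sketch.
type taskCallback func(name string, err error)

// runOption is a hypothetical stand-in for the package's unexported options
// struct; only a callback list is modelled.
type runOption struct {
	callbacks []taskCallback
}

type option func(ro *runOption)

// withCallback appends cb to the callback list and ignores nil callbacks,
// matching the behaviour documented for WithCallback.
func withCallback(cb taskCallback) option {
	return func(ro *runOption) {
		if cb == nil {
			return
		}
		ro.callbacks = append(ro.callbacks, cb)
	}
}

// applyOptions folds a list of options into a fresh runOption.
func applyOptions(opts ...option) runOption {
	var ro runOption
	for _, opt := range opts {
		opt(&ro)
	}
	return ro
}

func main() {
	ro := applyOptions(
		withCallback(nil), // skipped
		withCallback(func(name string, err error) {}),
	)
	fmt.Println(len(ro.callbacks))
}
```

Skipping nil at registration time means the code that later invokes the callbacks does not need its own nil guard.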
func WithCleanup ¶
WithCleanup sets the cleanup option to the provided boolean value. When cleanup is true, the task will be marked as FastFail and will wait for all tasks to complete.
func WithExpiration ¶
func WithRetryCallback ¶
func WithRetryCallback(cb RetryCallback) Option
func WithRetryCount ¶
WithRetryCount sets the maximum number of retries for a task. The retry parameter specifies the maximum number of retry attempts.
func WithRunOnce ¶
WithRunOnce sets the runOnce option to the provided boolean value. When runOnce is true, the task will be attempted once before creating a new goroutine. This is recommended when all tasks are serial or very lightweight.
func WithSkipPrev ¶
WithSkipPrev sets the skipPrev option to the provided boolean value. When skipPrev is true, the task will skip previously blocked tasks.
type ParallelStateful ¶
type PipeStateful ¶
type Postman ¶
type Postman[T any] interface {
Post(t T)
Close()
}
Postman is a single-threaded task processor that combines blocked tasks.
func NewPostman ¶
type Stateful ¶
type Stateful = any
Stateful values must be deserializable by json.Unmarshal.
func DumpAndLoadTest ¶
DumpAndLoadTest simulates the dump and load process.
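A dump-and-load check of this kind can be approximated with a JSON round trip. The sketch below is a hypothetical illustration, not the package's DumpAndLoadTest. The `progress` type and the generic `dumpAndLoad` helper are invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// progress is a hypothetical task state. Exported fields survive the round
// trip; the unexported cache field is ignored by encoding/json.
type progress struct {
	Step  int      `json:"step"`
	Done  []string `json:"done"`
	cache string
}

// dumpAndLoad marshals in to JSON and unmarshals the bytes into a fresh value,
// which approximates what a persisted-then-recovered state would look like.
func dumpAndLoad[T any](in T) (T, error) {
	var out T
	data, err := json.Marshal(in)
	if err != nil {
		return out, fmt.Errorf("dump: %w", err)
	}
	if err := json.Unmarshal(data, &out); err != nil {
		return out, fmt.Errorf("load: %w", err)
	}
	return out, nil
}

func main() {
	in := progress{Step: 2, Done: []string{"fetch", "parse"}, cache: "tmp"}
	out, err := dumpAndLoad(in)
	// The exported fields come back; the unexported cache is lost.
	fmt.Println(out.Step, out.Done, out.cache == "", err)
}
```

Running a state type through a round trip like this before using it in a task helps surface fields that would silently disappear on recovery.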
type TaskCallback ¶
TaskCallback is a function type that is called when a task completes. It receives the task state and an error if one occurred.
type TaskFunc ¶
func WrapRemoveError ¶
func WrapTaskFunc ¶
type TaskSpec ¶
type TaskSpec struct {
// Name is the name of the task.
Name string
// Exec is the function to be executed for the task.
Exec TaskFunc
// Args are the arguments to be passed to the task function.
Args Stateful
// Opts are the options for configuring the task execution.
Opts []Option
}
TaskSpec defines the specification for a task.
type TaskState ¶
type TaskState struct {
// Name is the name of the task.
Name string `json:"name"`
// Finished indicates whether the task has completed.
Finished bool `json:"finished"`
// Success indicates whether the task was successful.
Success bool `json:"success"`
// ExecutedCount is the number of times the task has been executed.
ExecutedCount int `json:"executed_count"`
// Stateful is the internal state of the task.
Stateful Stateful `json:"stateful"`
// Error contains any error message if the task failed.
Error string `json:"error,omitempty"`
// CreatedAt is the time when the task was created.
CreatedAt time.Time `json:"created_at,omitempty"`
// UpdatedAt is the time when the task was last updated.
UpdatedAt time.Time `json:"updated_at,omitempty"`
}
TaskState represents the state of a task.
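To see how the JSON tags above shape the serialized form, the sketch below marshals a local copy of the struct and inspects which keys appear. The `taskState` type and the `encodeKeys` helper are hypothetical. The struct is redeclared only so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// taskState is a local copy of the documented TaskState fields and tags.
type taskState struct {
	Name          string    `json:"name"`
	Finished      bool      `json:"finished"`
	Success       bool      `json:"success"`
	ExecutedCount int       `json:"executed_count"`
	Stateful      any       `json:"stateful"`
	Error         string    `json:"error,omitempty"`
	CreatedAt     time.Time `json:"created_at,omitempty"`
	UpdatedAt     time.Time `json:"updated_at,omitempty"`
}

// encodeKeys marshals s and decodes the result into a key-to-raw-value map,
// which makes it easy to check which fields were emitted.
func encodeKeys(s taskState) (map[string]json.RawMessage, error) {
	data, err := json.Marshal(s)
	if err != nil {
		return nil, err
	}
	var keys map[string]json.RawMessage
	if err := json.Unmarshal(data, &keys); err != nil {
		return nil, err
	}
	return keys, nil
}

func main() {
	keys, err := encodeKeys(taskState{Name: "fetch", Finished: true, Success: true, ExecutedCount: 1})
	if err != nil {
		fmt.Println(err)
		return
	}
	_, hasError := keys["error"]
	_, hasCreated := keys["created_at"]
	// An empty Error is omitted, but a zero time.Time is still written because
	// encoding/json never treats a struct value as empty for omitempty.
	fmt.Println(hasError, hasCreated)
}
```

Consumers of the serialized state should therefore expect `created_at` and `updated_at` to be present even when they hold the zero time.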