executors

package module
v0.1.0
Warning: This package is not in the latest version of its module.

Published: Nov 13, 2024 License: MIT Imports: 13 Imported by: 0

README

Executors

A stateful task executor, Executor, with the following features:

  • Tasks are stateful and retryable, with the function signature: func(sin any) (sout any, err error).
  • Tasks can be orchestrated using Pipe (sequential), Map (data parallel), and Parallel (task parallel).
  • In a multi-step task, if a step fails, the next retry resumes from the failed step; previously successful steps are not re-executed.
  • Task states can be dumped and loaded; after reloading, execution resumes from the last failed step.
  • Failed tasks are retried automatically, but a retrying task can be canceled when a newer event arrives. For example, when dumping snapshots, if a newer snapshot becomes available while an earlier write is still retrying, only the latest snapshot needs to be written and the blocked earlier write can be canceled.

Getting started

package executors_test

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"testing"

	"github.com/shinexia/executors-go"
)

func TestGettingStarted(t *testing.T) {
	task := executors.Pipe(
		// split: expand n into [0, 1, ..., n-1]
		func(sin int) (any, error) {
			if rand.Intn(100) < 50 {
				// on error, must return the original sin
				return sin, fmt.Errorf("split error")
			}
			out := make([]int, sin)
			for i := range sin { // range over int requires Go 1.22+
				out[i] = i
			}
			return out, nil
		},
		// map: multiply each element by 100 (data parallel)
		executors.Map(func(sin int) (any, error) {
			if rand.Intn(100) < 50 {
				// on error, must return the original sin
				return sin, fmt.Errorf("map error")
			}
			return sin * 100, nil
		}),
		// reduce: sum the mapped values
		func(sin []int) (any, error) {
			if rand.Intn(100) < 50 {
				// on error, must return the original sin
				return sin, fmt.Errorf("reduce error")
			}
			return executors.Sum(sin), nil
		},
	)
	var snapshot []byte
	var result int
	for {
		var sin any
		if len(snapshot) == 0 {
			sin = 10
		} else {
			// resume from the state saved by the previous attempt
			state := &executors.TaskState{}
			if err := json.Unmarshal(snapshot, state); err != nil {
				t.Fatalf("load snapshot: %v", err)
			}
			sin = state.Stateful
		}
		sout, err := executors.Run("test", task, sin, executors.WithCallback(func(state *executors.TaskState, err error) {
			// dump the task state (kept in memory here; a file or db in practice)
			snapshot, _ = json.Marshal(state)
		}))
		if err != nil {
			fmt.Printf("retrying, sout: %v, err: %v\n", executors.ToJSON(sout), err)
			if executors.IsRuntimeError(err) {
				t.Fatalf("runtime error: %+v", err)
			}
			continue
		}
		result, _ = executors.Cast[int](sout)
		fmt.Printf("succeeded, sout: %v\n", sout)
		break
	}
	// (0 + 1 + ... + 9) * 100 = 4500
	if result != 4500 {
		t.Errorf("result: %v, expect: 4500", result)
	}
}

More examples

See examples_test.go

Documentation

Overview

A stateful, recoverable DAG task executor with map-reduce support.

Index

Constants

This section is empty.

Variables

var DefaultBackoff = ExponentialBackoff(1.5, 30*time.Second)

DefaultBackoff is an exponential backoff strategy with a factor of 1.5 and a maximum interval of 30 seconds

Functions

func AppendError

func AppendError(err error, tail ...error) error

func Cast

func Cast[T any](src any) (T, error)

Cast converts the input value to the specified type T

func CastTo

func CastTo[T any](src any, dst T) error

CastTo performs type conversion from src to dst
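Snapshots are stored as JSON, so a value loaded back into a `Stateful` (`any`) arrives as `float64`, `[]any`, or `map[string]any` rather than its original Go type, which is presumably why helpers like `Cast` exist. The sketch below shows one way such a conversion can work, via a JSON round trip. `castViaJSON` is a hypothetical helper, and whether `Cast` uses this exact technique is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// castViaJSON converts src to T by marshaling it to JSON and unmarshaling
// into T. Hypothetical helper; not necessarily how executors.Cast works.
func castViaJSON[T any](src any) (T, error) {
	var dst T
	if v, ok := src.(T); ok {
		return v, nil // fast path: already the right type
	}
	b, err := json.Marshal(src)
	if err != nil {
		return dst, err
	}
	err = json.Unmarshal(b, &dst)
	return dst, err
}

func main() {
	// After a JSON dump/load, numbers become float64 and slices become []any.
	var loaded any
	_ = json.Unmarshal([]byte(`[0, 100, 200]`), &loaded)
	fmt.Printf("%T\n", loaded) // []interface {}

	xs, err := castViaJSON[[]int](loaded)
	fmt.Println(xs, err) // [0 100 200] <nil>
}
```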

func GetTaskSpecsName

func GetTaskSpecsName(specs []TaskSpec) []string

func IsRuntimeError

func IsRuntimeError(err error) bool

func Min

func Min[T number](a T, xs ...T) T

Min returns the minimum value among the given values.

func NewRuntimeError

func NewRuntimeError(err error) error

func NewRuntimeErrorf

func NewRuntimeErrorf(format string, args ...any) error

func Remove

func Remove[T comparable](xs []T, t T) []T

Remove removes the first occurrence of the given value from the slice.

func RemoveAt

func RemoveAt[T any](xs []T, is ...int) []T

RemoveAt removes elements at the specified indices from the slice.

func Repeat

func Repeat[T any](s T, n int) []T

Repeat returns a slice with the given value repeated n times.

func Sum

func Sum[T number](xs []T) T

Sum returns the sum of the given values.
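The generic helpers above are small enough to sketch directly. The package's `number` constraint is unexported, so the one here is a hypothetical stand-in, and the lowercase function names are illustrative rather than the package's own code. In this sketch `remove` returns a new slice and leaves the input untouched; whether `Remove` copies or modifies in place is not stated in the documentation.

```go
package main

import "fmt"

// number is a hypothetical stand-in for the package's unexported constraint.
type number interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

// sum adds all values; an empty slice yields the zero value.
func sum[T number](xs []T) T {
	var total T
	for _, x := range xs {
		total += x
	}
	return total
}

// minOf returns the smallest of a and xs.
func minOf[T number](a T, xs ...T) T {
	m := a
	for _, x := range xs {
		if x < m {
			m = x
		}
	}
	return m
}

// remove drops only the first occurrence of t, returning a new slice.
func remove[T comparable](xs []T, t T) []T {
	for i, x := range xs {
		if x == t {
			out := make([]T, 0, len(xs)-1)
			out = append(out, xs[:i]...)
			return append(out, xs[i+1:]...)
		}
	}
	return xs
}

// repeat returns a slice holding n copies of s.
func repeat[T any](s T, n int) []T {
	out := make([]T, n)
	for i := range out {
		out[i] = s
	}
	return out
}

func main() {
	fmt.Println(sum([]int{0, 100, 200}))              // 300
	fmt.Println(minOf(3.5, 1.25, 2))                  // 1.25
	fmt.Println(remove([]string{"a", "b", "a"}, "a")) // [b a]
	fmt.Println(repeat("x", 3))                       // [x x x]
}
```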

func ToJSON

func ToJSON(obj any) string

ToJSON marshals an object to a JSON string, falling back to fmt formatting if marshaling fails.
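A sketch of the marshal-with-fallback behavior, using only the standard library. `toJSON` is a hypothetical helper, and the `%+v` verb used for the fallback is an assumption about what "use fmt" means.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSON marshals obj to a JSON string; if marshaling fails (for example,
// a struct with a channel field), it falls back to fmt's %+v formatting.
func toJSON(obj any) string {
	b, err := json.Marshal(obj)
	if err != nil {
		return fmt.Sprintf("%+v", obj)
	}
	return string(b)
}

func main() {
	fmt.Println(toJSON(map[string]int{"step": 2})) // {"step":2}
	fmt.Println(toJSON(struct{ C chan int }{}))    // {C:<nil>}
}
```

Falling back instead of returning an error keeps the helper convenient inside log and print statements, as in the retry message of the Getting started example.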

func ToLogField

func ToLogField(obj any) any

func ToPrettyJSON

func ToPrettyJSON(obj any) string

ToPrettyJSON marshals an object to a pretty-printed JSON string, falling back to fmt formatting if marshaling fails.

Types

type Backoff

type Backoff func(attempts int) time.Duration

Backoff is a function type for calculating retry durations. It takes the number of attempts as input and returns the duration to wait before the next attempt.

func ExponentialBackoff

func ExponentialBackoff(factor float64, maxInterval time.Duration) Backoff

ExponentialBackoff increases retry duration exponentially

func FixedIntervalBackoff

func FixedIntervalBackoff(interval time.Duration) Backoff

FixedIntervalBackoff returns a fixed retry duration and is primarily used for testing. The interval should be a non-negative duration.

type Consumer

type Consumer[T any] func(t T)

type Executor

type Executor interface {
	// Submit adds a new task to the executor's queue.
	// 1. This method returns immediately.
	// 2. Multiple submissions are executed concurrently.
	// 3. WithCleanup waits for prior tasks to complete without affecting subsequent submissions.
	Submit(name string, fn TaskFunc, args Stateful, opts ...Option)

	// SubmitSpec is similar to Submit but takes a TaskSpec.
	SubmitSpec(spec TaskSpec, opts ...Option)

	// Cleanup ensures every task is executed at least once, without retries, and blocks until all tasks have completed.
	Cleanup()

	// Close performs cleanup and waits for all tasks to finish.
	Close()
}

Executor is an interface that defines methods for submitting and managing tasks.

func NewExecutor

func NewExecutor(name string, opts ...Option) Executor

type FailCounter

type FailCounter[T any] struct {
	Data      T   `json:"data"`
	FailCount int `json:"fail_count"`
}

type MapStateful

type MapStateful[Input Stateful, Output Stateful] struct {
	Fail        map[string]any    `json:"__runtime_fail"`
	SucceedList []Output          `json:"__runtime_list,omitempty"`
	SucceedMap  map[string]Output `json:"__runtime_map,omitempty"`
	// contains filtered or unexported fields
}

func (*MapStateful[Input, Output]) SetOutput

func (ctx *MapStateful[Input, Output]) SetOutput(k string, v Output) error

func (*MapStateful[Input, Output]) Succeed

func (ctx *MapStateful[Input, Output]) Succeed() (Stateful, error)

type Option

type Option func(ro *runOption)

func WithBackoff

func WithBackoff(fn Backoff) Option

func WithCallback

func WithCallback(cb TaskCallback) Option

WithCallback adds a TaskCallback to the runOption's callback list. If the provided callback is nil, it will not be added.

func WithCleanup

func WithCleanup(cleanup bool) Option

WithCleanup sets the cleanup option to the provided boolean value. When cleanup is true, the task will be marked as FastFail and will wait for all tasks to complete.

func WithExpiration

func WithExpiration(expiration time.Duration) Option

func WithRetryCallback

func WithRetryCallback(cb RetryCallback) Option

func WithRetryCount

func WithRetryCount(retry int) Option

WithRetryCount sets the maximum number of retry attempts for a task.

func WithRunOnce

func WithRunOnce(b bool) Option

WithRunOnce sets the runOnce option to the provided boolean value. When runOnce is true, the task will be attempted once before creating a new goroutine. This is recommended when all tasks are serial or very lightweight.

func WithSkipPrev

func WithSkipPrev(skip bool) Option

WithSkipPrev sets the skipPrev option to the provided boolean value. When skipPrev is true, the task will skip previously blocked tasks.

type ParallelStateful

type ParallelStateful struct {
	Succeed []any `json:"__runtime_succeed"`
	Fail    []int `json:"__runtime_fail"`
	// contains filtered or unexported fields
}

type PipeStateful

type PipeStateful struct {
	Step  int `json:"__runtime_step"`
	Count int `json:"__runtime_count"`
	Data  any `json:"__runtime_data"`
}

type Postman

type Postman[T any] interface {
	Post(t T)
	Close()
}

Postman is a single-threaded task processor that combines blocked tasks.
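The batching idea behind `Postman` can be sketched with a single goroutine that receives one item, drains anything else already queued, and hands the whole batch to a consumer. `postman` and `newPostman` here are hypothetical and simplified (one consumer, a fixed buffer of 1024); the documented `NewPostman` accepts several `Consumer[[]T]` values.

```go
package main

import "fmt"

// postman is a hypothetical single-goroutine batcher: items posted while
// the consumer is busy are delivered together as one batch.
type postman[T any] struct {
	ch   chan T
	done chan struct{}
}

func newPostman[T any](consume func([]T)) *postman[T] {
	p := &postman[T]{ch: make(chan T, 1024), done: make(chan struct{})}
	go func() {
		defer close(p.done)
		for first := range p.ch {
			batch := []T{first}
			// Drain whatever else is already queued, without blocking.
		drain:
			for {
				select {
				case t, ok := <-p.ch:
					if !ok {
						break drain
					}
					batch = append(batch, t)
				default:
					break drain
				}
			}
			consume(batch)
		}
	}()
	return p
}

// Post enqueues an item; it blocks only if the buffer is full.
func (p *postman[T]) Post(t T) { p.ch <- t }

// Close stops accepting items and waits for the last batch to be consumed.
func (p *postman[T]) Close() {
	close(p.ch)
	<-p.done
}

func main() {
	var batches [][]int // read only after Close, which waits for the consumer
	p := newPostman(func(b []int) { batches = append(batches, b) })
	for i := 0; i < 5; i++ {
		p.Post(i)
	}
	p.Close()
	total := 0
	for _, b := range batches {
		total += len(b)
	}
	fmt.Println("items:", total, "batches:", len(batches))
}
```

For the snapshot use case in the README, a consumer could write only the last element of each batch, skipping stale snapshots that queued up behind a slow write. Calling `Post` after `Close` panics in this sketch, because it sends on a closed channel.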

func NewPostman

func NewPostman[T any](consumers ...Consumer[[]T]) Postman[T]

type RetryCallback

type RetryCallback = func(state *TaskState, delay time.Duration, err error)

type Stateful

type Stateful = any

Stateful values must be deserializable by json.Unmarshal.

func DumpAndLoadTest

func DumpAndLoadTest(s Stateful) (Stateful, error)

DumpAndLoadTest simulates the dump and load process.

func EmptyTask

func EmptyTask(sin Stateful) (Stateful, error)

func Run

func Run(name string, fn TaskFunc, args Stateful, opts ...Option) (Stateful, error)

func RunSpec

func RunSpec(spec TaskSpec, opts ...Option) (Stateful, error)

func RunSpecList

func RunSpecList(specs []TaskSpec, opts ...Option) (Stateful, error)

func RunTest

func RunTest(fn TaskFunc, ins Stateful) (outs Stateful, errCount int, rerr error)

RunTest runs the given task function and returns the output and error count.

type TaskCallback

type TaskCallback = func(state *TaskState, err error)

TaskCallback is a function type that is called when a task completes. It receives the task state and an error if one occurred.

type TaskFunc

type TaskFunc = func(sin Stateful) (Stateful, error)

func Map

func Map[Input Stateful, Output Stateful](task TaskFuncG[Input, Output]) TaskFunc

Map runs the same task on each element of a map or slice; this is data parallelism.

func Parallel

func Parallel(tasks ...any) TaskFunc

Parallel runs different tasks on the same input; this is task parallelism.

func Pipe

func Pipe(tasks ...any) TaskFunc

Pipe feeds each task's output into the next task as its input, like a Unix pipe.

func WrapRemoveError

func WrapRemoveError(fn TaskFunc) TaskFunc

func WrapTaskFunc

func WrapTaskFunc(fn any) TaskFunc

func WrapTaskG

func WrapTaskG[Input Stateful, Output Stateful](fn TaskFuncG[Input, Output]) TaskFunc

func WrapWithCallback

func WrapWithCallback(fn TaskFunc, cb func(sout Stateful, err error)) TaskFunc

type TaskFuncG

type TaskFuncG[Input Stateful, Output Stateful] func(sin Input) (Output, error)

type TaskSpec

type TaskSpec struct {
	// Name is the name of the task.
	Name string
	// Exec is the function to be executed for the task.
	Exec TaskFunc
	// Args are the arguments to be passed to the task function.
	Args Stateful
	// Opts are the options for configuring the task execution.
	Opts []Option
}

TaskSpec defines the specification for a task.

type TaskState

type TaskState struct {
	// Name is the name of the task.
	Name string `json:"name"`
	// Finished indicates whether the task has completed.
	Finished bool `json:"finished"`
	// Success indicates whether the task was successful.
	Success bool `json:"success"`
	// ExecutedCount is the number of times the task has been executed.
	ExecutedCount int `json:"executed_count"`
	// Stateful is the internal state of the task.
	Stateful Stateful `json:"stateful"`
	// Error contains any error message if the task failed.
	Error string `json:"error,omitempty"`
	// CreatedAt is the time when the task was created.
	CreatedAt time.Time `json:"created_at,omitempty"`
	// UpdatedAt is the time when the task was last updated.
	UpdatedAt time.Time `json:"updated_at,omitempty"`
}

TaskState represents the state of a task.

func NewTaskState

func NewTaskState(spec *TaskSpec, t time.Time) TaskState

func NewTaskStateList

func NewTaskStateList(specs []TaskSpec, t time.Time) []TaskState

func (*TaskState) IsEmpty

func (ts *TaskState) IsEmpty() bool
