Documentation
¶
Index ¶
- Constants
- func ApplyPatches(container VariableContainer, patches []Patch)
- func InputsFromContext(ctx Context) map[string]any
- func MatchesErrorType(err error, errorType string) bool
- func NewContext(ctx context.Context, opts ExecutionContextOptions) *executionContext
- func NewExecutionID() string
- func NewJSONLogger() *slog.Logger
- func NewLogger() *slog.Logger
- func VariablesFromContext(ctx Context) map[string]any
- type Activity
- type ActivityExecutionEvent
- type ActivityExecutor
- type ActivityFunction
- type ActivityLogEntry
- type ActivityLogger
- type ActivityRegistry
- type BaseExecutionCallbacks
- func (n *BaseExecutionCallbacks) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
- func (n *BaseExecutionCallbacks) AfterPathExecution(ctx context.Context, event *PathExecutionEvent)
- func (n *BaseExecutionCallbacks) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
- func (n *BaseExecutionCallbacks) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
- func (n *BaseExecutionCallbacks) BeforePathExecution(ctx context.Context, event *PathExecutionEvent)
- func (n *BaseExecutionCallbacks) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
- type CallbackChain
- func (c *CallbackChain) Add(callback ExecutionCallbacks)
- func (c *CallbackChain) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
- func (c *CallbackChain) AfterPathExecution(ctx context.Context, event *PathExecutionEvent)
- func (c *CallbackChain) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
- func (c *CallbackChain) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
- func (c *CallbackChain) BeforePathExecution(ctx context.Context, event *PathExecutionEvent)
- func (c *CallbackChain) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
- type CatchConfig
- type Checkpoint
- type Checkpointer
- type ChildWorkflowExecutor
- type ChildWorkflowExecutorOptions
- type ChildWorkflowHandle
- type ChildWorkflowResult
- type ChildWorkflowSpec
- type Context
- type DefaultChildWorkflowExecutor
- func (e *DefaultChildWorkflowExecutor) ExecuteAsync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowHandle, error)
- func (e *DefaultChildWorkflowExecutor) ExecuteSync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowResult, error)
- func (e *DefaultChildWorkflowExecutor) GetResult(ctx context.Context, handle *ChildWorkflowHandle) (*ChildWorkflowResult, error)
- type Each
- type Edge
- type EdgeMatchingStrategy
- type ErrorOutput
- type ExecuteActivityFunc
- type Execution
- type ExecutionAdapter
- type ExecutionCallbacks
- type ExecutionContextOptions
- type ExecutionOptions
- type ExecutionState
- func (s *ExecutionState) AddPathToJoin(stepName, pathID string, config *JoinConfig, ...)
- func (s *ExecutionState) FromCheckpoint(checkpoint *Checkpoint)
- func (s *ExecutionState) GeneratePathID(parentID, pathName string) (string, error)
- func (s *ExecutionState) GetAllJoinStates() map[string]*JoinState
- func (s *ExecutionState) GetError() error
- func (s *ExecutionState) GetFailedPathIDs() []string
- func (s *ExecutionState) GetInputs() map[string]any
- func (s *ExecutionState) GetJoinState(stepName string) *JoinState
- func (s *ExecutionState) GetOutputs() map[string]any
- func (s *ExecutionState) GetPathStates() map[string]*PathState
- func (s *ExecutionState) GetStartTime() time.Time
- func (s *ExecutionState) GetStatus() ExecutionStatus
- func (s *ExecutionState) GetWaitingPathIDs() []string
- func (s *ExecutionState) ID() string
- func (s *ExecutionState) IsJoinReady(stepName string) bool
- func (s *ExecutionState) NextPathID(baseID string) string
- func (s *ExecutionState) RemoveJoinState(stepName string)
- func (s *ExecutionState) SetError(err error)
- func (s *ExecutionState) SetFinished(status ExecutionStatus, endTime time.Time, err error)
- func (s *ExecutionState) SetID(id string)
- func (s *ExecutionState) SetOutput(key string, value any)
- func (s *ExecutionState) SetPathState(pathID string, state *PathState)
- func (s *ExecutionState) SetStatus(status ExecutionStatus)
- func (s *ExecutionState) SetTiming(startTime, endTime time.Time)
- func (s *ExecutionState) ToCheckpoint() *Checkpoint
- func (s *ExecutionState) UpdatePathState(pathID string, updateFn func(*PathState))
- type ExecutionStatus
- type ExecutionSummary
- type FileActivityLogger
- type FileCheckpointer
- func (c *FileCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error
- func (c *FileCheckpointer) ListExecutions(ctx context.Context) ([]*ExecutionSummary, error)
- func (c *FileCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)
- func (c *FileCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error
- type Input
- type JitterStrategy
- type JoinConfig
- type JoinRequest
- type JoinState
- type MemoryWorkflowRegistry
- type NullActivityLogger
- type NullCheckpointer
- type Options
- type Output
- type Patch
- type PatchOptions
- type Path
- type PathExecutionEvent
- type PathLocalState
- func (s *PathLocalState) DeleteVariable(key string)
- func (s *PathLocalState) GetInput(key string) (any, bool)
- func (s *PathLocalState) GetVariable(key string) (any, bool)
- func (s *PathLocalState) ListInputs() []string
- func (s *PathLocalState) ListVariables() []string
- func (s *PathLocalState) SetVariable(key string, value any)
- type PathOptions
- type PathSnapshot
- type PathSpec
- type PathState
- type RetryConfig
- type Step
- type TypedActivity
- type TypedActivityAdapter
- type TypedActivityFunction
- func (t *TypedActivityFunction[TParams, TResult]) Execute(ctx Context, params TParams) (TResult, error)
- func (t *TypedActivityFunction[TParams, TResult]) Name() string
- func (t *TypedActivityFunction[TParams, TResult]) ParametersType() reflect.Type
- func (t *TypedActivityFunction[TParams, TResult]) ResultType() reflect.Type
- type VariableContainer
- type Workflow
- func (w *Workflow) Description() string
- func (w *Workflow) GetStep(name string) (*Step, bool)
- func (w *Workflow) InitialState() map[string]any
- func (w *Workflow) Inputs() []*Input
- func (w *Workflow) Name() string
- func (w *Workflow) Outputs() []*Output
- func (w *Workflow) Path() string
- func (w *Workflow) Start() *Step
- func (w *Workflow) StepNames() []string
- func (w *Workflow) Steps() []*Step
- type WorkflowError
- type WorkflowExecutionEvent
- type WorkflowFormatter
- type WorkflowRegistry
Constants ¶
const ( // ErrorTypeAll acts as a wildcard that matches any error except fatal errors ErrorTypeAll = "all" // ErrorTypeActivityFailed matches any error except timeouts and fatal errors ErrorTypeActivityFailed = "activity_failed" // ErrorTypeTimeout matches a timeout context canceled error ErrorTypeTimeout = "timeout" // ErrorTypeFatal indicates an execution failed due to a fatal error. // The approach we're taking is that by default, unknown errors are // classified as activity failed errors. This is because we want to // allow retries on unknown errors by default. If we know a specific // error should NOT be retried, it should have type=ErrorTypeFatal set. ErrorTypeFatal = "fatal_error" )
Error type constants for classification and matching
Variables ¶
This section is empty.
Functions ¶
func ApplyPatches ¶
func ApplyPatches(container VariableContainer, patches []Patch)
ApplyPatches applies a list of patches to a variable container.
func InputsFromContext ¶
InputsFromContext returns a map of all inputs in the context. This is a copy. Any changes made to this map will not persist.
func MatchesErrorType ¶
MatchesErrorType checks if an error matches a specified error type pattern
func NewContext ¶
func NewContext(ctx context.Context, opts ExecutionContextOptions) *executionContext
NewContext creates a new workflow context with direct state access
func NewExecutionID ¶
func NewExecutionID() string
NewExecutionID returns a new UUID for execution identification
func NewJSONLogger ¶
NewJSONLogger returns a logger that writes to stdout in JSON format.
func NewLogger ¶
NewLogger returns a logger that writes to stdout with colorized output if stdout is a terminal.
func VariablesFromContext ¶
VariablesFromContext returns a map of all variables in the context. This is a copy. Any changes made to this map will not persist.
Types ¶
type Activity ¶
type Activity interface {
// Name returns the name of the Activity
Name() string
// Execute the Activity with the given parameters.
Execute(ctx Context, parameters map[string]any) (any, error)
}
Activity represents an action that can be executed as part of a workflow.
func NewActivityFunction ¶
func NewActivityFunction(name string, fn ExecuteActivityFunc) Activity
NewActivityFunction returns an Activity for the given function.
func NewTypedActivity ¶
func NewTypedActivity[TParams, TResult any](activity TypedActivity[TParams, TResult]) Activity
NewTypedActivity creates a new typed activity that implements the Activity interface
type ActivityExecutionEvent ¶
type ActivityExecutionEvent struct {
ExecutionID string
WorkflowName string
PathID string
StepName string
ActivityName string
Parameters map[string]any
Result any
StartTime time.Time
EndTime time.Time
Duration time.Duration
Error error
}
ActivityExecutionEvent provides context for activity execution events
type ActivityExecutor ¶
type ActivityExecutor interface {
// ExecuteActivity runs an activity with automatic logging and checkpointing
// In the new path-local state system, activities work directly with path state
ExecuteActivity(ctx context.Context, stepName, pathID string, activity Activity, params map[string]interface{}, pathState *PathLocalState) (result interface{}, err error)
}
ActivityExecutor provides a simplified interface for executing activities with logging and checkpointing
type ActivityFunction ¶
type ActivityFunction struct {
// contains filtered or unexported fields
}
ActivityFunction wraps a function for use as an Activity. It implements the workflow.Activity interface.
type ActivityLogEntry ¶
type ActivityLogEntry struct {
ID string `json:"id"`
ExecutionID string `json:"execution_id"`
Activity string `json:"activity"`
StepName string `json:"step_name"`
PathID string `json:"path_id"`
Parameters map[string]interface{} `json:"parameters"`
Result interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
StartTime time.Time `json:"start_time"`
Duration float64 `json:"duration"`
}
ActivityLogEntry represents a single operation log entry
type ActivityLogger ¶
type ActivityLogger interface {
// LogActivity logs a completed activity
LogActivity(ctx context.Context, entry *ActivityLogEntry) error
// GetActivityHistory retrieves activity log for an execution
GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)
}
ActivityLogger defines a simple operation logging interface
type ActivityRegistry ¶
ActivityRegistry contains activities indexed by name.
type BaseExecutionCallbacks ¶
type BaseExecutionCallbacks struct{}
BaseExecutionCallbacks provides a default implementation that does nothing
func (*BaseExecutionCallbacks) AfterActivityExecution ¶
func (n *BaseExecutionCallbacks) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
func (*BaseExecutionCallbacks) AfterPathExecution ¶
func (n *BaseExecutionCallbacks) AfterPathExecution(ctx context.Context, event *PathExecutionEvent)
func (*BaseExecutionCallbacks) AfterWorkflowExecution ¶
func (n *BaseExecutionCallbacks) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
func (*BaseExecutionCallbacks) BeforeActivityExecution ¶
func (n *BaseExecutionCallbacks) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
func (*BaseExecutionCallbacks) BeforePathExecution ¶
func (n *BaseExecutionCallbacks) BeforePathExecution(ctx context.Context, event *PathExecutionEvent)
func (*BaseExecutionCallbacks) BeforeWorkflowExecution ¶
func (n *BaseExecutionCallbacks) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
type CallbackChain ¶
type CallbackChain struct {
// contains filtered or unexported fields
}
CallbackChain allows chaining multiple callback implementations
func NewCallbackChain ¶
func NewCallbackChain(callbacks ...ExecutionCallbacks) *CallbackChain
NewCallbackChain creates a new callback chain
func (*CallbackChain) Add ¶
func (c *CallbackChain) Add(callback ExecutionCallbacks)
Add adds a callback to the chain
func (*CallbackChain) AfterActivityExecution ¶
func (c *CallbackChain) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
func (*CallbackChain) AfterPathExecution ¶
func (c *CallbackChain) AfterPathExecution(ctx context.Context, event *PathExecutionEvent)
func (*CallbackChain) AfterWorkflowExecution ¶
func (c *CallbackChain) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
func (*CallbackChain) BeforeActivityExecution ¶
func (c *CallbackChain) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
func (*CallbackChain) BeforePathExecution ¶
func (c *CallbackChain) BeforePathExecution(ctx context.Context, event *PathExecutionEvent)
func (*CallbackChain) BeforeWorkflowExecution ¶
func (c *CallbackChain) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
type CatchConfig ¶
type CatchConfig struct {
ErrorEquals []string `json:"error_equals"`
Next string `json:"next"`
Store string `json:"store,omitempty"`
}
CatchConfig configures fallback behavior when errors occur
type Checkpoint ¶
type Checkpoint struct {
ID string `json:"id"`
ExecutionID string `json:"execution_id"`
WorkflowName string `json:"workflow_name"`
Status string `json:"status"`
Inputs map[string]interface{} `json:"inputs"`
Outputs map[string]interface{} `json:"outputs"`
Variables map[string]interface{} `json:"variables"`
PathStates map[string]*PathState `json:"path_states"`
JoinStates map[string]*JoinState `json:"join_states"`
PathCounter int `json:"path_counter"`
Error string `json:"error,omitempty"`
StartTime time.Time `json:"start_time,omitzero"`
EndTime time.Time `json:"end_time,omitzero"`
CheckpointAt time.Time `json:"checkpoint_at"`
}
Checkpoint contains a complete snapshot of execution state
type Checkpointer ¶
type Checkpointer interface {
// SaveCheckpoint saves the current execution state
SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error
// LoadCheckpoint loads the latest checkpoint for an execution
LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)
// DeleteCheckpoint removes checkpoint data for an execution
DeleteCheckpoint(ctx context.Context, executionID string) error
}
Checkpointer defines a simple checkpoint interface
type ChildWorkflowExecutor ¶
type ChildWorkflowExecutor interface {
// ExecuteSync runs a child workflow synchronously and waits for completion
ExecuteSync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowResult, error)
// ExecuteAsync starts a child workflow asynchronously and returns immediately
ExecuteAsync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowHandle, error)
// GetResult retrieves the result of an asynchronous execution
GetResult(ctx context.Context, handle *ChildWorkflowHandle) (*ChildWorkflowResult, error)
}
ChildWorkflowExecutor manages child workflow executions
type ChildWorkflowExecutorOptions ¶
type ChildWorkflowExecutorOptions struct {
WorkflowRegistry WorkflowRegistry
Activities []Activity
Logger *slog.Logger
ActivityLogger ActivityLogger
Checkpointer Checkpointer
}
ChildWorkflowExecutorOptions configures a DefaultChildWorkflowExecutor
type ChildWorkflowHandle ¶
type ChildWorkflowHandle struct {
ExecutionID string `json:"execution_id"`
WorkflowName string `json:"workflow_name"`
}
ChildWorkflowHandle represents an asynchronous child workflow execution
type ChildWorkflowResult ¶
type ChildWorkflowResult struct {
Outputs map[string]interface{} `json:"outputs"`
Status ExecutionStatus `json:"status"`
ExecutionID string `json:"execution_id"`
Duration time.Duration `json:"duration"`
Error error `json:"error,omitempty"`
}
ChildWorkflowResult represents the result of a child workflow execution
type ChildWorkflowSpec ¶
type ChildWorkflowSpec struct {
WorkflowName string `json:"workflow_name"`
Inputs map[string]interface{} `json:"inputs,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
ParentID string `json:"parent_id,omitempty"` // for tracing
Sync bool `json:"sync"` // synchronous vs asynchronous
}
ChildWorkflowSpec specifies how to execute a child workflow
type Context ¶
type Context interface {
// workflow.Context embeds the context.Context interface.
context.Context
// workflow.Context embeds the VariableContainer interface.
VariableContainer
// ListInputs returns a slice containing all input names.
ListInputs() []string
// GetInput returns the value of an input variable.
GetInput(key string) (value any, exists bool)
// GetLogger returns the logger.
GetLogger() *slog.Logger
// GetCompiler returns the script compiler.
GetCompiler() script.Compiler
// GetPathID returns the current execution path ID.
GetPathID() string
// GetStepName returns the current step name.
GetStepName() string
}
Context is a superset of context.Context that provides access to workflow execution metadata and state.
func WithCancel ¶
func WithCancel(parent Context) (Context, context.CancelFunc)
WithCancel creates a new workflow context with cancellation.
func WithTimeout ¶
WithTimeout creates a new workflow context with a timeout.
type DefaultChildWorkflowExecutor ¶
type DefaultChildWorkflowExecutor struct {
// contains filtered or unexported fields
}
DefaultChildWorkflowExecutor provides a basic implementation of ChildWorkflowExecutor
func NewDefaultChildWorkflowExecutor ¶
func NewDefaultChildWorkflowExecutor(opts ChildWorkflowExecutorOptions) (*DefaultChildWorkflowExecutor, error)
NewDefaultChildWorkflowExecutor creates a new DefaultChildWorkflowExecutor
func (*DefaultChildWorkflowExecutor) ExecuteAsync ¶
func (e *DefaultChildWorkflowExecutor) ExecuteAsync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowHandle, error)
ExecuteAsync starts a child workflow asynchronously
func (*DefaultChildWorkflowExecutor) ExecuteSync ¶
func (e *DefaultChildWorkflowExecutor) ExecuteSync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowResult, error)
ExecuteSync runs a child workflow synchronously
func (*DefaultChildWorkflowExecutor) GetResult ¶
func (e *DefaultChildWorkflowExecutor) GetResult(ctx context.Context, handle *ChildWorkflowHandle) (*ChildWorkflowResult, error)
GetResult retrieves the result of an asynchronous execution
type Edge ¶
type Edge struct {
Step string `json:"step"`
Condition string `json:"condition,omitempty"`
Path string `json:"path,omitempty"`
}
Edge is used to configure a next step in a workflow.
type EdgeMatchingStrategy ¶
type EdgeMatchingStrategy string
EdgeMatchingStrategy defines how edges should be evaluated
const ( // EdgeMatchingAll evaluates all edges and follows all matches (default behavior) EdgeMatchingAll EdgeMatchingStrategy = "all" // EdgeMatchingFirst evaluates edges in order and follows only the first matching one EdgeMatchingFirst EdgeMatchingStrategy = "first" )
type ErrorOutput ¶
type ErrorOutput struct {
Error string `json:"Error"`
Cause string `json:"Cause"`
Details interface{} `json:"Details,omitempty"`
}
ErrorOutput represents the structured error information passed to catch handlers
type ExecuteActivityFunc ¶
ExecuteActivityFunc is the signature for an Activity execution function.
type Execution ¶
type Execution struct {
// contains filtered or unexported fields
}
Execution represents a simplified workflow execution with checkpointing
func NewExecution ¶
func NewExecution(opts ExecutionOptions) (*Execution, error)
NewExecution creates a new simplified execution
func (*Execution) GetOutputs ¶
GetOutputs returns the current execution outputs
func (*Execution) Status ¶
func (e *Execution) Status() ExecutionStatus
Status returns the current execution status
type ExecutionAdapter ¶
type ExecutionAdapter struct {
// contains filtered or unexported fields
}
func (*ExecutionAdapter) ExecuteActivity ¶
type ExecutionCallbacks ¶
type ExecutionCallbacks interface {
// Workflow-level callbacks
BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
// Path-level callbacks
BeforePathExecution(ctx context.Context, event *PathExecutionEvent)
AfterPathExecution(ctx context.Context, event *PathExecutionEvent)
// Activity-level callbacks
BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
}
ExecutionCallbacks defines the callback interface for workflow execution events
func NewBaseExecutionCallbacks ¶
func NewBaseExecutionCallbacks() ExecutionCallbacks
NewBaseExecutionCallbacks creates a new no-op callbacks implementation. Embed this in your own callbacks to get a default implementation that does nothing.
type ExecutionContextOptions ¶
type ExecutionOptions ¶
type ExecutionOptions struct {
Workflow *Workflow
Inputs map[string]any
ActivityLogger ActivityLogger
Checkpointer Checkpointer
Logger *slog.Logger
Formatter WorkflowFormatter
ExecutionID string
Activities []Activity
ScriptCompiler script.Compiler
ExecutionCallbacks ExecutionCallbacks
}
ExecutionOptions configures a new execution
type ExecutionState ¶
type ExecutionState struct {
// contains filtered or unexported fields
}
ExecutionState consolidates all execution state into a single structure. All data here is serializable for checkpointing.
func (*ExecutionState) AddPathToJoin ¶
func (s *ExecutionState) AddPathToJoin(stepName, pathID string, config *JoinConfig, variables, stepOutputs map[string]any)
AddPathToJoin adds a path to a join step
func (*ExecutionState) FromCheckpoint ¶
func (s *ExecutionState) FromCheckpoint(checkpoint *Checkpoint)
FromCheckpoint restores execution state from a checkpoint
func (*ExecutionState) GeneratePathID ¶
func (s *ExecutionState) GeneratePathID(parentID, pathName string) (string, error)
GeneratePathID creates a path ID, using pathName if provided, otherwise generating a sequential ID
func (*ExecutionState) GetAllJoinStates ¶
func (s *ExecutionState) GetAllJoinStates() map[string]*JoinState
GetAllJoinStates returns all join states
func (*ExecutionState) GetError ¶
func (s *ExecutionState) GetError() error
GetError returns the current execution error
func (*ExecutionState) GetFailedPathIDs ¶
func (s *ExecutionState) GetFailedPathIDs() []string
GetFailedPathIDs returns a list of path IDs that have failed
func (*ExecutionState) GetInputs ¶
func (s *ExecutionState) GetInputs() map[string]any
GetInputs creates a shallow copy of the inputs
func (*ExecutionState) GetJoinState ¶
func (s *ExecutionState) GetJoinState(stepName string) *JoinState
GetJoinState returns a copy of the join state for a step
func (*ExecutionState) GetOutputs ¶
func (s *ExecutionState) GetOutputs() map[string]any
GetOutputs returns the current execution outputs
func (*ExecutionState) GetPathStates ¶
func (s *ExecutionState) GetPathStates() map[string]*PathState
GetPathStates returns all path states
func (*ExecutionState) GetStartTime ¶
func (s *ExecutionState) GetStartTime() time.Time
GetStartTime returns the execution start time
func (*ExecutionState) GetStatus ¶
func (s *ExecutionState) GetStatus() ExecutionStatus
GetStatus returns the current execution status
func (*ExecutionState) GetWaitingPathIDs ¶
func (s *ExecutionState) GetWaitingPathIDs() []string
GetWaitingPathIDs returns a list of path IDs that are waiting at joins
func (*ExecutionState) IsJoinReady ¶
func (s *ExecutionState) IsJoinReady(stepName string) bool
IsJoinReady checks if a join step is ready to proceed
func (*ExecutionState) NextPathID ¶
func (s *ExecutionState) NextPathID(baseID string) string
NextPathID generates a new unique path ID
func (*ExecutionState) RemoveJoinState ¶
func (s *ExecutionState) RemoveJoinState(stepName string)
RemoveJoinState removes a join state after it has been processed
func (*ExecutionState) SetError ¶
func (s *ExecutionState) SetError(err error)
SetError sets the execution error
func (*ExecutionState) SetFinished ¶
func (s *ExecutionState) SetFinished(status ExecutionStatus, endTime time.Time, err error)
func (*ExecutionState) SetID ¶
func (s *ExecutionState) SetID(id string)
SetID sets the execution ID
func (*ExecutionState) SetOutput ¶
func (s *ExecutionState) SetOutput(key string, value any)
SetOutput sets an output value
func (*ExecutionState) SetPathState ¶
func (s *ExecutionState) SetPathState(pathID string, state *PathState)
SetPathState sets or updates a path state
func (*ExecutionState) SetStatus ¶
func (s *ExecutionState) SetStatus(status ExecutionStatus)
SetStatus updates the execution status
func (*ExecutionState) SetTiming ¶
func (s *ExecutionState) SetTiming(startTime, endTime time.Time)
SetTiming updates the execution timing
func (*ExecutionState) ToCheckpoint ¶
func (s *ExecutionState) ToCheckpoint() *Checkpoint
ToCheckpoint converts the execution state to a checkpoint
func (*ExecutionState) UpdatePathState ¶
func (s *ExecutionState) UpdatePathState(pathID string, updateFn func(*PathState))
UpdatePathState applies an update function to a path state
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the execution status
const ( ExecutionStatusPending ExecutionStatus = "pending" ExecutionStatusRunning ExecutionStatus = "running" ExecutionStatusWaiting ExecutionStatus = "waiting" // New status for paths waiting at joins ExecutionStatusCompleted ExecutionStatus = "completed" ExecutionStatusFailed ExecutionStatus = "failed" )
type ExecutionSummary ¶
type ExecutionSummary struct {
ExecutionID string `json:"execution_id"`
WorkflowName string `json:"workflow_name"`
Status string `json:"status"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`
Duration time.Duration `json:"duration"`
Error string `json:"error,omitempty"`
}
ExecutionSummary provides a summary view of an execution
type FileActivityLogger ¶
type FileActivityLogger struct {
// contains filtered or unexported fields
}
FileActivityLogger is an implementation of ActivityLogger that logs to a file. A file is created per execution. The file is formatted as newline-delimited JSON.
func NewFileActivityLogger ¶
func NewFileActivityLogger(directory string) *FileActivityLogger
func (*FileActivityLogger) GetActivityHistory ¶
func (l *FileActivityLogger) GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)
func (*FileActivityLogger) LogActivity ¶
func (l *FileActivityLogger) LogActivity(ctx context.Context, entry *ActivityLogEntry) error
type FileCheckpointer ¶
type FileCheckpointer struct {
// contains filtered or unexported fields
}
FileCheckpointer is a file-based implementation that persists checkpoints to disk
func NewFileCheckpointer ¶
func NewFileCheckpointer(dataDir string) (*FileCheckpointer, error)
NewFileCheckpointer creates a new file-based checkpointer
func (*FileCheckpointer) DeleteCheckpoint ¶
func (c *FileCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error
DeleteCheckpoint removes all checkpoint data for an execution
func (*FileCheckpointer) ListExecutions ¶
func (c *FileCheckpointer) ListExecutions(ctx context.Context) ([]*ExecutionSummary, error)
ListExecutions returns a list of all executions with their latest checkpoint info
func (*FileCheckpointer) LoadCheckpoint ¶
func (c *FileCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)
LoadCheckpoint loads the latest checkpoint for an execution
func (*FileCheckpointer) SaveCheckpoint ¶
func (c *FileCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error
SaveCheckpoint saves the execution checkpoint to disk
type Input ¶
type Input struct {
Name string `json:"name" yaml:"name"`
Type string `json:"type" yaml:"type"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Default interface{} `json:"default,omitempty" yaml:"default,omitempty"`
}
Input defines a workflow input parameter
func (*Input) IsRequired ¶
type JitterStrategy ¶
type JitterStrategy string
JitterStrategy defines the jitter strategy for retry delays
const ( JitterNone JitterStrategy = "NONE" JitterFull JitterStrategy = "FULL" )
type JoinConfig ¶
type JoinConfig struct {
// Paths specifies which named paths to wait for. If empty, waits for all active paths.
Paths []string `json:"paths,omitempty"`
// Count specifies the number of paths to wait for. If 0, waits for all specified paths.
Count int `json:"count,omitempty"`
// PathMappings specifies where to store path data. Supports two syntaxes:
// 1. Store entire path state: "pathID": "destination"
// Example: "pathA": "results.pathA" stores all pathA variables under results.pathA
// 2. Extract specific variables: "pathID.variable": "destination"
// Example: "pathA.result": "extracted.value" stores only pathA.result under extracted.value
// Supports nested field extraction using dot notation for both variable names and destinations.
PathMappings map[string]string `json:"path_mappings,omitempty"`
}
JoinConfig configures a step to wait for multiple paths to converge
type JoinRequest ¶
type JoinRequest struct {
StepName string `json:"step_name"`
Config *JoinConfig `json:"config"`
Variables map[string]any `json:"variables"`
StepOutputs map[string]any `json:"step_outputs"`
}
JoinRequest indicates a path is waiting at a join step
type JoinState ¶
type JoinState struct {
StepName string `json:"step_name"`
WaitingPathID string `json:"waiting_path_id"` // The single path that's waiting
Config *JoinConfig `json:"config"`
CreatedAt time.Time `json:"created_at"`
}
JoinState tracks a path waiting at a join step
type MemoryWorkflowRegistry ¶
type MemoryWorkflowRegistry struct {
// contains filtered or unexported fields
}
MemoryWorkflowRegistry implements WorkflowRegistry using in-memory storage
func NewMemoryWorkflowRegistry ¶
func NewMemoryWorkflowRegistry() *MemoryWorkflowRegistry
NewMemoryWorkflowRegistry creates a new in-memory workflow registry
func (*MemoryWorkflowRegistry) Get ¶
func (r *MemoryWorkflowRegistry) Get(name string) (*Workflow, bool)
Get retrieves a workflow by name
func (*MemoryWorkflowRegistry) List ¶
func (r *MemoryWorkflowRegistry) List() []string
List returns all registered workflow names
func (*MemoryWorkflowRegistry) Register ¶
func (r *MemoryWorkflowRegistry) Register(workflow *Workflow) error
Register adds a workflow to the registry
type NullActivityLogger ¶
type NullActivityLogger struct{}
NullActivityLogger is a no-op implementation of ActivityLogger.
func NewNullActivityLogger ¶
func NewNullActivityLogger() *NullActivityLogger
func (*NullActivityLogger) GetActivityHistory ¶
func (l *NullActivityLogger) GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)
func (*NullActivityLogger) LogActivity ¶
func (l *NullActivityLogger) LogActivity(ctx context.Context, entry *ActivityLogEntry) error
type NullCheckpointer ¶
type NullCheckpointer struct{}
NullCheckpointer is a no-op implementation of Checkpointer.
func NewNullCheckpointer ¶
func NewNullCheckpointer() *NullCheckpointer
func (*NullCheckpointer) DeleteCheckpoint ¶
func (c *NullCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error
func (*NullCheckpointer) LoadCheckpoint ¶
func (c *NullCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)
func (*NullCheckpointer) SaveCheckpoint ¶
func (c *NullCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error
type Options ¶
type Options struct {
Name string `json:"name" yaml:"name"`
Steps []*Step `json:"steps" yaml:"steps"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Path string `json:"path,omitempty" yaml:"path,omitempty"`
Inputs []*Input `json:"inputs,omitempty" yaml:"inputs,omitempty"`
Outputs []*Output `json:"outputs,omitempty" yaml:"outputs,omitempty"`
State map[string]any `json:"state,omitempty" yaml:"state,omitempty"`
}
Options are used to configure a workflow.
type Output ¶
type Output struct {
Name string `json:"name" yaml:"name"`
Variable string `json:"variable" yaml:"variable"`
Path string `json:"path,omitempty" yaml:"path,omitempty"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
}
Output defines a workflow output parameter
type Patch ¶
type Patch struct {
// contains filtered or unexported fields
}
Patch represents a change to a state variable.
func GeneratePatches ¶
GeneratePatches compares original and modified state maps and returns patches for the differences.
type PatchOptions ¶
PatchOptions is used to create a Patch.
type Path ¶
type Path struct {
// contains filtered or unexported fields
}
Path represents an execution path through a workflow with owned state
func NewPath ¶
func NewPath(id string, step *Step, opts PathOptions) *Path
NewPath creates a new execution path using the options pattern.
func (*Path) CurrentStep ¶
CurrentStep returns the current step in the path
type PathExecutionEvent ¶
type PathExecutionEvent struct {
ExecutionID string
WorkflowName string
PathID string
Status ExecutionStatus
StartTime time.Time
EndTime time.Time
Duration time.Duration
CurrentStep string
StepOutputs map[string]any
Error error
}
PathExecutionEvent provides context for path-level execution events
type PathLocalState ¶
type PathLocalState struct {
// contains filtered or unexported fields
}
PathLocalState provides activities with access to workflow input variables and to the execution path's copy of state variables.
func NewPathLocalState ¶
func NewPathLocalState(inputs, variables map[string]any) *PathLocalState
func (*PathLocalState) DeleteVariable ¶
func (s *PathLocalState) DeleteVariable(key string)
func (*PathLocalState) GetVariable ¶
func (s *PathLocalState) GetVariable(key string) (any, bool)
func (*PathLocalState) ListInputs ¶
func (s *PathLocalState) ListInputs() []string
func (*PathLocalState) ListVariables ¶
func (s *PathLocalState) ListVariables() []string
func (*PathLocalState) SetVariable ¶
func (s *PathLocalState) SetVariable(key string, value any)
type PathOptions ¶
type PathOptions struct {
Workflow *Workflow
ActivityRegistry map[string]Activity
Logger *slog.Logger
Formatter WorkflowFormatter
Inputs map[string]any // Initial inputs (readonly)
Variables map[string]any // Initial variables (will be copied)
ActivityExecutor ActivityExecutor
UpdatesChannel chan<- PathSnapshot
ScriptCompiler script.Compiler
ExecutionCallbacks ExecutionCallbacks
}
PathOptions contains all dependencies needed by a Path, injected at construction
type PathSnapshot ¶
type PathSnapshot struct {
PathID string
Status ExecutionStatus
StepName string
StepOutput any
NewPaths []PathSpec
Error error
Timestamp time.Time
StartTime time.Time
EndTime time.Time
JoinRequest *JoinRequest // New field for join requests
}
PathSnapshot represents a snapshot of path state for communication
type PathSpec ¶
type PathSpec struct {
Step *Step
Variables map[string]any
Name string // Optional name for the path from Edge.Path
}
PathSpec specifies how to create a new path (ID generated by Execution)
type PathState ¶
type PathState struct {
ID string `json:"id"`
Status ExecutionStatus `json:"status"`
CurrentStep string `json:"current_step"`
StartTime time.Time `json:"start_time,omitzero"`
EndTime time.Time `json:"end_time,omitzero"`
ErrorMessage string `json:"error_message,omitempty"`
StepOutputs map[string]any `json:"step_outputs"`
Variables map[string]any `json:"variables"`
}
PathState tracks the state of an execution path. This struct is designed to be fully JSON serializable.
type RetryConfig ¶
type RetryConfig struct {
ErrorEquals []string `json:"error_equals,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
BaseDelay time.Duration `json:"base_delay,omitempty"`
MaxDelay time.Duration `json:"max_delay,omitempty"`
BackoffRate float64 `json:"backoff_rate,omitempty"`
JitterStrategy JitterStrategy `json:"jitter_strategy,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
}
RetryConfig configures retry behavior for a step.
type Step ¶
type Step struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Store string `json:"store,omitempty"`
Activity string `json:"activity,omitempty"`
Parameters map[string]any `json:"parameters,omitempty"`
Each *Each `json:"each,omitempty"`
Join *JoinConfig `json:"join,omitempty"`
Next []*Edge `json:"next,omitempty"`
EdgeMatchingStrategy EdgeMatchingStrategy `json:"edge_matching_strategy,omitempty"`
Retry []*RetryConfig `json:"retry,omitempty"`
Catch []*CatchConfig `json:"catch,omitempty"`
}
Step represents a single step in a workflow.
func (*Step) GetEdgeMatchingStrategy ¶
func (s *Step) GetEdgeMatchingStrategy() EdgeMatchingStrategy
GetEdgeMatchingStrategy returns the edge matching strategy for this step, defaulting to "all" if not specified
type TypedActivity ¶
type TypedActivity[TParams, TResult any] interface {
// Name returns the name of the Activity
Name() string
// Execute the Activity with the given parameters.
Execute(ctx Context, parameters TParams) (TResult, error)
}
TypedActivity is a parameterized interface for activities that assists with marshalling parameters and results.
type TypedActivityAdapter ¶
type TypedActivityAdapter[TParams, TResult any] struct {
// contains filtered or unexported fields
}
TypedActivityAdapter wraps a TypedActivity to implement the Activity interface.
func (*TypedActivityAdapter[TParams, TResult]) Activity ¶
func (a *TypedActivityAdapter[TParams, TResult]) Activity() TypedActivity[TParams, TResult]
Activity returns the underlying TypedActivity
func (*TypedActivityAdapter[TParams, TResult]) Execute ¶
func (a *TypedActivityAdapter[TParams, TResult]) Execute(ctx Context, parameters map[string]any) (any, error)
Execute the Activity.
func (*TypedActivityAdapter[TParams, TResult]) Name ¶
func (a *TypedActivityAdapter[TParams, TResult]) Name() string
Name of the Activity.
type TypedActivityFunction ¶
type TypedActivityFunction[TParams, TResult any] struct {
// contains filtered or unexported fields
}
TypedActivityFunction is a helper struct for creating typed activities from functions
func (*TypedActivityFunction[TParams, TResult]) Execute ¶
func (t *TypedActivityFunction[TParams, TResult]) Execute(ctx Context, params TParams) (TResult, error)
Execute the Activity.
func (*TypedActivityFunction[TParams, TResult]) Name ¶
func (t *TypedActivityFunction[TParams, TResult]) Name() string
Name of the Activity.
func (*TypedActivityFunction[TParams, TResult]) ParametersType ¶
func (t *TypedActivityFunction[TParams, TResult]) ParametersType() reflect.Type
ParametersType returns the type of the parameters for the Activity.
func (*TypedActivityFunction[TParams, TResult]) ResultType ¶
func (t *TypedActivityFunction[TParams, TResult]) ResultType() reflect.Type
ResultType returns the type of the result for the Activity.
type VariableContainer ¶
type VariableContainer interface {
// SetVariable sets the value of a state variable.
SetVariable(key string, value any)
// DeleteVariable deletes a state variable.
DeleteVariable(key string)
// ListVariables returns a slice containing all variable names.
ListVariables() []string
// GetVariable returns the value of a state variable.
GetVariable(key string) (value any, exists bool)
}
VariableContainer is a container for state variables.
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
Workflow defines a repeatable process as a graph of steps to be executed.
func LoadString ¶
LoadString loads a workflow from a YAML string
func (*Workflow) Description ¶
Description returns the workflow description
func (*Workflow) InitialState ¶
InitialState returns the workflow initial state
type WorkflowError ¶
type WorkflowError struct {
Type string `json:"type"`
Cause string `json:"cause"`
Details interface{} `json:"details,omitempty"`
Wrapped error `json:"-"` // Original error being wrapped
}
WorkflowError represents a structured error with classification. It supports Go's error wrapping patterns via its Unwrap() method.
func ClassifyError ¶
func ClassifyError(err error) *WorkflowError
ClassifyError attempts to classify a regular error into a WorkflowError
func NewWorkflowError ¶
func NewWorkflowError(errorType, cause string) *WorkflowError
NewWorkflowError creates a new WorkflowError with the specified type and cause. The type can be any user-defined string e.g. "network-error". The important thing is that it may be used to match against the type used in a retry config.
func (*WorkflowError) Error ¶
func (e *WorkflowError) Error() string
Error implements the error interface
func (*WorkflowError) ToErrorOutput ¶
func (e *WorkflowError) ToErrorOutput() ErrorOutput
ToErrorOutput converts a WorkflowError to ErrorOutput for catch handlers
func (*WorkflowError) Unwrap ¶
func (e *WorkflowError) Unwrap() error
Unwrap implements the error unwrapping interface for Go's errors.Is and errors.As
type WorkflowExecutionEvent ¶
type WorkflowExecutionEvent struct {
ExecutionID string
WorkflowName string
Status ExecutionStatus
StartTime time.Time
EndTime time.Time
Duration time.Duration
Inputs map[string]any
Outputs map[string]any
PathCount int
Error error
}
WorkflowExecutionEvent provides context for workflow-level execution events
type WorkflowFormatter ¶
type WorkflowFormatter interface {
PrintStepStart(stepName string, activityName string)
PrintStepOutput(stepName string, content any)
PrintStepError(stepName string, err error)
}
WorkflowFormatter is an interface for pretty-printing workflow output.
type WorkflowRegistry ¶
type WorkflowRegistry interface {
// Register adds a workflow to the registry
Register(workflow *Workflow) error
// Get retrieves a workflow by name
Get(name string) (*Workflow, bool)
// List returns all registered workflow names
List() []string
}
WorkflowRegistry manages a collection of workflow definitions
Source Files
¶
- activity.go
- activity_functions.go
- activity_logger.go
- activity_logger_file.go
- activity_logger_null.go
- checkpoint.go
- checkpointer.go
- checkpointer_file.go
- checkpointer_null.go
- child_workflow.go
- context.go
- errors.go
- execution.go
- execution_adapter.go
- execution_callbacks.go
- execution_state.go
- execution_summary.go
- logger.go
- path.go
- path_local_state.go
- path_state.go
- step.go
- variable_container.go
- workflow.go
- workflow_formatter.go
Directories
¶
| Path | Synopsis |
|---|---|
| cmd | |
| cmd/workflow (command) | |
| examples | |
| examples/branching (command) | |
| examples/callbacks (command) | |
| examples/child_workflows (command) | |
| examples/edge_matching (command) | |
| examples/error_handling (command) | |
| examples/join_paths (command) | |
| examples/retry (command) | |
| examples/retry_simple (command) | |
| examples/simple (command) | |
| examples/yaml_definition (command) | |