workflow

v0.0.1
Published: Jul 23, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

README

Workflow

An easy-to-use workflow automation library for Go. Supports conditional branching, parallel execution, embedded scripting, and execution checkpointing.

Think of it like a lightweight hybrid of Temporal and AWS Step Functions.

When defining steps, you have access to string templating and scripting features using the Risor language.

Main Concepts

Concept     Description
Workflow    A repeatable process defined as a directed graph of steps
Steps       Individual nodes in the workflow graph
Activities  Functions that perform the actual work
Edges       Define flow between steps
Execution   A single run of a workflow
State       Shared mutable state that persists for the duration of an execution

How They Work Together

A Workflow defines Steps, and each Step executes an Activity. An Execution is a single run of a workflow. When a step finishes, its outgoing Edges are evaluated and the next step(s) are selected based on any conditions attached to those edges. Activities may read from and write to the State.

Quick Example

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/deepnoodle-ai/workflow"
	"github.com/deepnoodle-ai/workflow/activities"
)

func main() {

	attempt := 0

	myOperation := func(ctx workflow.Context, input map[string]any) (string, error) {
		attempt++
		if attempt < 3 { // Simulated failure
			return "", fmt.Errorf("service is temporarily unavailable")
		}
		return "SUCCESS", nil
	}

	w, err := workflow.New(workflow.Options{
		Name: "demo",
		Steps: []*workflow.Step{
			{
				Name:     "Call My Operation",
				Activity: "my_operation",
				Store:    "result",
				Retry:    []*workflow.RetryConfig{{MaxRetries: 2}},
				Next:     []*workflow.Edge{{Step: "Finish"}},
			},
			{
				Name:     "Finish",
				Activity: "print",
				Parameters: map[string]any{
					"message": "🎉 Workflow completed successfully! Result: ${state.result}",
				},
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	execution, err := workflow.NewExecution(workflow.ExecutionOptions{
		Workflow: w,
		Logger:   workflow.NewLogger(),
		Activities: []workflow.Activity{
			workflow.NewTypedActivityFunction("my_operation", myOperation),
			activities.NewPrintActivity(),
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := execution.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Documentation

Constants

const (
	// ErrorTypeAll acts as a wildcard that matches any error except fatal errors
	ErrorTypeAll = "all"

	// ErrorTypeActivityFailed matches any error except timeouts and fatal errors
	ErrorTypeActivityFailed = "activity_failed"

	// ErrorTypeTimeout matches a timeout context canceled error
	ErrorTypeTimeout = "timeout"

	// ErrorTypeFatal indicates an execution failed due to a fatal error.
	// The approach we're taking is that by default, unknown errors are
	// classified as activity failed errors. This is because we want to
	// allow retries on unknown errors by default. If we know a specific
	// error should NOT be retried, it should have type=ErrorTypeFatal set.
	ErrorTypeFatal = "fatal_error"
)

Error type constants for classification and matching
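The comments on these constants describe matching rules that can be sketched in a few lines. The following is only an illustration of those rules, not the library's MatchesErrorType: the helper name matches is hypothetical, it takes an already-classified error type string rather than an error value, and the exact-match fallback for other patterns is an assumption.

```go
package main

import "fmt"

// Local copies of the documented constants so the sketch compiles on its own.
const (
	ErrorTypeAll            = "all"
	ErrorTypeActivityFailed = "activity_failed"
	ErrorTypeTimeout        = "timeout"
	ErrorTypeFatal          = "fatal_error"
)

// matches is a hypothetical helper. classified is the type already assigned
// to an error; pattern is the value a retry or catch rule asks for.
func matches(classified, pattern string) bool {
	switch pattern {
	case ErrorTypeAll:
		// Wildcard: anything except fatal errors.
		return classified != ErrorTypeFatal
	case ErrorTypeActivityFailed:
		// Anything except timeouts and fatal errors.
		return classified != ErrorTypeTimeout && classified != ErrorTypeFatal
	default:
		// Assumption: any other pattern must match exactly.
		return classified == pattern
	}
}

func main() {
	for _, pattern := range []string{ErrorTypeAll, ErrorTypeActivityFailed, ErrorTypeTimeout} {
		fmt.Println(pattern, matches(ErrorTypeTimeout, pattern))
	}
}
```

Under these rules a timeout is caught by "all" and "timeout" but not by "activity_failed", and a fatal error is caught only by a rule that names "fatal_error" explicitly.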

Variables

This section is empty.

Functions

func ApplyPatches

func ApplyPatches(container VariableContainer, patches []Patch)

ApplyPatches applies a list of patches to a variable container.

func InputsFromContext

func InputsFromContext(ctx Context) map[string]any

InputsFromContext returns a map of all inputs in the context. This is a copy. Any changes made to this map will not persist.

func MatchesErrorType

func MatchesErrorType(err error, errorType string) bool

MatchesErrorType checks if an error matches a specified error type pattern

func NewContext

func NewContext(ctx context.Context, opts ExecutionContextOptions) *executionContext

NewContext creates a new workflow context with direct state access

func NewExecutionID

func NewExecutionID() string

NewExecutionID returns a new UUID for execution identification

func NewJSONLogger

func NewJSONLogger() *slog.Logger

NewJSONLogger returns a logger that writes to stdout in JSON format.

func NewLogger

func NewLogger() *slog.Logger

NewLogger returns a logger that writes to stdout with colorized output if stdout is a terminal.

func VariablesFromContext

func VariablesFromContext(ctx Context) map[string]any

VariablesFromContext returns a map of all variables in the context. This is a copy. Any changes made to this map will not persist.

Types

type Activity

type Activity interface {

	// Name returns the name of the Activity
	Name() string

	// Execute the Activity with the given parameters.
	Execute(ctx Context, parameters map[string]any) (any, error)
}

Activity represents an action that can be executed as part of a workflow.

func NewActivityFunction

func NewActivityFunction(name string, fn ExecuteActivityFunc) Activity

NewActivityFunction returns an Activity for the given function.

func NewTypedActivity

func NewTypedActivity[TParams, TResult any](activity TypedActivity[TParams, TResult]) Activity

NewTypedActivity creates a new typed activity that implements the Activity interface

func NewTypedActivityFunction

func NewTypedActivityFunction[TParams, TResult any](name string, fn func(ctx Context, params TParams) (TResult, error)) Activity

NewTypedActivityFunction wraps a typed function so it can be used as an Activity. The returned value implements the workflow.Activity interface.

type ActivityExecutionEvent

type ActivityExecutionEvent struct {
	ExecutionID  string
	WorkflowName string
	PathID       string
	StepName     string
	ActivityName string
	Parameters   map[string]any
	Result       any
	StartTime    time.Time
	EndTime      time.Time
	Duration     time.Duration
	Error        error
}

ActivityExecutionEvent provides context for activity execution events

type ActivityExecutor

type ActivityExecutor interface {
	// ExecuteActivity runs an activity with automatic logging and checkpointing
	// In the new path-local state system, activities work directly with path state
	ExecuteActivity(ctx context.Context, stepName, pathID string, activity Activity, params map[string]interface{}, pathState *PathLocalState) (result interface{}, err error)
}

ActivityExecutor provides a simplified interface for executing activities with logging and checkpointing

type ActivityFunction

type ActivityFunction struct {
	// contains filtered or unexported fields
}

ActivityFunction wraps a function for use as an Activity. It implements the workflow.Activity interface.

func (*ActivityFunction) Execute

func (a *ActivityFunction) Execute(ctx Context, parameters map[string]any) (any, error)

Execute the Activity.

func (*ActivityFunction) Name

func (a *ActivityFunction) Name() string

Name of the Activity.

type ActivityLogEntry

type ActivityLogEntry struct {
	ID          string                 `json:"id"`
	ExecutionID string                 `json:"execution_id"`
	Activity    string                 `json:"activity"`
	StepName    string                 `json:"step_name"`
	PathID      string                 `json:"path_id"`
	Parameters  map[string]interface{} `json:"parameters"`
	Result      interface{}            `json:"result,omitempty"`
	Error       string                 `json:"error,omitempty"`
	StartTime   time.Time              `json:"start_time"`
	Duration    float64                `json:"duration"`
}

ActivityLogEntry represents a single operation log entry

type ActivityLogger

type ActivityLogger interface {
	// LogActivity logs a completed activity
	LogActivity(ctx context.Context, entry *ActivityLogEntry) error

	// GetActivityHistory retrieves activity log for an execution
	GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)
}

ActivityLogger defines simple operation logging interface

type ActivityRegistry

type ActivityRegistry map[string]Activity

ActivityRegistry contains activities indexed by name.

type BaseExecutionCallbacks

type BaseExecutionCallbacks struct{}

BaseExecutionCallbacks provides a default implementation that does nothing

func (*BaseExecutionCallbacks) AfterActivityExecution

func (n *BaseExecutionCallbacks) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*BaseExecutionCallbacks) AfterPathExecution

func (n *BaseExecutionCallbacks) AfterPathExecution(ctx context.Context, event *PathExecutionEvent)

func (*BaseExecutionCallbacks) AfterWorkflowExecution

func (n *BaseExecutionCallbacks) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

func (*BaseExecutionCallbacks) BeforeActivityExecution

func (n *BaseExecutionCallbacks) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*BaseExecutionCallbacks) BeforePathExecution

func (n *BaseExecutionCallbacks) BeforePathExecution(ctx context.Context, event *PathExecutionEvent)

func (*BaseExecutionCallbacks) BeforeWorkflowExecution

func (n *BaseExecutionCallbacks) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

type CallbackChain

type CallbackChain struct {
	// contains filtered or unexported fields
}

CallbackChain allows chaining multiple callback implementations

func NewCallbackChain

func NewCallbackChain(callbacks ...ExecutionCallbacks) *CallbackChain

NewCallbackChain creates a new callback chain

func (*CallbackChain) Add

func (c *CallbackChain) Add(callback ExecutionCallbacks)

Add adds a callback to the chain

func (*CallbackChain) AfterActivityExecution

func (c *CallbackChain) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*CallbackChain) AfterPathExecution

func (c *CallbackChain) AfterPathExecution(ctx context.Context, event *PathExecutionEvent)

func (*CallbackChain) AfterWorkflowExecution

func (c *CallbackChain) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

func (*CallbackChain) BeforeActivityExecution

func (c *CallbackChain) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*CallbackChain) BeforePathExecution

func (c *CallbackChain) BeforePathExecution(ctx context.Context, event *PathExecutionEvent)

func (*CallbackChain) BeforeWorkflowExecution

func (c *CallbackChain) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

type CatchConfig

type CatchConfig struct {
	ErrorEquals []string `json:"error_equals"`
	Next        string   `json:"next"`
	Store       string   `json:"store,omitempty"`
}

CatchConfig configures fallback behavior when errors occur

type Checkpoint

type Checkpoint struct {
	ID           string                 `json:"id"`
	ExecutionID  string                 `json:"execution_id"`
	WorkflowName string                 `json:"workflow_name"`
	Status       string                 `json:"status"`
	Inputs       map[string]interface{} `json:"inputs"`
	Outputs      map[string]interface{} `json:"outputs"`
	Variables    map[string]interface{} `json:"variables"`
	PathStates   map[string]*PathState  `json:"path_states"`
	JoinStates   map[string]*JoinState  `json:"join_states"`
	PathCounter  int                    `json:"path_counter"`
	Error        string                 `json:"error,omitempty"`
	StartTime    time.Time              `json:"start_time,omitzero"`
	EndTime      time.Time              `json:"end_time,omitzero"`
	CheckpointAt time.Time              `json:"checkpoint_at"`
}

Checkpoint contains a complete snapshot of execution state

type Checkpointer

type Checkpointer interface {
	// SaveCheckpoint saves the current execution state
	SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error

	// LoadCheckpoint loads the latest checkpoint for an execution
	LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)

	// DeleteCheckpoint removes checkpoint data for an execution
	DeleteCheckpoint(ctx context.Context, executionID string) error
}

Checkpointer defines simple checkpoint interface

type ChildWorkflowExecutor

type ChildWorkflowExecutor interface {
	// ExecuteSync runs a child workflow synchronously and waits for completion
	ExecuteSync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowResult, error)

	// ExecuteAsync starts a child workflow asynchronously and returns immediately
	ExecuteAsync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowHandle, error)

	// GetResult retrieves the result of an asynchronous execution
	GetResult(ctx context.Context, handle *ChildWorkflowHandle) (*ChildWorkflowResult, error)
}

ChildWorkflowExecutor manages child workflow executions

type ChildWorkflowExecutorOptions

type ChildWorkflowExecutorOptions struct {
	WorkflowRegistry WorkflowRegistry
	Activities       []Activity
	Logger           *slog.Logger
	ActivityLogger   ActivityLogger
	Checkpointer     Checkpointer
}

ChildWorkflowExecutorOptions configures a DefaultChildWorkflowExecutor

type ChildWorkflowHandle

type ChildWorkflowHandle struct {
	ExecutionID  string `json:"execution_id"`
	WorkflowName string `json:"workflow_name"`
}

ChildWorkflowHandle represents an asynchronous child workflow execution

type ChildWorkflowResult

type ChildWorkflowResult struct {
	Outputs     map[string]interface{} `json:"outputs"`
	Status      ExecutionStatus        `json:"status"`
	ExecutionID string                 `json:"execution_id"`
	Duration    time.Duration          `json:"duration"`
	Error       error                  `json:"error,omitempty"`
}

ChildWorkflowResult represents the result of a child workflow execution

type ChildWorkflowSpec

type ChildWorkflowSpec struct {
	WorkflowName string                 `json:"workflow_name"`
	Inputs       map[string]interface{} `json:"inputs,omitempty"`
	Timeout      time.Duration          `json:"timeout,omitempty"`
	ParentID     string                 `json:"parent_id,omitempty"` // for tracing
	Sync         bool                   `json:"sync"`                // synchronous vs asynchronous
}

ChildWorkflowSpec specifies how to execute a child workflow

type Context

type Context interface {

	// workflow.Context embeds the context.Context interface.
	context.Context

	// workflow.Context embeds the VariableContainer interface.
	VariableContainer

	// ListInputs returns a slice containing all input names.
	ListInputs() []string

	// GetInput returns the value of an input variable.
	GetInput(key string) (value any, exists bool)

	// GetLogger returns the logger.
	GetLogger() *slog.Logger

	// GetCompiler returns the script compiler.
	GetCompiler() script.Compiler

	// GetPathID returns the current execution path ID.
	GetPathID() string

	// GetStepName returns the current step name.
	GetStepName() string
}

Context is a superset of context.Context that provides access to workflow execution metadata and state.

func WithCancel

func WithCancel(parent Context) (Context, context.CancelFunc)

WithCancel creates a new workflow context with cancellation.

func WithTimeout

func WithTimeout(parent Context, timeout time.Duration) (Context, context.CancelFunc)

WithTimeout creates a new workflow context with a timeout.

type DefaultChildWorkflowExecutor

type DefaultChildWorkflowExecutor struct {
	// contains filtered or unexported fields
}

DefaultChildWorkflowExecutor provides a basic implementation of ChildWorkflowExecutor

func NewDefaultChildWorkflowExecutor

func NewDefaultChildWorkflowExecutor(opts ChildWorkflowExecutorOptions) (*DefaultChildWorkflowExecutor, error)

NewDefaultChildWorkflowExecutor creates a new DefaultChildWorkflowExecutor

func (*DefaultChildWorkflowExecutor) ExecuteAsync

ExecuteAsync starts a child workflow asynchronously

func (*DefaultChildWorkflowExecutor) ExecuteSync

ExecuteSync runs a child workflow synchronously

func (*DefaultChildWorkflowExecutor) GetResult

GetResult retrieves the result of an asynchronous execution

type Each

type Each struct {
	Items any    `json:"items"`
	As    string `json:"as,omitempty"`
}

Each is used to configure a step to loop over a list of items.

type Edge

type Edge struct {
	Step      string `json:"step"`
	Condition string `json:"condition,omitempty"`
	Path      string `json:"path,omitempty"`
}

Edge is used to configure a next step in a workflow.

type EdgeMatchingStrategy

type EdgeMatchingStrategy string

EdgeMatchingStrategy defines how edges should be evaluated

const (
	// EdgeMatchingAll evaluates all edges and follows all matches (default behavior)
	EdgeMatchingAll EdgeMatchingStrategy = "all"

	// EdgeMatchingFirst evaluates edges in order and follows only the first matching one
	EdgeMatchingFirst EdgeMatchingStrategy = "first"
)
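The difference between the two strategies can be illustrated with a small stand-alone model. This is a sketch, not the library's evaluator: the edge type below replaces the script-based Condition string with a Go predicate, selectNext is a hypothetical helper, and treating a missing condition as "always matches" is an assumption.

```go
package main

import "fmt"

type EdgeMatchingStrategy string

const (
	EdgeMatchingAll   EdgeMatchingStrategy = "all"
	EdgeMatchingFirst EdgeMatchingStrategy = "first"
)

// edge is a hypothetical stand-in for Edge. Condition is a Go predicate here
// rather than a script expression; nil means the edge is unconditional.
type edge struct {
	Step      string
	Condition func(state map[string]any) bool
}

// selectNext returns the names of the steps to follow, in edge order.
func selectNext(edges []edge, state map[string]any, strategy EdgeMatchingStrategy) []string {
	var next []string
	for _, e := range edges {
		if e.Condition != nil && !e.Condition(state) {
			continue // condition present and false: skip this edge
		}
		next = append(next, e.Step)
		if strategy == EdgeMatchingFirst {
			break // "first": stop at the first match
		}
	}
	return next
}

func main() {
	state := map[string]any{"score": 80}
	edges := []edge{
		{Step: "Notify", Condition: func(s map[string]any) bool { return s["score"].(int) > 50 }},
		{Step: "Archive"},
		{Step: "Escalate", Condition: func(s map[string]any) bool { return s["score"].(int) > 90 }},
	}
	fmt.Println(selectNext(edges, state, EdgeMatchingAll))   // [Notify Archive]
	fmt.Println(selectNext(edges, state, EdgeMatchingFirst)) // [Notify]
}
```

With "all", every matching edge starts its own path, which is how a workflow fans out into parallel branches. With "first", the edges behave like an if/else-if chain.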

type ErrorOutput

type ErrorOutput struct {
	Error   string      `json:"Error"`
	Cause   string      `json:"Cause"`
	Details interface{} `json:"Details,omitempty"`
}

ErrorOutput represents the structured error information passed to catch handlers

type ExecuteActivityFunc

type ExecuteActivityFunc func(ctx Context, parameters map[string]any) (any, error)

ExecuteActivityFunc is the signature for an Activity execution function.

type Execution

type Execution struct {
	// contains filtered or unexported fields
}

Execution represents a simplified workflow execution with checkpointing

func NewExecution

func NewExecution(opts ExecutionOptions) (*Execution, error)

NewExecution creates a new simplified execution

func (*Execution) GetOutputs

func (e *Execution) GetOutputs() map[string]any

GetOutputs returns the current execution outputs

func (*Execution) ID

func (e *Execution) ID() string

ID returns the execution ID

func (*Execution) Resume

func (e *Execution) Resume(ctx context.Context, priorExecutionID string) error

Resume a previous execution from its last checkpoint.

func (*Execution) Run

func (e *Execution) Run(ctx context.Context) error

Run the execution to completion.

func (*Execution) Status

func (e *Execution) Status() ExecutionStatus

Status returns the current execution status

type ExecutionAdapter

type ExecutionAdapter struct {
	// contains filtered or unexported fields
}

func (*ExecutionAdapter) ExecuteActivity

func (e *ExecutionAdapter) ExecuteActivity(ctx context.Context, stepName string, pathID string, activity Activity, params map[string]any, state *PathLocalState) (any, error)

type ExecutionCallbacks

type ExecutionCallbacks interface {
	// Workflow-level callbacks
	BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
	AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

	// Path-level callbacks
	BeforePathExecution(ctx context.Context, event *PathExecutionEvent)
	AfterPathExecution(ctx context.Context, event *PathExecutionEvent)

	// Activity-level callbacks
	BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
	AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
}

ExecutionCallbacks defines the callback interface for workflow execution events

func NewBaseExecutionCallbacks

func NewBaseExecutionCallbacks() ExecutionCallbacks

NewBaseExecutionCallbacks creates a new no-op callbacks implementation. Embed this in your own callbacks to get a default implementation that does nothing.

type ExecutionContextOptions

type ExecutionContextOptions struct {
	PathLocalState *PathLocalState
	Logger         *slog.Logger
	Compiler       script.Compiler
	PathID         string
	StepName       string
}

type ExecutionOptions

type ExecutionOptions struct {
	Workflow           *Workflow
	Inputs             map[string]any
	ActivityLogger     ActivityLogger
	Checkpointer       Checkpointer
	Logger             *slog.Logger
	Formatter          WorkflowFormatter
	ExecutionID        string
	Activities         []Activity
	ScriptCompiler     script.Compiler
	ExecutionCallbacks ExecutionCallbacks
}

ExecutionOptions configures a new execution

type ExecutionState

type ExecutionState struct {
	// contains filtered or unexported fields
}

ExecutionState consolidates all execution state into a single structure. All data here is serializable for checkpointing.

func (*ExecutionState) AddPathToJoin

func (s *ExecutionState) AddPathToJoin(stepName, pathID string, config *JoinConfig, variables, stepOutputs map[string]any)

AddPathToJoin adds a path to a join step

func (*ExecutionState) FromCheckpoint

func (s *ExecutionState) FromCheckpoint(checkpoint *Checkpoint)

FromCheckpoint restores execution state from a checkpoint

func (*ExecutionState) GeneratePathID

func (s *ExecutionState) GeneratePathID(parentID, pathName string) (string, error)

GeneratePathID creates a path ID, using pathName if provided, otherwise generating a sequential ID

func (*ExecutionState) GetAllJoinStates

func (s *ExecutionState) GetAllJoinStates() map[string]*JoinState

GetAllJoinStates returns all join states

func (*ExecutionState) GetError

func (s *ExecutionState) GetError() error

GetError returns the current execution error

func (*ExecutionState) GetFailedPathIDs

func (s *ExecutionState) GetFailedPathIDs() []string

GetFailedPathIDs returns a list of path IDs that have failed

func (*ExecutionState) GetInputs

func (s *ExecutionState) GetInputs() map[string]any

GetInputs creates a shallow copy of the inputs

func (*ExecutionState) GetJoinState

func (s *ExecutionState) GetJoinState(stepName string) *JoinState

GetJoinState returns a copy of the join state for a step

func (*ExecutionState) GetOutputs

func (s *ExecutionState) GetOutputs() map[string]any

GetOutputs returns the execution outputs

func (*ExecutionState) GetPathStates

func (s *ExecutionState) GetPathStates() map[string]*PathState

GetPathStates returns all path states

func (*ExecutionState) GetStartTime

func (s *ExecutionState) GetStartTime() time.Time

GetStartTime returns the execution start time

func (*ExecutionState) GetStatus

func (s *ExecutionState) GetStatus() ExecutionStatus

GetStatus returns the current execution status

func (*ExecutionState) GetWaitingPathIDs

func (s *ExecutionState) GetWaitingPathIDs() []string

GetWaitingPathIDs returns a list of path IDs that are waiting at joins

func (*ExecutionState) ID

func (s *ExecutionState) ID() string

ID returns the execution ID

func (*ExecutionState) IsJoinReady

func (s *ExecutionState) IsJoinReady(stepName string) bool

IsJoinReady checks if a join step is ready to proceed

func (*ExecutionState) NextPathID

func (s *ExecutionState) NextPathID(baseID string) string

NextPathID generates a new unique path ID

func (*ExecutionState) RemoveJoinState

func (s *ExecutionState) RemoveJoinState(stepName string)

RemoveJoinState removes a join state after it has been processed

func (*ExecutionState) SetError

func (s *ExecutionState) SetError(err error)

SetError sets the execution error

func (*ExecutionState) SetFinished

func (s *ExecutionState) SetFinished(status ExecutionStatus, endTime time.Time, err error)

func (*ExecutionState) SetID

func (s *ExecutionState) SetID(id string)

SetID sets the execution ID

func (*ExecutionState) SetOutput

func (s *ExecutionState) SetOutput(key string, value any)

SetOutput sets an output value

func (*ExecutionState) SetPathState

func (s *ExecutionState) SetPathState(pathID string, state *PathState)

SetPathState sets or updates a path state

func (*ExecutionState) SetStatus

func (s *ExecutionState) SetStatus(status ExecutionStatus)

SetStatus updates the execution status

func (*ExecutionState) SetTiming

func (s *ExecutionState) SetTiming(startTime, endTime time.Time)

SetTiming updates the execution timing

func (*ExecutionState) ToCheckpoint

func (s *ExecutionState) ToCheckpoint() *Checkpoint

ToCheckpoint converts the execution state to a checkpoint

func (*ExecutionState) UpdatePathState

func (s *ExecutionState) UpdatePathState(pathID string, updateFn func(*PathState))

UpdatePathState applies an update function to a path state

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the execution status

const (
	ExecutionStatusPending   ExecutionStatus = "pending"
	ExecutionStatusRunning   ExecutionStatus = "running"
	ExecutionStatusWaiting   ExecutionStatus = "waiting" // New status for paths waiting at joins
	ExecutionStatusCompleted ExecutionStatus = "completed"
	ExecutionStatusFailed    ExecutionStatus = "failed"
)

type ExecutionSummary

type ExecutionSummary struct {
	ExecutionID  string        `json:"execution_id"`
	WorkflowName string        `json:"workflow_name"`
	Status       string        `json:"status"`
	StartTime    time.Time     `json:"start_time"`
	EndTime      time.Time     `json:"end_time,omitempty"`
	Duration     time.Duration `json:"duration"`
	Error        string        `json:"error,omitempty"`
}

ExecutionSummary provides a summary view of an execution

type FileActivityLogger

type FileActivityLogger struct {
	// contains filtered or unexported fields
}

FileActivityLogger is an implementation of ActivityLogger that logs to a file. A file is created per execution. The file is formatted as newline-delimited JSON.

func NewFileActivityLogger

func NewFileActivityLogger(directory string) *FileActivityLogger

func (*FileActivityLogger) GetActivityHistory

func (l *FileActivityLogger) GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)

func (*FileActivityLogger) LogActivity

func (l *FileActivityLogger) LogActivity(ctx context.Context, entry *ActivityLogEntry) error

type FileCheckpointer

type FileCheckpointer struct {
	// contains filtered or unexported fields
}

FileCheckpointer is a file-based implementation that persists checkpoints to disk

func NewFileCheckpointer

func NewFileCheckpointer(dataDir string) (*FileCheckpointer, error)

NewFileCheckpointer creates a new file-based checkpointer

func (*FileCheckpointer) DeleteCheckpoint

func (c *FileCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error

DeleteCheckpoint removes all checkpoint data for an execution

func (*FileCheckpointer) ListExecutions

func (c *FileCheckpointer) ListExecutions(ctx context.Context) ([]*ExecutionSummary, error)

ListExecutions returns a list of all executions with their latest checkpoint info

func (*FileCheckpointer) LoadCheckpoint

func (c *FileCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)

LoadCheckpoint loads the latest checkpoint for an execution

func (*FileCheckpointer) SaveCheckpoint

func (c *FileCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error

SaveCheckpoint saves the execution checkpoint to disk

type Input

type Input struct {
	Name        string      `json:"name" yaml:"name"`
	Type        string      `json:"type" yaml:"type"`
	Description string      `json:"description,omitempty" yaml:"description,omitempty"`
	Default     interface{} `json:"default,omitempty" yaml:"default,omitempty"`
}

Input defines a workflow input parameter

func (*Input) IsRequired

func (i *Input) IsRequired() bool

type JitterStrategy

type JitterStrategy string

JitterStrategy defines the jitter strategy for retry delays

const (
	JitterNone JitterStrategy = "NONE"
	JitterFull JitterStrategy = "FULL"
)

type JoinConfig

type JoinConfig struct {
	// Paths specifies which named paths to wait for. If empty, waits for all active paths.
	Paths []string `json:"paths,omitempty"`

	// Count specifies the number of paths to wait for. If 0, waits for all specified paths.
	Count int `json:"count,omitempty"`

	// PathMappings specifies where to store path data. Supports two syntaxes:
	// 1. Store entire path state: "pathID": "destination"
	//    Example: "pathA": "results.pathA" stores all pathA variables under results.pathA
	// 2. Extract specific variables: "pathID.variable": "destination"
	//    Example: "pathA.result": "extracted.value" stores only pathA.result under extracted.value
	// Supports nested field extraction using dot notation for both variable names and destinations.
	PathMappings map[string]string `json:"path_mappings,omitempty"`
}

JoinConfig configures a step to wait for multiple paths to converge
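The two PathMappings syntaxes can be modelled with a pair of dot-notation helpers over nested maps. This is a sketch of the documented behavior under stated assumptions, not the library's implementation: getNested, setNested, and applyMappings are hypothetical names, and the sketch assumes path IDs contain no dots so that the first dot separates the path ID from the variable.

```go
package main

import (
	"fmt"
	"strings"
)

// getNested walks a dot-separated key through nested maps.
func getNested(src map[string]any, path string) (any, bool) {
	var cur any = src
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// setNested stores value at a dot-separated key, creating maps as needed.
func setNested(dst map[string]any, path string, value any) {
	keys := strings.Split(path, ".")
	cur := dst
	for _, key := range keys[:len(keys)-1] {
		next, ok := cur[key].(map[string]any)
		if !ok {
			next = make(map[string]any)
			cur[key] = next
		}
		cur = next
	}
	cur[keys[len(keys)-1]] = value
}

// applyMappings copies path data into dst. paths maps a path ID to that
// path's variables.
func applyMappings(paths map[string]map[string]any, mappings map[string]string, dst map[string]any) {
	for source, destination := range mappings {
		pathID, variable, hasVariable := strings.Cut(source, ".")
		vars, ok := paths[pathID]
		if !ok {
			continue
		}
		if !hasVariable {
			// Syntax 1: store the entire path state.
			setNested(dst, destination, vars)
			continue
		}
		// Syntax 2: extract one (possibly nested) variable.
		if v, ok := getNested(vars, variable); ok {
			setNested(dst, destination, v)
		}
	}
}

func main() {
	paths := map[string]map[string]any{
		"pathA": {"result": 42, "note": "ok"},
	}
	dst := map[string]any{}
	applyMappings(paths, map[string]string{
		"pathA":        "results.pathA",
		"pathA.result": "extracted.value",
	}, dst)

	v, _ := getNested(dst, "extracted.value")
	note, _ := getNested(dst, "results.pathA.note")
	fmt.Println(v, note) // 42 ok
}
```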

type JoinRequest

type JoinRequest struct {
	StepName    string         `json:"step_name"`
	Config      *JoinConfig    `json:"config"`
	Variables   map[string]any `json:"variables"`
	StepOutputs map[string]any `json:"step_outputs"`
}

JoinRequest indicates a path is waiting at a join step

type JoinState

type JoinState struct {
	StepName      string      `json:"step_name"`
	WaitingPathID string      `json:"waiting_path_id"` // The single path that's waiting
	Config        *JoinConfig `json:"config"`
	CreatedAt     time.Time   `json:"created_at"`
}

JoinState tracks a path waiting at a join step

type MemoryWorkflowRegistry

type MemoryWorkflowRegistry struct {
	// contains filtered or unexported fields
}

MemoryWorkflowRegistry implements WorkflowRegistry using in-memory storage

func NewMemoryWorkflowRegistry

func NewMemoryWorkflowRegistry() *MemoryWorkflowRegistry

NewMemoryWorkflowRegistry creates a new in-memory workflow registry

func (*MemoryWorkflowRegistry) Get

func (r *MemoryWorkflowRegistry) Get(name string) (*Workflow, bool)

Get retrieves a workflow by name

func (*MemoryWorkflowRegistry) List

func (r *MemoryWorkflowRegistry) List() []string

List returns all registered workflow names

func (*MemoryWorkflowRegistry) Register

func (r *MemoryWorkflowRegistry) Register(workflow *Workflow) error

Register adds a workflow to the registry

type NullActivityLogger

type NullActivityLogger struct{}

NullActivityLogger is a no-op implementation of ActivityLogger.

func NewNullActivityLogger

func NewNullActivityLogger() *NullActivityLogger

func (*NullActivityLogger) GetActivityHistory

func (l *NullActivityLogger) GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)

func (*NullActivityLogger) LogActivity

func (l *NullActivityLogger) LogActivity(ctx context.Context, entry *ActivityLogEntry) error

type NullCheckpointer

type NullCheckpointer struct{}

NullCheckpointer is a no-op implementation

func NewNullCheckpointer

func NewNullCheckpointer() *NullCheckpointer

func (*NullCheckpointer) DeleteCheckpoint

func (c *NullCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error

func (*NullCheckpointer) LoadCheckpoint

func (c *NullCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)

func (*NullCheckpointer) SaveCheckpoint

func (c *NullCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error

type Options

type Options struct {
	Name        string         `json:"name" yaml:"name"`
	Steps       []*Step        `json:"steps" yaml:"steps"`
	Description string         `json:"description,omitempty" yaml:"description,omitempty"`
	Path        string         `json:"path,omitempty" yaml:"path,omitempty"`
	Inputs      []*Input       `json:"inputs,omitempty" yaml:"inputs,omitempty"`
	Outputs     []*Output      `json:"outputs,omitempty" yaml:"outputs,omitempty"`
	State       map[string]any `json:"state,omitempty" yaml:"state,omitempty"`
}

Options are used to configure a workflow.

type Output

type Output struct {
	Name        string `json:"name" yaml:"name"`
	Variable    string `json:"variable" yaml:"variable"`
	Path        string `json:"path,omitempty" yaml:"path,omitempty"`
	Description string `json:"description,omitempty" yaml:"description,omitempty"`
}

Output defines a workflow output parameter

type Patch

type Patch struct {
	// contains filtered or unexported fields
}

Patch represents a change to a state variable.

func GeneratePatches

func GeneratePatches(original, modified map[string]any) []Patch

GeneratePatches compares original and modified state maps and returns patches for the differences.

func NewPatch

func NewPatch(opts PatchOptions) Patch

NewPatch creates a new Patch.

func (Patch) Delete

func (p Patch) Delete() bool

func (Patch) Value

func (p Patch) Value() any

func (Patch) Variable

func (p Patch) Variable() string

type PatchOptions

type PatchOptions struct {
	Variable string
	Value    any
	Delete   bool
}

PatchOptions is used to create a Patch.

type Path

type Path struct {
	// contains filtered or unexported fields
}

Path represents an execution path through a workflow with owned state

func NewPath

func NewPath(id string, step *Step, opts PathOptions) *Path

NewPath creates a new execution path, taking its dependencies through PathOptions

func (*Path) CurrentStep

func (p *Path) CurrentStep() *Step

CurrentStep returns the current step in the path

func (*Path) ID

func (p *Path) ID() string

ID returns the path ID

func (*Path) Run

func (p *Path) Run(ctx context.Context) error

Run executes the path until completion or error

func (*Path) Variables

func (p *Path) Variables() map[string]any

Variables returns a copy of the path's current variables

type PathExecutionEvent

type PathExecutionEvent struct {
	ExecutionID  string
	WorkflowName string
	PathID       string
	Status       ExecutionStatus
	StartTime    time.Time
	EndTime      time.Time
	Duration     time.Duration
	CurrentStep  string
	StepOutputs  map[string]any
	Error        error
}

PathExecutionEvent provides context for path-level execution events

type PathLocalState

type PathLocalState struct {
	// contains filtered or unexported fields
}

PathLocalState provides activities with access to workflow input variables and to the execution path's copy of state variables.

func NewPathLocalState

func NewPathLocalState(inputs, variables map[string]any) *PathLocalState

func (*PathLocalState) DeleteVariable

func (s *PathLocalState) DeleteVariable(key string)

func (*PathLocalState) GetInput

func (s *PathLocalState) GetInput(key string) (any, bool)

func (*PathLocalState) GetVariable

func (s *PathLocalState) GetVariable(key string) (any, bool)

func (*PathLocalState) ListInputs

func (s *PathLocalState) ListInputs() []string

func (*PathLocalState) ListVariables

func (s *PathLocalState) ListVariables() []string

func (*PathLocalState) SetVariable

func (s *PathLocalState) SetVariable(key string, value any)
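The idea of a path-owned copy of state can be sketched as follows. `localState` is a hypothetical stand-in for PathLocalState; copying both maps in the constructor is an assumption of this sketch, chosen to show that writes made by one path do not leak into the map it was created from.

```go
package main

import (
	"fmt"
	"sort"
)

// localState is a stand-in for PathLocalState: read-only inputs plus a
// private copy of the state variables.
type localState struct {
	inputs    map[string]any
	variables map[string]any
}

// copyMap returns a shallow copy of m.
func copyMap(m map[string]any) map[string]any {
	out := make(map[string]any, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

func newLocalState(inputs, variables map[string]any) *localState {
	return &localState{inputs: copyMap(inputs), variables: copyMap(variables)}
}

func (s *localState) GetInput(key string) (any, bool) {
	v, ok := s.inputs[key]
	return v, ok
}

func (s *localState) GetVariable(key string) (any, bool) {
	v, ok := s.variables[key]
	return v, ok
}

func (s *localState) SetVariable(key string, value any) { s.variables[key] = value }

func (s *localState) DeleteVariable(key string) { delete(s.variables, key) }

// ListVariables returns the variable names in sorted order.
func (s *localState) ListVariables() []string {
	names := make([]string, 0, len(s.variables))
	for name := range s.variables {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	shared := map[string]any{"counter": 1}
	state := newLocalState(map[string]any{"user": "ada"}, shared)
	state.SetVariable("counter", 2)
	state.SetVariable("result", "ok")
	// The path sees its own writes; the map it was built from is untouched.
	fmt.Println(state.ListVariables(), shared["counter"])
}
```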

type PathOptions

type PathOptions struct {
	Workflow           *Workflow
	ActivityRegistry   map[string]Activity
	Logger             *slog.Logger
	Formatter          WorkflowFormatter
	Inputs             map[string]any // Initial inputs (readonly)
	Variables          map[string]any // Initial variables (will be copied)
	ActivityExecutor   ActivityExecutor
	UpdatesChannel     chan<- PathSnapshot
	ScriptCompiler     script.Compiler
	ExecutionCallbacks ExecutionCallbacks
}

PathOptions contains all dependencies needed by a Path, injected at construction

type PathSnapshot

type PathSnapshot struct {
	PathID      string
	Status      ExecutionStatus
	StepName    string
	StepOutput  any
	NewPaths    []PathSpec
	Error       error
	Timestamp   time.Time
	StartTime   time.Time
	EndTime     time.Time
	JoinRequest *JoinRequest // New field for join requests
}

PathSnapshot represents a snapshot of path state for communication

type PathSpec

type PathSpec struct {
	Step      *Step
	Variables map[string]any
	Name      string // Optional name for the path from Edge.Path
}

PathSpec specifies how to create a new path (ID generated by Execution)

type PathState

type PathState struct {
	ID           string          `json:"id"`
	Status       ExecutionStatus `json:"status"`
	CurrentStep  string          `json:"current_step"`
	StartTime    time.Time       `json:"start_time,omitzero"`
	EndTime      time.Time       `json:"end_time,omitzero"`
	ErrorMessage string          `json:"error_message,omitempty"`
	StepOutputs  map[string]any  `json:"step_outputs"`
	Variables    map[string]any  `json:"variables"`
}

PathState tracks the state of an execution path. This struct is designed to be fully JSON serializable.

func (*PathState) Copy

func (p *PathState) Copy() *PathState

Copy returns a shallow copy of the path state.

type RetryConfig

type RetryConfig struct {
	ErrorEquals    []string       `json:"error_equals,omitempty"`
	MaxRetries     int            `json:"max_retries,omitempty"`
	BaseDelay      time.Duration  `json:"base_delay,omitempty"`
	MaxDelay       time.Duration  `json:"max_delay,omitempty"`
	BackoffRate    float64        `json:"backoff_rate,omitempty"`
	JitterStrategy JitterStrategy `json:"jitter_strategy,omitempty"`
	Timeout        time.Duration  `json:"timeout,omitempty"`
}

RetryConfig configures retry behavior for a step.
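The page does not spell out how BaseDelay, BackoffRate, and MaxDelay combine. A common reading of such fields is exponential backoff with a cap, sketched below. `retryPolicy` and `delayFor` are hypothetical names, the formula is an assumption rather than the package's documented behavior, and jitter is left out.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// retryPolicy mirrors three RetryConfig fields; their semantics are assumed.
type retryPolicy struct {
	BaseDelay   time.Duration
	MaxDelay    time.Duration
	BackoffRate float64
}

// delayFor returns the wait before the given retry (1-based), computed as
// BaseDelay * BackoffRate^(retry-1) and capped at MaxDelay when MaxDelay > 0.
func (p retryPolicy) delayFor(retry int) time.Duration {
	if retry < 1 {
		retry = 1
	}
	delay := float64(p.BaseDelay) * math.Pow(p.BackoffRate, float64(retry-1))
	if p.MaxDelay > 0 && delay > float64(p.MaxDelay) {
		return p.MaxDelay
	}
	return time.Duration(delay)
}

func main() {
	policy := retryPolicy{BaseDelay: time.Second, MaxDelay: 5 * time.Second, BackoffRate: 2}
	for retry := 1; retry <= 4; retry++ {
		fmt.Println(retry, policy.delayFor(retry))
	}
}
```

Under these assumptions the delays grow 1s, 2s, 4s and then stay at the 5s cap.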

type Step

type Step struct {
	Name                 string               `json:"name"`
	Description          string               `json:"description,omitempty"`
	Store                string               `json:"store,omitempty"`
	Activity             string               `json:"activity,omitempty"`
	Parameters           map[string]any       `json:"parameters,omitempty"`
	Each                 *Each                `json:"each,omitempty"`
	Join                 *JoinConfig          `json:"join,omitempty"`
	Next                 []*Edge              `json:"next,omitempty"`
	EdgeMatchingStrategy EdgeMatchingStrategy `json:"edge_matching_strategy,omitempty"`
	Retry                []*RetryConfig       `json:"retry,omitempty"`
	Catch                []*CatchConfig       `json:"catch,omitempty"`
}

Step represents a single step in a workflow.

func (*Step) GetEdgeMatchingStrategy

func (s *Step) GetEdgeMatchingStrategy() EdgeMatchingStrategy

GetEdgeMatchingStrategy returns the edge matching strategy for this step, defaulting to "all" if not specified
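A minimal sketch of this defaulting, and of how a strategy might influence edge selection, is shown below. All types here are stand-ins: only the "all" default comes from the description above, while the "first" strategy and the boolean `Matches` field (standing in for an evaluated edge condition) are assumptions made for illustration.

```go
package main

import "fmt"

type strategy string

const (
	matchAll   strategy = "all"   // default named in the description above
	matchFirst strategy = "first" // hypothetical alternative for this sketch
)

// edge is a stand-in for Edge; Matches represents an evaluated condition.
type edge struct {
	Step    string
	Matches bool
}

// step is a stand-in for Step with only the fields this sketch needs.
type step struct {
	Next     []edge
	Strategy strategy
}

// strategyOrDefault falls back to "all" when no strategy is set.
func (s step) strategyOrDefault() strategy {
	if s.Strategy == "" {
		return matchAll
	}
	return s.Strategy
}

// nextSteps returns the names of the steps selected by the matching edges.
func (s step) nextSteps() []string {
	var names []string
	for _, e := range s.Next {
		if !e.Matches {
			continue
		}
		names = append(names, e.Step)
		if s.strategyOrDefault() == matchFirst {
			break // stop after the first matching edge
		}
	}
	return names
}

func main() {
	s := step{Next: []edge{{Step: "A", Matches: true}, {Step: "B"}, {Step: "C", Matches: true}}}
	fmt.Println(s.strategyOrDefault(), s.nextSteps())
	s.Strategy = matchFirst
	fmt.Println(s.strategyOrDefault(), s.nextSteps())
}
```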

type TypedActivity

type TypedActivity[TParams, TResult any] interface {

	// Name returns the name of the Activity
	Name() string

	// Execute the Activity with the given parameters.
	Execute(ctx Context, parameters TParams) (TResult, error)
}

TypedActivity is a parameterized interface for activities that assists with marshalling parameters and results.

type TypedActivityAdapter

type TypedActivityAdapter[TParams, TResult any] struct {
	// contains filtered or unexported fields
}

TypedActivityAdapter wraps a TypedActivity to implement the Activity interface.

func (*TypedActivityAdapter[TParams, TResult]) Activity

func (a *TypedActivityAdapter[TParams, TResult]) Activity() TypedActivity[TParams, TResult]

Activity returns the underlying TypedActivity

func (*TypedActivityAdapter[TParams, TResult]) Execute

func (a *TypedActivityAdapter[TParams, TResult]) Execute(ctx Context, parameters map[string]any) (any, error)

Execute the Activity.

func (*TypedActivityAdapter[TParams, TResult]) Name

func (a *TypedActivityAdapter[TParams, TResult]) Name() string

Name of the Activity.
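To show what adapting a typed activity to untyped parameters can look like, the sketch below converts a `map[string]any` into a typed struct with a JSON round trip. This is one plausible marshalling approach, not necessarily the one the adapter uses. `typedFunc`, `adapt`, `greetParams`, and `greet` are hypothetical, and the standard `context.Context` stands in for the package's own Context type.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// typedFunc is a stand-in for the body of a typed activity.
type typedFunc[P, R any] func(ctx context.Context, params P) (R, error)

// adapt wraps a typed function so it accepts untyped parameters.
// The map is converted to P by encoding it as JSON and decoding it again.
func adapt[P, R any](fn typedFunc[P, R]) func(context.Context, map[string]any) (any, error) {
	return func(ctx context.Context, raw map[string]any) (any, error) {
		data, err := json.Marshal(raw)
		if err != nil {
			return nil, fmt.Errorf("encode parameters: %w", err)
		}
		var params P
		if err := json.Unmarshal(data, &params); err != nil {
			return nil, fmt.Errorf("decode parameters: %w", err)
		}
		result, err := fn(ctx, params)
		if err != nil {
			return nil, err
		}
		return result, nil
	}
}

// greetParams is an example parameter struct.
type greetParams struct {
	Name  string `json:"name"`
	Times int    `json:"times"`
}

func greet(_ context.Context, p greetParams) (string, error) {
	out := ""
	for i := 0; i < p.Times; i++ {
		out += "hi " + p.Name + ";"
	}
	return out, nil
}

// callGreet runs the adapted activity with untyped parameters.
func callGreet(raw map[string]any) (any, error) {
	activity := adapt[greetParams, string](greet)
	return activity(context.Background(), raw)
}

func main() {
	result, err := callGreet(map[string]any{"name": "Ada", "times": 2})
	fmt.Println(result, err)
}
```

With this approach a parameter of the wrong type (for example a string where an int is expected) surfaces as a decode error before the activity body runs.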

type TypedActivityFunction

type TypedActivityFunction[TParams, TResult any] struct {
	// contains filtered or unexported fields
}

TypedActivityFunction is a helper struct for creating typed activities from functions

func (*TypedActivityFunction[TParams, TResult]) Execute

func (t *TypedActivityFunction[TParams, TResult]) Execute(ctx Context, params TParams) (TResult, error)

Execute the Activity.

func (*TypedActivityFunction[TParams, TResult]) Name

func (t *TypedActivityFunction[TParams, TResult]) Name() string

Name of the Activity.

func (*TypedActivityFunction[TParams, TResult]) ParametersType

func (t *TypedActivityFunction[TParams, TResult]) ParametersType() reflect.Type

ParametersType returns the type of the parameters for the Activity.

func (*TypedActivityFunction[TParams, TResult]) ResultType

func (t *TypedActivityFunction[TParams, TResult]) ResultType() reflect.Type

ResultType returns the type of the result for the Activity.

type VariableContainer

type VariableContainer interface {

	// SetVariable sets the value of a state variable.
	SetVariable(key string, value any)

	// DeleteVariable deletes a state variable.
	DeleteVariable(key string)

	// ListVariables returns a slice containing all variable names.
	ListVariables() []string

	// GetVariable returns the value of a state variable.
	GetVariable(key string) (value any, exists bool)
}

VariableContainer is a container for state variables.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow defines a repeatable process as a graph of steps to be executed.

func LoadFile

func LoadFile(path string) (*Workflow, error)

LoadFile loads a workflow from a YAML file

func LoadString

func LoadString(data string) (*Workflow, error)

LoadString loads a workflow from a YAML string

func New

func New(opts Options) (*Workflow, error)

New returns a new Workflow configured with the given options.

func (*Workflow) Description

func (w *Workflow) Description() string

Description returns the workflow description

func (*Workflow) GetStep

func (w *Workflow) GetStep(name string) (*Step, bool)

GetStep returns a step by name

func (*Workflow) InitialState

func (w *Workflow) InitialState() map[string]any

InitialState returns the workflow initial state

func (*Workflow) Inputs

func (w *Workflow) Inputs() []*Input

Inputs returns the workflow inputs

func (*Workflow) Name

func (w *Workflow) Name() string

Name returns the workflow name

func (*Workflow) Outputs

func (w *Workflow) Outputs() []*Output

Outputs returns the workflow outputs

func (*Workflow) Path

func (w *Workflow) Path() string

Path returns the workflow path

func (*Workflow) Start

func (w *Workflow) Start() *Step

Start returns the workflow start step

func (*Workflow) StepNames

func (w *Workflow) StepNames() []string

StepNames returns the names of all steps in the workflow

func (*Workflow) Steps

func (w *Workflow) Steps() []*Step

Steps returns the workflow steps

type WorkflowError

type WorkflowError struct {
	Type    string      `json:"type"`
	Cause   string      `json:"cause"`
	Details interface{} `json:"details,omitempty"`
	Wrapped error       `json:"-"` // Original error being wrapped
}

WorkflowError represents a structured error with a classification. It supports Go's error wrapping patterns through its Unwrap() method.

func ClassifyError

func ClassifyError(err error) *WorkflowError

ClassifyError attempts to classify a regular error into a WorkflowError

func NewWorkflowError

func NewWorkflowError(errorType, cause string) *WorkflowError

NewWorkflowError creates a new WorkflowError with the specified type and cause. The type can be any user-defined string, e.g. "network-error". What matters is that it can be matched against the type named in a retry config.

func (*WorkflowError) Error

func (e *WorkflowError) Error() string

Error implements the error interface

func (*WorkflowError) ToErrorOutput

func (e *WorkflowError) ToErrorOutput() ErrorOutput

ToErrorOutput converts a WorkflowError to ErrorOutput for catch handlers

func (*WorkflowError) Unwrap

func (e *WorkflowError) Unwrap() error

Unwrap implements the error unwrapping interface for Go's errors.Is and errors.As
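The wrapping behaviour can be illustrated with a stand-in type. In the sketch below, `workflowError` copies only the Type, Cause, and Wrapped fields, and its `Error()` format is an assumption. `errTimeout`, `errorType`, `isTimeout`, and `callService` are hypothetical helpers. Only `errors.Is`, `errors.As`, and `fmt.Errorf` with `%w` are standard library behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("upstream timed out")

// workflowError is a stand-in for WorkflowError.
type workflowError struct {
	Type    string
	Cause   string
	Wrapped error
}

// Error formats the classification and cause (format assumed).
func (e *workflowError) Error() string { return e.Type + ": " + e.Cause }

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (e *workflowError) Unwrap() error { return e.Wrapped }

// errorType returns the classification found in err's chain, or "" if none.
func errorType(err error) string {
	var wfErr *workflowError
	if errors.As(err, &wfErr) {
		return wfErr.Type
	}
	return ""
}

// isTimeout reports whether errTimeout appears anywhere in err's chain.
func isTimeout(err error) bool { return errors.Is(err, errTimeout) }

// callService simulates a failing step that wraps a classified error.
func callService() error {
	classified := &workflowError{Type: "network-error", Cause: "request failed", Wrapped: errTimeout}
	return fmt.Errorf("step failed: %w", classified)
}

func main() {
	err := callService()
	fmt.Println(errorType(err), isTimeout(err))
}
```

Because the classified error sits in the middle of the chain, callers can match on the user-defined type string and still detect the original cause underneath it.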

type WorkflowExecutionEvent

type WorkflowExecutionEvent struct {
	ExecutionID  string
	WorkflowName string
	Status       ExecutionStatus
	StartTime    time.Time
	EndTime      time.Time
	Duration     time.Duration
	Inputs       map[string]any
	Outputs      map[string]any
	PathCount    int
	Error        error
}

WorkflowExecutionEvent provides context for workflow-level execution events

type WorkflowFormatter

type WorkflowFormatter interface {
	PrintStepStart(stepName string, activityName string)
	PrintStepOutput(stepName string, content any)
	PrintStepError(stepName string, err error)
}

WorkflowFormatter interface for pretty output

type WorkflowRegistry

type WorkflowRegistry interface {
	// Register adds a workflow to the registry
	Register(workflow *Workflow) error

	// Get retrieves a workflow by name
	Get(name string) (*Workflow, bool)

	// List returns all registered workflow names
	List() []string
}

WorkflowRegistry manages a collection of workflow definitions
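A simple in-memory registry with the same three methods might look like the sketch below. `workflow` is a stand-in for the package's Workflow type, and rejecting unnamed or duplicate workflows is an assumption about reasonable behaviour that the interface itself does not require.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// workflow is a stand-in for the package's Workflow type.
type workflow struct{ name string }

// memoryRegistry keeps workflows in a map guarded by a read-write mutex.
type memoryRegistry struct {
	mu        sync.RWMutex
	workflows map[string]*workflow
}

func newMemoryRegistry() *memoryRegistry {
	return &memoryRegistry{workflows: make(map[string]*workflow)}
}

// Register adds a workflow, rejecting unnamed and duplicate entries.
func (r *memoryRegistry) Register(w *workflow) error {
	if w == nil || w.name == "" {
		return fmt.Errorf("workflow must have a name")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.workflows[w.name]; exists {
		return fmt.Errorf("workflow %q is already registered", w.name)
	}
	r.workflows[w.name] = w
	return nil
}

// Get retrieves a workflow by name.
func (r *memoryRegistry) Get(name string) (*workflow, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	w, ok := r.workflows[name]
	return w, ok
}

// List returns all registered workflow names in sorted order.
func (r *memoryRegistry) List() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.workflows))
	for name := range r.workflows {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	registry := newMemoryRegistry()
	_ = registry.Register(&workflow{name: "nightly-report"})
	_ = registry.Register(&workflow{name: "demo"})
	fmt.Println(registry.List())
	fmt.Println(registry.Register(&workflow{name: "demo"}))
}
```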

Directories

Path Synopsis
cmd/workflow command
examples/branching command
examples/callbacks command
examples/child_workflows command
examples/edge_matching command
examples/error_handling command
examples/join_paths command
examples/retry command
examples/retry_simple command
examples/simple command
examples/yaml_definition command
