Documentation
¶
Overview ¶
Package pipeline provides composable stages for building concurrent data processing pipelines using channels. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.
Index ¶
- Variables
- func Drain(err error) error
- func DrainChannel[T any](in <-chan T)
- func IsDrainError(err error) bool
- func IsSkipError(err error) bool
- func Skip(err error) error
- type Composer
- type Config
- type Context
- type DrainError
- type InputChannel
- type MultiChannelReceiver
- func (m MultiChannelReceiver[T]) At(index uint) <-chan T
- func (m MultiChannelReceiver[T]) Iter() iter.Seq[<-chan T]
- func (m MultiChannelReceiver[T]) Len() int
- func (m MultiChannelReceiver[T]) SinkAt(ctx context.Context, index uint) []T
- func (m MultiChannelReceiver[T]) SinkAtIter(ctx context.Context, index uint) iter.Seq[T]
- type MultiChannelSender
- func (m MultiChannelSender[T]) At(index uint) chan<- T
- func (m MultiChannelSender[T]) Iter() iter.Seq[chan<- T]
- func (m MultiChannelSender[T]) Len() int
- func (m MultiChannelSender[T]) Link(ctx Context, index uint, in <-chan T) error
- func (m MultiChannelSender[T]) LinkAll(ctx Context, in MultiChannelReceiver[T]) error
- func (m MultiChannelSender[T]) Send(ctx context.Context, index uint, values ...T) error
- func (m MultiChannelSender[T]) SendRoundRobin(ctx context.Context, values ...T) error
- func (m MultiChannelSender[T]) SendToAll(ctx context.Context, values ...T) error
- type OutputChannel
- type Pipeline
- type SkipError
Constants ¶
This section is empty.
Variables ¶
var (
ErrInvalidChannel = errors.New("channel link size mismatch")
)
Functions ¶
func Drain ¶ added in v4.1.0
Drain wraps an error as a DrainError. When returned from a Transformer or Expander, the current item is dropped and the stage drains all remaining input without processing it, allowing upstream stages to complete and downstream stages to flush what they already have.
func DrainChannel ¶ added in v4.1.0
func DrainChannel[T any](in <-chan T)
DrainChannel all values from the upstream channel
func IsDrainError ¶ added in v4.1.0
func IsSkipError ¶ added in v4.1.0
Types ¶
type Composer ¶
func (Composer[In, Out]) Inputs ¶
func (c Composer[In, Out]) Inputs() MultiChannelReceiver[In]
func (Composer[In, Out]) Outputs ¶
func (c Composer[In, Out]) Outputs() MultiChannelSender[Out]
type Config ¶
type Config[In any, Out any] struct { Name string InputChannels uint InputBufferSize uint OutputChannels uint OutputBufferSize uint Composer func(Composer[In, Out]) error }
Config defines the make up of a pipeline and is required for construction of it
type DrainError ¶ added in v4.1.0
type DrainError struct {
Err error
}
DrainError signals that the current item should be skipped AND the stage should stop processing new items. The stage drains its input channel to unblock upstream goroutines, then shuts down gracefully. The wrapped error is silently discarded and does not appear in the pipeline's returned error.
func (*DrainError) Error ¶ added in v4.1.0
func (e *DrainError) Error() string
func (*DrainError) Unwrap ¶ added in v4.1.0
func (e *DrainError) Unwrap() error
type InputChannel ¶
type InputChannel[T any] interface { ~chan T | ~<-chan T }
type MultiChannelReceiver ¶
type MultiChannelReceiver[T any] []<-chan T
func NewMultiChannelReceiver ¶
func NewMultiChannelReceiver[T any, C InputChannel[T]](in ...C) MultiChannelReceiver[T]
func (MultiChannelReceiver[T]) At ¶
func (m MultiChannelReceiver[T]) At(index uint) <-chan T
func (MultiChannelReceiver[T]) Iter ¶
func (m MultiChannelReceiver[T]) Iter() iter.Seq[<-chan T]
func (MultiChannelReceiver[T]) Len ¶
func (m MultiChannelReceiver[T]) Len() int
func (MultiChannelReceiver[T]) SinkAt ¶
func (m MultiChannelReceiver[T]) SinkAt(ctx context.Context, index uint) []T
func (MultiChannelReceiver[T]) SinkAtIter ¶
type MultiChannelSender ¶
type MultiChannelSender[T any] []chan<- T
func NewMultiChannelSender ¶
func NewMultiChannelSender[T any, C OutputChannel[T]](out ...C) MultiChannelSender[T]
func (MultiChannelSender[T]) At ¶
func (m MultiChannelSender[T]) At(index uint) chan<- T
func (MultiChannelSender[T]) Iter ¶
func (m MultiChannelSender[T]) Iter() iter.Seq[chan<- T]
func (MultiChannelSender[T]) Len ¶
func (m MultiChannelSender[T]) Len() int
func (MultiChannelSender[T]) Link ¶
func (m MultiChannelSender[T]) Link(ctx Context, index uint, in <-chan T) error
func (MultiChannelSender[T]) LinkAll ¶
func (m MultiChannelSender[T]) LinkAll(ctx Context, in MultiChannelReceiver[T]) error
func (MultiChannelSender[T]) Send ¶
func (m MultiChannelSender[T]) Send(ctx context.Context, index uint, values ...T) error
func (MultiChannelSender[T]) SendRoundRobin ¶
func (m MultiChannelSender[T]) SendRoundRobin(ctx context.Context, values ...T) error
type OutputChannel ¶
type OutputChannel[T any] interface { ~chan T | ~chan<- T }
type Pipeline ¶
Pipeline coordinates concurrent processing stages, managing their lifecycle and propagating errors and cancellation signals across all stages.
func NewPipeline ¶
func NewPipeline[In any, Out any](ctx context.Context, cfg Config[In, Out]) (*Pipeline[In, Out], context.Context, error)
NewPipeline creates a new Pipeline and a derived context for coordinating pipeline stages. The returned context is cancelled when any stage encounters an error. Use the returned Pipeline to register stages and wait for completion.
func (*Pipeline[In, Out]) CloseAllInputs ¶
func (p *Pipeline[In, Out]) CloseAllInputs()
CloseAllInputs will close all of the input channels. It is safe to call multiple times; only the first call will close the channels.
func (*Pipeline[In, Out]) Inputs ¶
func (p *Pipeline[In, Out]) Inputs() MultiChannelSender[In]
func (*Pipeline[In, Out]) Outputs ¶
func (p *Pipeline[In, Out]) Outputs() MultiChannelReceiver[Out]
type SkipError ¶ added in v4.1.0
type SkipError struct {
Err error
}
SkipError signals that the current item should be skipped without killing the pipeline. The stage continues processing subsequent items normally. The wrapped error is silently discarded and does not appear in the pipeline's returned error.