pipeline

package module
v4.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 24, 2026 License: MIT Imports: 8 Imported by: 0

README

pipeline

A Go package for building concurrent data processing pipelines using channels.

Overview

pipeline provides a set of composable stages for processing data streams concurrently. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.

Stages are defined as structs in the stages package. Each stage's Create method takes a context and the stage's input channel(s) and returns its output channel(s), wiring the stage into the pipeline.

The package uses Go's runtime/trace to create trace regions for each stage, allowing for detailed performance analysis.
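The README does not say how to collect such a trace. One standard-library approach, assuming you control the program's entry point, is to bracket the work with runtime/trace's Start and Stop and open the output with go tool trace (for tests, go test -trace=trace.out does the same). This sketch buffers the trace in memory and uses a hand-made region named "example-stage" (hypothetical) to stand in for a stage's region.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"runtime/trace"
)

// captureTrace records an execution trace of fn into memory. A real program
// would usually write to a file and inspect it with "go tool trace".
func captureTrace(fn func()) ([]byte, error) {
	var buf bytes.Buffer
	if err := trace.Start(&buf); err != nil {
		return nil, err
	}
	fn()
	trace.Stop() // flushes the remaining trace data into buf
	return buf.Bytes(), nil
}

func main() {
	data, err := captureTrace(func() {
		// A named region, similar in spirit to the per-stage regions
		// the package is described as creating.
		trace.WithRegion(context.Background(), "example-stage", func() {
			sum := 0
			for i := 0; i < 1000; i++ {
				sum += i
			}
			_ = sum
		})
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(data) > 0)
}
```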

Installation

go get github.com/schraf/pipeline/v4

Usage

Basic Example

To create a pipeline, you define a Config and use NewPipeline. You then connect stages by calling their Create methods inside the Composer function.

package main

import (
	"context"
	"fmt"

	"github.com/schraf/pipeline/v4"
	"github.com/schraf/pipeline/v4/stages"
)

func main() {
	ctx := context.Background()

	// Transform stage: multiply by 2
	transformStage := stages.TransformStage[int, int]{
		Name:   "multiply",
		Buffer: 10,
		Transformer: func(ctx context.Context, x int) (*int, error) {
			result := x * 2
			return &result, nil
		},
	}

	// Configure the pipeline
	cfg := pipeline.Config[int, int]{
		Name:             "example",
		InputBufferSize:  10,
		OutputBufferSize: 10,
		Composer: func(composer pipeline.Composer[int, int]) error {
			ctx := composer.Context()
			inputs := composer.Inputs()
			outputs := composer.Outputs()

			out := transformStage.Create(ctx, inputs.At(0))

			// Connect the stage's output to pipeline output 0
			return outputs.Link(ctx, 0, out)
		},
	}

	// NewPipeline also returns a context that is cancelled if any stage fails
	pipe, pctx, err := pipeline.NewPipeline(ctx, cfg)
	if err != nil {
		panic(err)
	}

	// Feed data into the pipeline
	go func() {
		defer pipe.CloseAllInputs()
		for i := 1; i <= 5; i++ {
			if err := pipe.Inputs().Send(pctx, 0, i); err != nil {
				return // stop feeding, e.g. after the pipeline is cancelled
			}
		}
	}()

	// Consume results from the pipeline's output channel
	for v := range pipe.Outputs().SinkAtIter(pctx, 0) {
		fmt.Println(v)
	}

	// Wait for completion and check for errors
	if err := pipe.Wait(); err != nil {
		panic(err)
	}
}

Pipeline Configuration

The Config struct specifies the pipeline's basic parameters:

  • Name: Used for tracing and identification.
  • InputChannels: Number of input channels to create (defaults to 1).
  • InputBufferSize: Buffer size for each input channel.
  • OutputChannels: Number of output channels to create (defaults to 1).
  • OutputBufferSize: Buffer size for each output channel.
  • Composer: Function that connects the pipeline inputs to its outputs by creating and linking stages; it returns an error if the wiring fails.

Pipeline Stages

Stages are located in the github.com/schraf/pipeline/v4/stages package.

Transform

Applies a transformation function to each value:

stage := stages.TransformStage[int, int]{
    Name:   "transform",
    Buffer: 10,
    Transformer: func(ctx context.Context, x int) (*int, error) {
        result := x * 2
        return &result, nil
    },
}
out := stage.Create(ctx, in)

Filter

Filters values based on a predicate:

stage := stages.FilterStage[int]{
    Name:   "filter",
    Buffer: 10,
    Filter: func(ctx context.Context, x int) (bool, error) {
        return x%2 == 0, nil
    },
}
out := stage.Create(ctx, in)

ParallelTransform

Applies a transformation function using Workers concurrent workers:

stage := stages.ParallelTransformStage[int, int]{
    Name:    "parallel",
    Buffer:  10,
    Workers: 5,
    Transformer: func(ctx context.Context, x int) (*int, error) {
        return &x, nil
    },
}
out := stage.Create(ctx, in)

Batch

Groups values into fixed-size batches:

stage := stages.BatchStage[int, []int]{
    Name:      "batch",
    Buffer:    10,
    BatchSize: 5,
}
out := stage.Create(ctx, in)

FanIn

Merges multiple input channels into one:

stage := stages.FanInStage[int]{
    Name:   "fan-in",
    Buffer: 10,
}
out := stage.Create(ctx, multiIn) // multiIn is a MultiChannelReceiver

FanOut

Broadcasts each value to every output channel:

stage := stages.FanOutStage[int]{
    Name:        "fan-out",
    OutputCount: 3,
    Buffer:      10,
}
outputs := stage.Create(ctx, in) // Returns MultiChannelReceiver

FanOutRoundRobin

Sends each value to exactly one output channel, cycling through the outputs in round-robin order:

stage := stages.FanOutRoundRobinStage[int]{
    Name:        "fan-out-rr",
    OutputCount: 3,
    Buffer:      10,
}
outputs := stage.Create(ctx, in)

Split

Routes each value to the output channel whose index the Selector returns:

stage := stages.SplitStage[int]{
    Name:        "split",
    OutputCount: 3,
    Buffer:      10,
    Selector: func(ctx context.Context, x int) int {
        // Go's % keeps the sign of x; normalize so negative values map into [0, 3)
        return ((x % 3) + 3) % 3
    },
}
outputs := stage.Create(ctx, in)

Reduce

Folds the input values into an accumulated value using a reducer function, starting from InitialValue:

stage := stages.ReduceStage[int, int]{
    Name:         "reduce",
    Buffer:       1,
    InitialValue: 0,
    Reducer: func(ctx context.Context, acc int, x int) (int, error) {
        return acc + x, nil
    },
}
out := stage.Create(ctx, in)

WindowedReduce

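Like Reduce, but the reducer can emit intermediate results and reset its state. In this example the reducer returns the next accumulator, the value to emit, whether to emit it, and an error: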

stage := stages.WindowedReduceStage[int, int]{
    Name:    "windowed-reduce",
    Buffer:  1,
    Initial: 0,
    Reducer: func(ctx context.Context, acc int, val int) (int, int, bool, error) {
        acc += val
        // Emit and reset when sum >= 10
        if acc >= 10 {
            return 0, acc, true, nil
        }
        return acc, 0, false, nil
    },
}
out := stage.Create(ctx, in)

Flatten

Takes an input channel of slices and emits each element individually:

stage := stages.FlattenStage[int]{
    Name:   "flatten",
    Buffer: 10,
}
out := stage.Create(ctx, inSlices)

Limit

Passes at most Limit values through the stage:

stage := stages.LimitStage[int]{
    Name:   "limit",
    Buffer: 10,
    Limit:  5,
}
out := stage.Create(ctx, in)

Expand

Lazily expands single input items into multiple items using an iterator:

stage := stages.ExpandStage[int, int]{
    Name:   "expand",
    Buffer: 10,
    Expander: func(ctx context.Context, x int) iter.Seq2[int, error] {
        return func(yield func(int, error) bool) {
            // Stop as soon as the consumer no longer wants values
            if !yield(x, nil) {
                return
            }
            yield(x*10, nil)
        }
    },
}
out := stage.Create(ctx, in)

Aggregate

Collects all values into a single slice:

stage := stages.AggregateStage[int]{
    Name:   "aggregate",
    Buffer: 1,
}
out := stage.Create(ctx, in) // out is <-chan []int

Error Handling

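The pipeline automatically cancels all stages when an error occurs. Wait() returns every error encountered by any stage, combined with errors.Join, or nil if all stages succeeded. Errors wrapped with Skip or Drain are discarded and never reported:

if err := pipe.Wait(); err != nil {
    log.Fatal(err)
}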

Requirements

  • Go 1.24.0 or later

License

See the LICENSE file for details.

Documentation

Overview

Package pipeline provides composable stages for building concurrent data processing pipelines using channels. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidChannel = errors.New("channel link size mismatch")
)

Functions

func Drain added in v4.1.0

func Drain(err error) error

Drain wraps an error as a DrainError. When returned from a Transformer or Expander, the current item is dropped and the stage drains all remaining input without processing it, allowing upstream stages to complete and downstream stages to flush what they already have.

func DrainChannel added in v4.1.0

func DrainChannel[T any](in <-chan T)

DrainChannel reads and discards all remaining values from the upstream channel.

func IsDrainError added in v4.1.0

func IsDrainError(err error) bool

func IsSkipError added in v4.1.0

func IsSkipError(err error) bool

func Skip added in v4.1.0

func Skip(err error) error

Skip wraps an error as a SkipError. When returned from a Transformer or Expander, the current item is dropped and processing continues with the next item.

Types

type Composer

type Composer[In any, Out any] struct {
	// contains filtered or unexported fields
}

func (Composer[In, Out]) Context

func (c Composer[In, Out]) Context() Context

func (Composer[In, Out]) Inputs

func (c Composer[In, Out]) Inputs() MultiChannelReceiver[In]

func (Composer[In, Out]) Outputs

func (c Composer[In, Out]) Outputs() MultiChannelSender[Out]

type Config

type Config[In any, Out any] struct {
	Name             string
	InputChannels    uint
	InputBufferSize  uint
	OutputChannels   uint
	OutputBufferSize uint
	Composer         func(Composer[In, Out]) error
}

Config defines the makeup of a pipeline and is required to construct one.

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

func (Context) Go

func (c Context) Go(name string, fn func(ctx context.Context) error)

type DrainError added in v4.1.0

type DrainError struct {
	Err error
}

DrainError signals that the current item should be skipped AND the stage should stop processing new items. The stage drains its input channel to unblock upstream goroutines, then shuts down gracefully. The wrapped error is silently discarded and does not appear in the pipeline's returned error.

func (*DrainError) Error added in v4.1.0

func (e *DrainError) Error() string

func (*DrainError) Unwrap added in v4.1.0

func (e *DrainError) Unwrap() error

type InputChannel

type InputChannel[T any] interface {
	~chan T | ~<-chan T
}

type MultiChannelReceiver

type MultiChannelReceiver[T any] []<-chan T

func NewMultiChannelReceiver

func NewMultiChannelReceiver[T any, C InputChannel[T]](in ...C) MultiChannelReceiver[T]

func (MultiChannelReceiver[T]) At

func (m MultiChannelReceiver[T]) At(index uint) <-chan T

func (MultiChannelReceiver[T]) Iter

func (m MultiChannelReceiver[T]) Iter() iter.Seq[<-chan T]

func (MultiChannelReceiver[T]) Len

func (m MultiChannelReceiver[T]) Len() int

func (MultiChannelReceiver[T]) SinkAt

func (m MultiChannelReceiver[T]) SinkAt(ctx context.Context, index uint) []T

func (MultiChannelReceiver[T]) SinkAtIter

func (m MultiChannelReceiver[T]) SinkAtIter(ctx context.Context, index uint) iter.Seq[T]

type MultiChannelSender

type MultiChannelSender[T any] []chan<- T

func NewMultiChannelSender

func NewMultiChannelSender[T any, C OutputChannel[T]](out ...C) MultiChannelSender[T]

func (MultiChannelSender[T]) At

func (m MultiChannelSender[T]) At(index uint) chan<- T

func (MultiChannelSender[T]) Iter

func (m MultiChannelSender[T]) Iter() iter.Seq[chan<- T]

func (MultiChannelSender[T]) Len

func (m MultiChannelSender[T]) Len() int

func (MultiChannelSender[T]) Link

func (m MultiChannelSender[T]) Link(ctx Context, index uint, in <-chan T) error

func (MultiChannelSender[T]) LinkAll

func (m MultiChannelSender[T]) LinkAll(ctx Context, in MultiChannelReceiver[T]) error

func (MultiChannelSender[T]) Send

func (m MultiChannelSender[T]) Send(ctx context.Context, index uint, values ...T) error

func (MultiChannelSender[T]) SendRoundRobin

func (m MultiChannelSender[T]) SendRoundRobin(ctx context.Context, values ...T) error

func (MultiChannelSender[T]) SendToAll

func (m MultiChannelSender[T]) SendToAll(ctx context.Context, values ...T) error

type OutputChannel

type OutputChannel[T any] interface {
	~chan T | ~chan<- T
}

type Pipeline

type Pipeline[In any, Out any] struct {
	// contains filtered or unexported fields
}

Pipeline coordinates concurrent processing stages, managing their lifecycle and propagating errors and cancellation signals across all stages.

func NewPipeline

func NewPipeline[In any, Out any](ctx context.Context, cfg Config[In, Out]) (*Pipeline[In, Out], context.Context, error)

NewPipeline creates a new Pipeline and a derived context for coordinating pipeline stages. The returned context is cancelled when any stage encounters an error. Stages are wired together by cfg.Composer; use the returned Pipeline to send inputs, read outputs, and wait for completion.

func (*Pipeline[In, Out]) CloseAllInputs

func (p *Pipeline[In, Out]) CloseAllInputs()

CloseAllInputs will close all of the input channels. It is safe to call multiple times; only the first call will close the channels.
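One common way to make a close operation idempotent, as CloseAllInputs is documented to be, is sync.Once. The inputs type below is hypothetical and only illustrates the technique; the package's actual implementation is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// inputs is a hypothetical holder for a pipeline's input channels.
type inputs struct {
	chans     []chan int
	closeOnce sync.Once
}

// closeAll closes every channel exactly once; later calls are no-ops
// instead of panicking with "close of closed channel".
func (in *inputs) closeAll() {
	in.closeOnce.Do(func() {
		for _, ch := range in.chans {
			close(ch)
		}
	})
}

func main() {
	in := &inputs{chans: []chan int{make(chan int), make(chan int)}}
	in.closeAll()
	in.closeAll() // safe: the sync.Once has already run
	_, ok := <-in.chans[0]
	fmt.Println(ok) // false: the channel is closed
}
```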

func (*Pipeline[In, Out]) Inputs

func (p *Pipeline[In, Out]) Inputs() MultiChannelSender[In]

func (*Pipeline[In, Out]) Outputs

func (p *Pipeline[In, Out]) Outputs() MultiChannelReceiver[Out]

func (*Pipeline[In, Out]) Wait

func (p *Pipeline[In, Out]) Wait() error

Wait blocks until all registered stages complete and returns all errors encountered by any stage (joined via errors.Join), or nil if all stages completed successfully.

type SkipError added in v4.1.0

type SkipError struct {
	Err error
}

SkipError signals that the current item should be skipped without killing the pipeline. The stage continues processing subsequent items normally. The wrapped error is silently discarded and does not appear in the pipeline's returned error.

func (*SkipError) Error added in v4.1.0

func (e *SkipError) Error() string

func (*SkipError) Unwrap added in v4.1.0

func (e *SkipError) Unwrap() error

Directories

Path Synopsis
