pipelines

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2024 License: ISC Imports: 5 Imported by: 2

README

pipelines

This package helps a user create workflow pipelines.

To install pipelines:
go get -u github.com/andriiyaremenko/pipelines
How to use:
Pipeline:
import (
	"context"

	"github.com/andriiyaremenko/pipelines"
)
func main() {
	ctx := context.Background()

	handler1 := func(ctx context.Context, r pipelines.EventWriter[int], e string) {
			r.Write(42)
	}
	handler2 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
			r.Write(42 + e)
	}

	c := pipelines.New(handler1)
	c = pipelines.Pipe(c, handler2)
	for value, err := range c.Handle(ctx, "start") {
		// handle error
		if err != nil {
			// ...
		}

		// use result value:
		// ...
	}
}
Worker:
import (
	"context"
    "iter"

	"github.com/andriiyaremenko/pipelines"
)
func main() {
	ctx := context.Background()

	handler1 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
			r.Write(42)
	}
	handler2 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
			r.Write(42 + e)
	}

	c := pipelines.New(handler1)
	c = pipelines.Pipe(c, handler2)

	eventSink := func(result iter.Seq2[int, error]) {
		for value, err := range result {
			// handle error
			if err != nil {
				// ...
			}

			// use result value:
			// ...
		}
	}

	w := pipelines.NewWorker(ctx, eventSink, c)
	err := w.Handle(0)

	// handle worker shut down error
	if err != nil {
		// ...
	}
}

Documentation

Overview

This package is intended to help users create workflow pipelines.

To install pipelines:

go get -u github.com/andriiyaremenko/pipelines

How to use:

Pipeline:

import (
	"context"

	"github.com/andriiyaremenko/pipelines"
)

func main() {
	ctx := context.Background()

	handler1 := func(ctx context.Context, r pipelines.EventWriter[int], e string) {
			r.Write(42)
	}
	handler2 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
			r.Write(42 + e)
	}

	c := pipelines.New(handler1)
	c = pipelines.Pipe(c, handler2)
	for value, err := range c.Handle(ctx, "start") {
		// handle error
		if err != nil {
			// ...
		}

		// use result value:
		// ...
	}
}

Worker:

import (
	"context"
	"iter"

	"github.com/andriiyaremenko/pipelines"
)

func main() {
	ctx := context.Background()

	handler1 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
			r.Write(42)
	}
	handler2 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
			r.Write(42 + e)
	}

	c := pipelines.New(handler1)
	c = pipelines.Pipe(c, handler2)

	eventSink := func(result iter.Seq2[int, error]) {
		for value, err := range result {
			// handle error
			if err != nil {
				// ...
			}

			// use result value:
			// ...
		}
	}

	w := pipelines.NewWorker(ctx, eventSink, c)
	err := w.Handle(0)

	// handle worker shut down error
	if err != nil {
		// ...
	}
}

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkerStopped = errors.New("command worker is stopped")

Functions

func NewError

func NewError[T any](cause error, payload T) error

Returns error with cause and payload.

Types

type Error

type Error[T any] struct {
	Payload T
	// contains filtered or unexported fields
}

func (*Error[T]) Error

func (err *Error[T]) Error() string

func (*Error[T]) Unwrap

func (err *Error[T]) Unwrap() error

type ErrorHandler

type ErrorHandler func(context.Context, ErrorWriter, error)

type ErrorWriter

type ErrorWriter interface {
	// Writes an error to a channel.
	WriteError(err error)
}

Serves to write Events in Handle.Handle to chain Events.

type Event

type Event[T any] struct {
	Payload T
	Err     error
}

Event carries the information used in Pipeline execution.

type EventCloser

type EventCloser interface {
	// Signals that no more writes are expected.
	Close()
}

Serves to close EventWriter.

type EventReader

type EventReader[T any] interface {
	// Returns Event[T] channel.
	Read() <-chan Event[T]

	// Returns EventWriter instance on which this EventReader is based.
	GetWriter() EventWriterCloser[T]
}

Serves to pass Events to Handlers.

type EventWriter

type EventWriter[T any] interface {
	// Writes Event to a channel.
	Write(e T)
	ErrorWriter
}

Serves to write Events in Handle.Handle to chain Events.

type EventWriterCloser

type EventWriterCloser[T any] interface {
	EventWriter[T]
	EventCloser
}

type Handle

type Handle[T, U any] func(context.Context, T) (U, error)

Handles single input and produces single output.

func LiftErr

func LiftErr[T any, Fn func(context.Context, T) error](fn Fn) Handle[T, T]

Constructs a Handle from a function that has only an error output.

func LiftNoContext

func LiftNoContext[T, U any, Fn func(T) (U, error)](fn Fn) Handle[T, U]

Constructs a Handle from a function that has no context input.

func LiftOk

func LiftOk[T, U any, Fn func(context.Context, T) U](fn Fn) Handle[T, U]

Constructs a Handle from a function that has no error output.

func Merge

func Merge[T, U, N any, H1 Handle[T, U], H2 Handle[U, N]](h1 H1, h2 H2) Handle[T, N]

Combines two Handles into one with input type T and output type N.

func MergeErrHandle

func MergeErrHandle[T, U any, H Handle[T, U], ErrH Handle[error, U]](h H, errH ErrH) Handle[T, U]

Combines a Handle and error Handle into one with input type T and output type U.

type Handler

type Handler[T, U any] func(context.Context, EventWriter[U], T)

Handler is used to handle particular event.

func HandleFunc

func HandleFunc[T, U any](handle Handle[T, U]) Handler[T, U]

HandleFunc returns Handler function.

func PassThrough

func PassThrough[T any]() Handler[T, T]

Handler that writes same payload it receives without changes.

func (Handler[T, U]) ToPipeline

func (h Handler[T, U]) ToPipeline() Pipeline[T, U]

type HandlerOptions

type HandlerOptions func(ErrorHandler, int) (ErrorHandler, int)

HandlerOptions returns the error handler and handler pool size to use in a Pipeline.

func WithErrorHandler

func WithErrorHandler(errorHandler ErrorHandler) HandlerOptions

Option that specifies the error handler to use alongside the handler.

func WithHandlerPool

func WithHandlerPool(size int) HandlerOptions

Option that specifies handler pool size.

func WithOptions

func WithOptions(errorHandler ErrorHandler, handlerPool int) HandlerOptions

Option to use with handler.

type Pipeline

type Pipeline[T, U any] func(context.Context) (EventWriterCloser[T], EventReader[U], int)

Combination of Handlers into one Pipeline.

func New

func New[T, U any](h Handler[T, U], opts ...HandlerOptions) Pipeline[T, U]

Creates new `Pipeline[T, U]`.

func Pipe

func Pipe[T, U, N any, P Pipeline[T, U]](p P, h Handler[U, N], opts ...HandlerOptions) Pipeline[T, N]

Adds the next `Handler[U, N]` to the `Pipeline[T, U]`, resulting in a new `Pipeline[T, N]`.

func Pipe2

func Pipe2[T, U, N, S any, P Pipeline[T, U]](p P, h1 Handler[U, N], h2 Handler[N, S], opts ...HandlerOptions) Pipeline[T, S]

func Pipe3

func Pipe3[T, U, N, S, Y any, P Pipeline[T, U]](
	p P, h1 Handler[U, N], h2 Handler[N, S], h3 Handler[S, Y], opts ...HandlerOptions,
) Pipeline[T, Y]

func Pipe4

func Pipe4[T, U, N, S, Y, X any, P Pipeline[T, U]](
	p P, h1 Handler[U, N], h2 Handler[N, S], h3 Handler[S, Y], h4 Handler[Y, X], opts ...HandlerOptions,
) Pipeline[T, X]

func PipeErrorHandler

func PipeErrorHandler[T, U any](p Pipeline[T, U], h ErrorHandler) Pipeline[T, U]

Adds error Handler to the `Pipeline[T, U]` resulting in new `Pipeline[T, U]`.

func (Pipeline[T, U]) Handle

func (pipeline Pipeline[T, U]) Handle(ctx context.Context, payload T) iter.Seq2[U, error]

Handles initial Event and returns result of Pipeline execution.

type Worker

type Worker[T, U any] interface {
	// Asynchronously handles Event and returns error if Worker is stopped.
	Handle(T) error
	// Returns false if Worker was stopped.
	IsRunning() bool
}

Asynchronous Pipeline

func NewWorker

func NewWorker[T, U any](ctx context.Context, eventSink func(iter.Seq2[U, error]), pipeline Pipeline[T, U]) Worker[T, U]

Returns a Worker based on `Pipeline[T, U]`. eventSink is used to process the `iter.Seq2[U, error]` result of execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL