Documentation
¶
Overview ¶
This package is intended to help users create workflow pipelines.
To install pipelines:
go get -u github.com/andriiyaremenko/pipelines
How to use:
Pipeline:
import (
"context"
"github.com/andriiyaremenko/pipelines"
)
func main() {
ctx := context.Background()
handler1 := func(ctx context.Context, r pipelines.EventWriter[int], e string) {
r.Write(42)
}
handler2 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
r.Write(42 + e)
}
c := pipelines.New(handler1)
c = pipelines.Pipe(c, handler2)
for value, err := range c.Handle(ctx, "start") {
// handle error
if err != nil {
// ...
}
// use result value:
// ...
}
}
Worker:
import (
"context"
"iter"
"github.com/andriiyaremenko/pipelines"
)
func main() {
ctx := context.Background()
handler1 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
r.Write(42)
}
handler2 := func(ctx context.Context, r pipelines.EventWriter[int], e int) {
r.Write(42 + e)
}
c := pipelines.New(handler1)
c = pipelines.Pipe(c, handler2)
eventSink := func(result iter.Seq2[int, error]) {
for value, err := range result {
// handle error
if err != nil {
// ...
}
// use result value:
// ...
}
}
w := pipelines.NewWorker(ctx, eventSink, c)
err := w.Handle(0)
// handle worker shut down error
if err != nil {
// ...
}
}
Index ¶
- Variables
- func NewError[T any](cause error, payload T) error
- type Error
- type ErrorHandler
- type ErrorWriter
- type Event
- type EventCloser
- type EventReader
- type EventWriter
- type EventWriterCloser
- type Handle
- func LiftErr[T any, Fn func(context.Context, T) error](fn Fn) Handle[T, T]
- func LiftNoContext[T, U any, Fn func(T) (U, error)](fn Fn) Handle[T, U]
- func LiftOk[T, U any, Fn func(context.Context, T) U](fn Fn) Handle[T, U]
- func Merge[T, U, N any, H1 Handle[T, U], H2 Handle[U, N]](h1 H1, h2 H2) Handle[T, N]
- func MergeErrHandle[T, U any, H Handle[T, U], ErrH Handle[error, U]](h H, errH ErrH) Handle[T, U]
- type Handler
- type HandlerOptions
- type Pipeline
- func New[T, U any](h Handler[T, U], opts ...HandlerOptions) Pipeline[T, U]
- func Pipe[T, U, N any, P Pipeline[T, U]](p P, h Handler[U, N], opts ...HandlerOptions) Pipeline[T, N]
- func Pipe2[T, U, N, S any, P Pipeline[T, U]](p P, h1 Handler[U, N], h2 Handler[N, S], opts ...HandlerOptions) Pipeline[T, S]
- func Pipe3[T, U, N, S, Y any, P Pipeline[T, U]](p P, h1 Handler[U, N], h2 Handler[N, S], h3 Handler[S, Y], ...) Pipeline[T, Y]
- func Pipe4[T, U, N, S, Y, X any, P Pipeline[T, U]](p P, h1 Handler[U, N], h2 Handler[N, S], h3 Handler[S, Y], h4 Handler[Y, X], ...) Pipeline[T, X]
- func PipeErrorHandler[T, U any](p Pipeline[T, U], h ErrorHandler) Pipeline[T, U]
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrWorkerStopped = errors.New("command worker is stopped")
Functions ¶
Types ¶
type ErrorHandler ¶
type ErrorHandler func(context.Context, ErrorWriter, error)
type ErrorWriter ¶
type ErrorWriter interface {
// Writes error to a channel.
WriteError(err error)
}
Serves to write errors from Handlers and ErrorHandlers.
type EventCloser ¶
type EventCloser interface {
// Signals that no more writes are expected.
Close()
}
Serves to close EventWriter.
type EventReader ¶
type EventReader[T any] interface {
// Returns Event[T] channel.
Read() <-chan Event[T]
// Returns EventWriter instance on which this EventReader is based.
GetWriter() EventWriterCloser[T]
}
Serves to pass Events to Handlers.
type EventWriter ¶
type EventWriter[T any] interface { // Writes Event to a channel. Write(e T) ErrorWriter }
Serves to write Events in Handle.Handle to chain Events.
type EventWriterCloser ¶
type EventWriterCloser[T any] interface {
EventWriter[T]
EventCloser
}
type Handle ¶
Handles single input and produces single output.
func LiftNoContext ¶
Constructs Handle from a function that has no context input.
type Handler ¶
type Handler[T, U any] func(context.Context, EventWriter[U], T)
Handler is used to handle particular event.
func HandleFunc ¶
HandleFunc returns Handler function.
func PassThrough ¶
Handler that writes same payload it receives without changes.
func (Handler[T, U]) ToPipeline ¶
type HandlerOptions ¶
type HandlerOptions func(ErrorHandler, int) (ErrorHandler, int)
HandlerOptions returns the ErrorHandler and handler pool size to use in Pipeline.
func WithErrorHandler ¶
func WithErrorHandler(errorHandler ErrorHandler) HandlerOptions
Option that specifies the error handler to use alongside the handler.
func WithHandlerPool ¶
func WithHandlerPool(size int) HandlerOptions
Option that specifies handler pool size.
func WithOptions ¶
func WithOptions(errorHandler ErrorHandler, handlerPool int) HandlerOptions
Option to use with handler.
type Pipeline ¶
type Pipeline[T, U any] func(context.Context) (EventWriterCloser[T], EventReader[U], int)
Combination of Handlers into one Pipeline.
func New ¶
func New[T, U any](h Handler[T, U], opts ...HandlerOptions) Pipeline[T, U]
Creates new `Pipeline[T, U]`.
func Pipe ¶
func Pipe[T, U, N any, P Pipeline[T, U]](p P, h Handler[U, N], opts ...HandlerOptions) Pipeline[T, N]
Adds next `Handler[U, N]` to the `Pipeline[T, U]` resulting in new `Pipeline[T, N]`.
func PipeErrorHandler ¶
func PipeErrorHandler[T, U any](p Pipeline[T, U], h ErrorHandler) Pipeline[T, U]
Adds error Handler to the `Pipeline[T, U]` resulting in new `Pipeline[T, U]`.