pipeline

package
v0.0.0-...-fda0a1a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2019 License: MIT Imports: 14 Imported by: 3

README

Pipes

Report Docs Version

Pipes provides the ability to rapidly define an application using prebuilt components (processes) that are dynamically defined.

For now Pipes is a proof of concept and should not be used in production yet.

Features

  • Concurrent execution of pipeline paths.
  • Dynamic Javascript process.
  • Prebuilt start shapes: HTTP, JSON FILE, Static Generator.
  • Pipeline definable with JSON.
  • State tracking of process blocks.
  • Customizable state changed handler.
  • Process level error reporting.
  • FUTURE: External plugin system for process blocks running as their own process in any language.
  • FUTURE: Built-in database processes.

NEW: A definition language for pipes called pipes-dl is also available. This provides a simple DSL that can be used to define a pipeline.

flow

Installation

Get the source with go get:

$ go get github.com/cbergoon/pipes

Then import the library in your project:

import "github.com/cbergoon/pipes"

Documentation

A Pipes pipeline consists of two main concepts: processes and connections. As you might have guessed a processes are are linked and communicate via connections to form a pipeline. These connections also define the process graph which define the flow of messages and execution through the pipeline.

Processes

Processes are the main parts of a process. These are similar to functions in a traditional program and define the logic of the pipeline. Processes consist of a name, type, input ports, output ports, and a state.

The type of the process specifies which of the built in types the process should use. An example of type is an HTTP process which make HTTP requests.

Inputs and outputs are named "ports" that the processes use to communicate.

The state of a process is set of definable initial data which is specifically defined per process instance.

There are currently four built-in process types: HTTP, JSON, DYNAMICJS, and GENERATOR.

Connections

Connections define the flow of the pipeline. A complete pipeline's connections will form a subset of a p-graph where only one start and end vertex exists. Connections pass JSON data.

Pipelines

Pipelines represent the entire flow through the application.

Example Usage

package main

import "fmt"

func main() {
    fmt.Println("Hello, Pipes")
}

Example Pipes Definition Language

CREATE PIPELINE "MyPipeline";

ADD "Alfa" OF "Generator" OUTPUTS = ("Out1", "Out2");
ADD "Beta" OF "DynamicJs"
    INPUTS = ("In1", "In2")
    OUTPUTS = ("Out")
    SET "src" = 'o = {
        "MyVal": In1 + "hello" + In2
    };
    console.log("hellofrom js");
    Out = JSON.stringify(o);',
    "gg" = "kk";
ADD SINK "Charlie" OF "Printer" INPUTS = ("In");

CONNECT "Alfa":"Out1" TO "Beta":"In1";
CONNECT "Alfa":"Out2" TO "Beta":"In2";
CONNECT "Beta":"Out" TO "Charlie":"In";

Contributions

All contributions are welcome.

License

This project is licensed under the MIT License.

Documentation

Index

Constants

View Source
const (
	S_TYPE_OBJECT = "OBJECT"
	S_TYPE_ARRAY  = "ARRAY"

	P_TYPE_SPLIT    = "SPLIT"
	P_TYPE_NO_SPLIT = "NOSPLIT"
)

Variables

This section is empty.

Functions

func LoadProcessPluginNameMap

func LoadProcessPluginNameMap(pluginDir string) (map[string]*plugin.Plugin, error)

LoadProcessPluginNameMap reads a specified plugin directory and returns a map of the opened plugins

Types

type ConnectionDefinition

type ConnectionDefinition struct {
	OriginProcessName      string `json:"originProcessName"`
	OriginPortName         string `json:"originPortName"`
	DestinationProcessName string `json:"destinationProcessName"`
	DestinationPortName    string `json:"destinationPortName"`
}

type DynamicJsProcess

type DynamicJsProcess struct {
	Process
	*FlowProcess

	ProcessName string

	InitialState map[string]string

	Inputs  []string
	Outputs []string
}

func NewDynamicJsProcess

func NewDynamicJsProcess(processName string, inputs, outputs []string, state map[string]string) *DynamicJsProcess

func (*DynamicJsProcess) GetFlowProcess

func (c *DynamicJsProcess) GetFlowProcess() *FlowProcess

func (*DynamicJsProcess) Initialize

func (c *DynamicJsProcess) Initialize()

func (*DynamicJsProcess) Run

func (c *DynamicJsProcess) Run()

type FlowPipeline

type FlowPipeline struct {
	Name      string
	Sink      string
	Processes map[string]Process

	Errors []*PipelineError

	StateChangeEnable           bool
	StateChangedCallbacksEnable bool
	StateChangedCallbackFn      func(state PipelineState)

	State *PipelineState
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(name string, stateChangedEnabled bool, stateChangedCallbackEnabled bool, stateChangedCallbackFn func(state PipelineState)) *FlowPipeline

func NewPipelineFromPipelineDefinition

func NewPipelineFromPipelineDefinition(definition *PipelineDefinition, pluginMap map[string]*plugin.Plugin, stateChangedEnabled bool, stateChangedCallbackEnabled bool, stateChangedCallbackFn func(state PipelineState)) (*FlowPipeline, error)

func (*FlowPipeline) AddError

func (f *FlowPipeline) AddError(err *PipelineError)

func (*FlowPipeline) AddProcess

func (f *FlowPipeline) AddProcess(name string, process Process, sink bool) error

func (*FlowPipeline) Connect

func (f *FlowPipeline) Connect(originProcess, originPort string, destinationProcess, destinationPort string) error

func (*FlowPipeline) Execute

func (f *FlowPipeline) Execute()

func (*FlowPipeline) GetPipelineState

func (f *FlowPipeline) GetPipelineState() *PipelineState

func (*FlowPipeline) Initialize

func (f *FlowPipeline) Initialize()

func (*FlowPipeline) IsProcessStateChangedEnabled

func (f *FlowPipeline) IsProcessStateChangedEnabled() bool

func (*FlowPipeline) ProcessStateChanged

func (f *FlowPipeline) ProcessStateChanged(state *ProcessState)

type FlowProcess

type FlowProcess struct {
	TypeName    string
	Inputs      map[string]chan string
	Outputs     map[string]chan string
	PipelineRef Pipeline
}

func NewFlowProcess

func NewFlowProcess(typeName string) *FlowProcess

func (*FlowProcess) AddError

func (f *FlowProcess) AddError(err *PipelineError)

func (*FlowProcess) AddInput

func (f *FlowProcess) AddInput(name string)

func (*FlowProcess) AddInputChannel

func (f *FlowProcess) AddInputChannel(name string, input chan string)

func (*FlowProcess) AddOutput

func (f *FlowProcess) AddOutput(name string)

func (*FlowProcess) AddOutputChannel

func (f *FlowProcess) AddOutputChannel(name string, output chan string)

func (*FlowProcess) GetInputChannelByName

func (f *FlowProcess) GetInputChannelByName(name string) chan string

func (*FlowProcess) GetInputChannelMap

func (f *FlowProcess) GetInputChannelMap() map[string]chan string

func (*FlowProcess) GetOutputChannelByName

func (f *FlowProcess) GetOutputChannelByName(name string) chan string

func (*FlowProcess) GetOutputChannelMap

func (f *FlowProcess) GetOutputChannelMap() map[string]chan string

func (*FlowProcess) GetTypeName

func (f *FlowProcess) GetTypeName() string

func (*FlowProcess) ProcessStateChanged

func (f *FlowProcess) ProcessStateChanged(state *ProcessState)

func (*FlowProcess) SetInputChannelByName

func (f *FlowProcess) SetInputChannelByName(name string, port chan string)

func (*FlowProcess) SetOutputChannelByName

func (f *FlowProcess) SetOutputChannelByName(name string, port chan string)

func (*FlowProcess) SetTypeName

func (f *FlowProcess) SetTypeName(typeName string)

type GeneratorProcess

type GeneratorProcess struct {
	Process
	*FlowProcess

	ProcessName string

	InitialState map[string]string

	Inputs  []string
	Outputs []string
}

func NewGeneratorProcess

func NewGeneratorProcess(processName string, inputs, outputs []string, state map[string]string) *GeneratorProcess

func (*GeneratorProcess) GetFlowProcess

func (c *GeneratorProcess) GetFlowProcess() *FlowProcess

func (*GeneratorProcess) Initialize

func (c *GeneratorProcess) Initialize()

func (*GeneratorProcess) Run

func (c *GeneratorProcess) Run()

type HttpProcess

type HttpProcess struct {
	Process
	*FlowProcess

	ProcessName string

	InitialState map[string]string

	Inputs  []string
	Outputs []string
}

func NewHttpProcess

func NewHttpProcess(processName string, inputs, outputs []string, state map[string]string) *HttpProcess

func (*HttpProcess) CloseOutputs

func (c *HttpProcess) CloseOutputs()

func (*HttpProcess) GetFlowProcess

func (c *HttpProcess) GetFlowProcess() *FlowProcess

func (*HttpProcess) Initialize

func (c *HttpProcess) Initialize()

func (*HttpProcess) Run

func (c *HttpProcess) Run()

type HttpProcessConfig

type HttpProcessConfig struct {
	Url     string
	Method  string
	Headers map[string]string
	Body    string
}

type JSONFileReaderProcess

type JSONFileReaderProcess struct {
	Process
	*FlowProcess

	ProcessName string

	InitialState map[string]string

	Inputs  []string
	Outputs []string
}

func NewJSONFileReaderProcess

func NewJSONFileReaderProcess(processName string, inputs, outputs []string, state map[string]string) *JSONFileReaderProcess

func (*JSONFileReaderProcess) GetFlowProcess

func (c *JSONFileReaderProcess) GetFlowProcess() *FlowProcess

func (*JSONFileReaderProcess) Initialize

func (c *JSONFileReaderProcess) Initialize()

func (*JSONFileReaderProcess) Run

func (c *JSONFileReaderProcess) Run()

type Pipeline

type Pipeline interface {
	Initialize()
	AddProcess(name string, process Process, sink bool) error
	Connect(originProcess, originPort string, destinationProcess, destinationPort string) error
	Execute()
	AddError(err *PipelineError)
	ProcessStateChanged(state *ProcessState)
	IsProcessStateChangedEnabled() bool
	GetPipelineState() *PipelineState
}

type PipelineDefinition

type PipelineDefinition struct {
	Pipeline    *PipelineInfoDefinition `json:"pipeline"`
	Processes   []*ProcessDefinition    `json:"processes"`
	Connections []*ConnectionDefinition `json:"connections"`
}

func NewPipelineDefinitionFromJson

func NewPipelineDefinitionFromJson(definition []byte) (*PipelineDefinition, error)

type PipelineError

type PipelineError struct {
	ProcessName     string
	ProcessTypeName string

	Error error

	ErrorTime    time.Time
	ErrorMessage string
	Content      string
	Inputs       map[string]string
}

func NewPipelineError

func NewPipelineError(pName, pTypeName string, err error, errTime time.Time, errMessage, content string, inputs map[string]string) PipelineError

type PipelineInfoDefinition

type PipelineInfoDefinition struct {
	Name string `json:"name"`
}

type PipelineState

type PipelineState struct {
	PipelineName  string
	Errors        []*PipelineError
	ProcessStates map[string]*ProcessState
}

type PrinterProcess

type PrinterProcess struct {
	Process
	*FlowProcess

	ProcessName string

	InitialState map[string]string

	Inputs  []string
	Outputs []string
}

func NewPrinterProcess

func NewPrinterProcess(processName string, inputs, outputs []string, state map[string]string) *PrinterProcess

func (*PrinterProcess) GetFlowProcess

func (c *PrinterProcess) GetFlowProcess() *FlowProcess

func (*PrinterProcess) Initialize

func (c *PrinterProcess) Initialize()

func (*PrinterProcess) Run

func (c *PrinterProcess) Run()

type Process

type Process interface {
	Initialize()
	GetFlowProcess() *FlowProcess
	Run()
}

func NewProcessByTypeName

func NewProcessByTypeName(typeName, processName string, inputs, outputs []string, state map[string]string) (Process, error)

type ProcessDefinition

type ProcessDefinition struct {
	TypeName    string            `json:"typeName"`
	ProcessName string            `json:"processName"`
	Sink        bool              `json:"sink"`
	Inputs      []string          `json:"inputs"`
	Outputs     []string          `json:"outputs"`
	State       map[string]string `json:"state"`
}

type ProcessState

type ProcessState struct {
	ExecutionId     string
	ProcessName     string
	ProcessTypeName string
	InitialState    map[string]string
	Inputs          map[string]string
	Outputs         map[string]string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL