Documentation
¶
Index ¶
- Constants
- func LoadProcessPluginNameMap(pluginDir string) (map[string]*plugin.Plugin, error)
- type ConnectionDefinition
- type DynamicJsProcess
- type FlowPipeline
- func (f *FlowPipeline) AddError(err *PipelineError)
- func (f *FlowPipeline) AddProcess(name string, process Process, sink bool) error
- func (f *FlowPipeline) Connect(originProcess, originPort string, destinationProcess, destinationPort string) error
- func (f *FlowPipeline) Execute()
- func (f *FlowPipeline) GetPipelineState() *PipelineState
- func (f *FlowPipeline) Initialize()
- func (f *FlowPipeline) IsProcessStateChangedEnabled() bool
- func (f *FlowPipeline) ProcessStateChanged(state *ProcessState)
- type FlowProcess
- func (f *FlowProcess) AddError(err *PipelineError)
- func (f *FlowProcess) AddInput(name string)
- func (f *FlowProcess) AddInputChannel(name string, input chan string)
- func (f *FlowProcess) AddOutput(name string)
- func (f *FlowProcess) AddOutputChannel(name string, output chan string)
- func (f *FlowProcess) GetInputChannelByName(name string) chan string
- func (f *FlowProcess) GetInputChannelMap() map[string]chan string
- func (f *FlowProcess) GetOutputChannelByName(name string) chan string
- func (f *FlowProcess) GetOutputChannelMap() map[string]chan string
- func (f *FlowProcess) GetTypeName() string
- func (f *FlowProcess) ProcessStateChanged(state *ProcessState)
- func (f *FlowProcess) SetInputChannelByName(name string, port chan string)
- func (f *FlowProcess) SetOutputChannelByName(name string, port chan string)
- func (f *FlowProcess) SetTypeName(typeName string)
- type GeneratorProcess
- type HttpProcess
- type HttpProcessConfig
- type JSONFileReaderProcess
- type Pipeline
- type PipelineDefinition
- type PipelineError
- type PipelineInfoDefinition
- type PipelineState
- type PrinterProcess
- type Process
- type ProcessDefinition
- type ProcessState
Constants ¶
View Source
const ( S_TYPE_OBJECT = "OBJECT" S_TYPE_ARRAY = "ARRAY" P_TYPE_SPLIT = "SPLIT" P_TYPE_NO_SPLIT = "NOSPLIT" )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ConnectionDefinition ¶
type DynamicJsProcess ¶
type DynamicJsProcess struct {
Process
*FlowProcess
ProcessName string
InitialState map[string]string
Inputs []string
Outputs []string
}
func NewDynamicJsProcess ¶
func NewDynamicJsProcess(processName string, inputs, outputs []string, state map[string]string) *DynamicJsProcess
func (*DynamicJsProcess) GetFlowProcess ¶
func (c *DynamicJsProcess) GetFlowProcess() *FlowProcess
func (*DynamicJsProcess) Initialize ¶
func (c *DynamicJsProcess) Initialize()
func (*DynamicJsProcess) Run ¶
func (c *DynamicJsProcess) Run()
type FlowPipeline ¶
type FlowPipeline struct {
Name string
Sink string
Processes map[string]Process
Errors []*PipelineError
StateChangeEnable bool
StateChangedCallbacksEnable bool
StateChangedCallbackFn func(state PipelineState)
State *PipelineState
// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline(name string, stateChangedEnabled bool, stateChangedCallbackEnabled bool, stateChangedCallbackFn func(state PipelineState)) *FlowPipeline
func NewPipelineFromPipelineDefinition ¶
func NewPipelineFromPipelineDefinition(definition *PipelineDefinition, pluginMap map[string]*plugin.Plugin, stateChangedEnabled bool, stateChangedCallbackEnabled bool, stateChangedCallbackFn func(state PipelineState)) (*FlowPipeline, error)
func (*FlowPipeline) AddError ¶
func (f *FlowPipeline) AddError(err *PipelineError)
func (*FlowPipeline) AddProcess ¶
func (f *FlowPipeline) AddProcess(name string, process Process, sink bool) error
func (*FlowPipeline) Connect ¶
func (f *FlowPipeline) Connect(originProcess, originPort string, destinationProcess, destinationPort string) error
func (*FlowPipeline) Execute ¶
func (f *FlowPipeline) Execute()
func (*FlowPipeline) GetPipelineState ¶
func (f *FlowPipeline) GetPipelineState() *PipelineState
func (*FlowPipeline) Initialize ¶
func (f *FlowPipeline) Initialize()
func (*FlowPipeline) IsProcessStateChangedEnabled ¶
func (f *FlowPipeline) IsProcessStateChangedEnabled() bool
func (*FlowPipeline) ProcessStateChanged ¶
func (f *FlowPipeline) ProcessStateChanged(state *ProcessState)
type FlowProcess ¶
type FlowProcess struct {
TypeName string
Inputs map[string]chan string
Outputs map[string]chan string
PipelineRef Pipeline
}
func NewFlowProcess ¶
func NewFlowProcess(typeName string) *FlowProcess
func (*FlowProcess) AddError ¶
func (f *FlowProcess) AddError(err *PipelineError)
func (*FlowProcess) AddInput ¶
func (f *FlowProcess) AddInput(name string)
func (*FlowProcess) AddInputChannel ¶
func (f *FlowProcess) AddInputChannel(name string, input chan string)
func (*FlowProcess) AddOutput ¶
func (f *FlowProcess) AddOutput(name string)
func (*FlowProcess) AddOutputChannel ¶
func (f *FlowProcess) AddOutputChannel(name string, output chan string)
func (*FlowProcess) GetInputChannelByName ¶
func (f *FlowProcess) GetInputChannelByName(name string) chan string
func (*FlowProcess) GetInputChannelMap ¶
func (f *FlowProcess) GetInputChannelMap() map[string]chan string
func (*FlowProcess) GetOutputChannelByName ¶
func (f *FlowProcess) GetOutputChannelByName(name string) chan string
func (*FlowProcess) GetOutputChannelMap ¶
func (f *FlowProcess) GetOutputChannelMap() map[string]chan string
func (*FlowProcess) GetTypeName ¶
func (f *FlowProcess) GetTypeName() string
func (*FlowProcess) ProcessStateChanged ¶
func (f *FlowProcess) ProcessStateChanged(state *ProcessState)
func (*FlowProcess) SetInputChannelByName ¶
func (f *FlowProcess) SetInputChannelByName(name string, port chan string)
func (*FlowProcess) SetOutputChannelByName ¶
func (f *FlowProcess) SetOutputChannelByName(name string, port chan string)
func (*FlowProcess) SetTypeName ¶
func (f *FlowProcess) SetTypeName(typeName string)
type GeneratorProcess ¶
type GeneratorProcess struct {
Process
*FlowProcess
ProcessName string
InitialState map[string]string
Inputs []string
Outputs []string
}
func NewGeneratorProcess ¶
func NewGeneratorProcess(processName string, inputs, outputs []string, state map[string]string) *GeneratorProcess
func (*GeneratorProcess) GetFlowProcess ¶
func (c *GeneratorProcess) GetFlowProcess() *FlowProcess
func (*GeneratorProcess) Initialize ¶
func (c *GeneratorProcess) Initialize()
func (*GeneratorProcess) Run ¶
func (c *GeneratorProcess) Run()
type HttpProcess ¶
type HttpProcess struct {
Process
*FlowProcess
ProcessName string
InitialState map[string]string
Inputs []string
Outputs []string
}
func NewHttpProcess ¶
func NewHttpProcess(processName string, inputs, outputs []string, state map[string]string) *HttpProcess
func (*HttpProcess) CloseOutputs ¶
func (c *HttpProcess) CloseOutputs()
func (*HttpProcess) GetFlowProcess ¶
func (c *HttpProcess) GetFlowProcess() *FlowProcess
func (*HttpProcess) Initialize ¶
func (c *HttpProcess) Initialize()
func (*HttpProcess) Run ¶
func (c *HttpProcess) Run()
type HttpProcessConfig ¶
type JSONFileReaderProcess ¶
type JSONFileReaderProcess struct {
Process
*FlowProcess
ProcessName string
InitialState map[string]string
Inputs []string
Outputs []string
}
func NewJSONFileReaderProcess ¶
func NewJSONFileReaderProcess(processName string, inputs, outputs []string, state map[string]string) *JSONFileReaderProcess
func (*JSONFileReaderProcess) GetFlowProcess ¶
func (c *JSONFileReaderProcess) GetFlowProcess() *FlowProcess
func (*JSONFileReaderProcess) Initialize ¶
func (c *JSONFileReaderProcess) Initialize()
func (*JSONFileReaderProcess) Run ¶
func (c *JSONFileReaderProcess) Run()
type Pipeline ¶
type Pipeline interface {
Initialize()
AddProcess(name string, process Process, sink bool) error
Connect(originProcess, originPort string, destinationProcess, destinationPort string) error
Execute()
AddError(err *PipelineError)
ProcessStateChanged(state *ProcessState)
IsProcessStateChangedEnabled() bool
GetPipelineState() *PipelineState
}
type PipelineDefinition ¶
type PipelineDefinition struct {
Pipeline *PipelineInfoDefinition `json:"pipeline"`
Processes []*ProcessDefinition `json:"processes"`
Connections []*ConnectionDefinition `json:"connections"`
}
func NewPipelineDefinitionFromJson ¶
func NewPipelineDefinitionFromJson(definition []byte) (*PipelineDefinition, error)
type PipelineError ¶
type PipelineError struct {
ProcessName string
ProcessTypeName string
Error error
ErrorTime time.Time
ErrorMessage string
Content string
Inputs map[string]string
}
func NewPipelineError ¶
type PipelineInfoDefinition ¶
type PipelineInfoDefinition struct {
Name string `json:"name"`
}
type PipelineState ¶
type PipelineState struct {
PipelineName string
Errors []*PipelineError
ProcessStates map[string]*ProcessState
}
type PrinterProcess ¶
type PrinterProcess struct {
Process
*FlowProcess
ProcessName string
InitialState map[string]string
Inputs []string
Outputs []string
}
func NewPrinterProcess ¶
func NewPrinterProcess(processName string, inputs, outputs []string, state map[string]string) *PrinterProcess
func (*PrinterProcess) GetFlowProcess ¶
func (c *PrinterProcess) GetFlowProcess() *FlowProcess
func (*PrinterProcess) Initialize ¶
func (c *PrinterProcess) Initialize()
func (*PrinterProcess) Run ¶
func (c *PrinterProcess) Run()
type Process ¶
type Process interface {
Initialize()
GetFlowProcess() *FlowProcess
Run()
}
type ProcessDefinition ¶
Click to show internal directories.
Click to hide internal directories.
