Documentation
¶
Overview ¶
Example (SimpleStream) ¶
ctx, killAll := context.WithCancel(context.Background())
defer killAll()
logger := newLogger()
ctrl := gomockCtrl(logger)
batchCount := 10
batchSize := 1
dlq := NewDLQ(
"dlq",
noopDLQDestination(ctrl),
logger,
&NoOpConnectorMetrics{},
1,
0,
)
srcTask := NewSourceTask(
"generator",
generatorSource(ctrl, logger, "generator", batchSize, batchCount),
logger,
&NoOpConnectorMetrics{},
)
destTask := NewDestinationTask(
"printer",
printerDestination(ctrl, logger, "printer", batchSize),
logger,
&NoOpConnectorMetrics{},
)
destTaskNode := &TaskNode{Task: destTask}
srcTaskNode := &TaskNode{Task: srcTask, Next: []*TaskNode{destTaskNode}}
w, err := NewWorker(
srcTaskNode,
dlq,
logger,
noop.Timer{},
)
if err != nil {
panic(err)
}
err = w.Open(ctx)
if err != nil {
panic(err)
}
var wg csync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.Do(ctx)
if err != nil {
panic(err)
}
}()
// stop node after 150ms, which should be enough to process the 10 messages
time.AfterFunc(150*time.Millisecond, func() { _ = w.Stop(ctx) })
if err := wg.WaitTimeout(ctx, time.Second); err != nil {
killAll()
} else {
logger.Info(ctx).Msg("finished successfully")
}
err = w.Close(ctx)
if err != nil {
panic(err)
}
Output: DBG opening source component=task:source connector_id=generator DBG source open component=task:source connector_id=generator DBG opening destination component=task:destination connector_id=printer DBG destination open component=task:destination connector_id=printer DBG opening destination component=task:destination connector_id=dlq DBG destination open component=task:destination connector_id=dlq DBG got record node_id=printer position=generator/0 DBG received ack node_id=generator DBG got record node_id=printer position=generator/1 DBG received ack node_id=generator DBG got record node_id=printer position=generator/2 DBG received ack node_id=generator DBG got record node_id=printer position=generator/3 DBG received ack node_id=generator DBG got record node_id=printer position=generator/4 DBG received ack node_id=generator DBG got record node_id=printer position=generator/5 DBG received ack node_id=generator DBG got record node_id=printer position=generator/6 DBG received ack node_id=generator DBG got record node_id=printer position=generator/7 DBG received ack node_id=generator DBG got record node_id=printer position=generator/8 DBG received ack node_id=generator DBG got record node_id=printer position=generator/9 DBG received ack node_id=generator INF finished successfully
Index ¶
- type Batch
- func (b *Batch) Ack(i int, j ...int)
- func (b *Batch) ActiveRecords() []opencdc.Record
- func (b *Batch) Filter(i int, j ...int)
- func (b *Batch) HasActiveRecords() bool
- func (b *Batch) Nack(i int, errs ...error)
- func (b *Batch) Retry(i int, j ...int)
- func (b *Batch) SetRecords(i int, recs []opencdc.Record)
- func (b *Batch) SplitRecord(i int, recs []opencdc.Record)
- type ConnectorMetrics
- type ConnectorMetricsImpl
- type DLQ
- type Destination
- type DestinationTask
- type NoOpConnectorMetrics
- type NoOpProcessorMetrics
- type Processor
- type ProcessorMetrics
- type ProcessorMetricsImpl
- type ProcessorTask
- type RecordFlag
- type RecordStatus
- type Source
- type SourceTask
- type Task
- type TaskNode
- type Worker
- func (w *Worker) Ack(ctx context.Context, batch *Batch) error
- func (w *Worker) Close(ctx context.Context) error
- func (w *Worker) Do(ctx context.Context) error
- func (w *Worker) Nack(ctx context.Context, batch *Batch, taskID string) error
- func (w *Worker) Open(ctx context.Context) (err error)
- func (w *Worker) Stop(ctx context.Context) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch represents a batch of records that are processed together. It keeps track of the status of each record in the batch, and provides methods to update the status of records.
func (*Batch) Ack ¶ added in v0.13.6
Ack marks the record at index i as acked. If a second index is provided, all records between i (included) and j (excluded) are marked as acked. If multiple indices are provided, the method panics. Records are marked as acked by default, so this method is only useful when reprocessing records marked to be retried.
Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked as acked.
func (*Batch) ActiveRecords ¶
ActiveRecords returns the records that are not filtered.
func (*Batch) Filter ¶
Filter marks the record at index i as filtered out. If a second index is provided, all records between i (included) and j (excluded) are marked as filtered. If multiple indices are provided, the method panics. Once a record is filtered, it is kept in the batch as to not split the batch into multiple batches. It's not returned in the active records and its status can't be changed anymore, however, it is included when acking the batch.
Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked to be retried.
func (*Batch) HasActiveRecords ¶ added in v0.13.6
HasActiveRecords returns true if the batch has any records that are not filtered out.
func (*Batch) Nack ¶
Nack marks the record at index i as nacked. If multiple errors are provided, they are assigned to the records starting at index i.
Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked as nacked.
func (*Batch) Retry ¶
Retry marks the record at index i to be retried. If a second index is provided, all records between i (included) and j (excluded) are marked to be retried. If multiple indices are provided, the method panics.
Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked to be retried.
func (*Batch) SetRecords ¶
SetRecords replaces the records in the batch starting at index i with the provided records. If recs contains n records, indices i to i+n-1 are replaced.
type ConnectorMetrics ¶ added in v0.13.5
func NewDLQMetrics ¶ added in v0.13.5
func NewDLQMetrics(pipelineName string, plugin string) ConnectorMetrics
type ConnectorMetricsImpl ¶ added in v0.13.5
type ConnectorMetricsImpl struct {
// contains filtered or unexported fields
}
func NewConnectorMetrics ¶ added in v0.13.5
func NewConnectorMetrics(pipelineName, pluginName string, connType connector.Type) ConnectorMetricsImpl
type DLQ ¶
type DLQ struct {
// contains filtered or unexported fields
}
func NewDLQ ¶
func NewDLQ( id string, destination Destination, logger log.CtxLogger, metrics ConnectorMetrics, windowSize int, windowNackThreshold int, ) *DLQ
type Destination ¶
type Destination interface {
ID() string
Open(context.Context) error
Write(context.Context, []opencdc.Record) error
Ack(context.Context) ([]connector.DestinationAck, error)
Teardown(context.Context) error
// TODO figure out if we want to handle these errors. This returns errors
// coming from the persister, which persists the connector asynchronously.
// Are we even interested in these errors in the pipeline? Sounds like
// something we could surface and handle globally in the runtime instead.
Errors() <-chan error
}
type DestinationTask ¶
type DestinationTask struct {
// contains filtered or unexported fields
}
func NewDestinationTask ¶
func NewDestinationTask( id string, destination Destination, logger log.CtxLogger, metrics ConnectorMetrics, ) *DestinationTask
func (*DestinationTask) ID ¶
func (t *DestinationTask) ID() string
type NoOpConnectorMetrics ¶ added in v0.13.5
type NoOpConnectorMetrics struct{}
type NoOpProcessorMetrics ¶ added in v0.13.5
type NoOpProcessorMetrics struct{}
type Processor ¶
type Processor interface {
// Open configures and opens a processor plugin
Open(ctx context.Context) error
Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord
// Teardown tears down a processor plugin.
// In case of standalone plugins, that means stopping the WASM module.
Teardown(context.Context) error
}
type ProcessorMetrics ¶ added in v0.13.5
type ProcessorMetricsImpl ¶ added in v0.13.5
type ProcessorMetricsImpl struct {
// contains filtered or unexported fields
}
func NewProcessorMetrics ¶ added in v0.13.5
func NewProcessorMetrics(pipelineName, plugin string) ProcessorMetricsImpl
type ProcessorTask ¶
type ProcessorTask struct {
// contains filtered or unexported fields
}
func NewProcessorTask ¶
func NewProcessorTask( id string, processor Processor, logger log.CtxLogger, metrics ProcessorMetrics, ) *ProcessorTask
func (*ProcessorTask) Do ¶
func (t *ProcessorTask) Do(ctx context.Context, b *Batch) error
Do processes a batch of records using the processor plugin. It returns an error if the processor fails to process the records, or if the processor returns an invalid number of records.
If the batch contains filtered records, the processor will only process the active records. For instance:
- Consider a batch with 5 records, 2 of which are filtered. A represents the active records, and F represents the filtered records: [A, A, F, A, F]
- The processor will only process the active records, so it will receive [A, A, A].
- The processor will return the processed records, which will be [X, X, X], where X represents the processed records. The records are either processed, filtered or errored.
- When marking the records in the batch as processed, filtered or errored, the ProcessorTask takes into account the indices of the filtered records, so it marks the correct records in the batch. In the example that we used, the processor will mark the batch as [X, X, F, X, F], leaving the filtered records as is.
func (*ProcessorTask) ID ¶
func (t *ProcessorTask) ID() string
type RecordFlag ¶
type RecordFlag int
const ( RecordFlagAck RecordFlag = iota // ack RecordFlagNack // nack RecordFlagRetry // retry RecordFlagFilter // filter )
func (RecordFlag) String ¶
func (i RecordFlag) String() string
type RecordStatus ¶
type RecordStatus struct {
Flag RecordFlag
Error error
}
RecordStatus holds the status of a record in a batch. The flag indicates the status of the record, and the error is set if the record was nacked.
type Source ¶
type Source interface {
ID() string
Open(context.Context) error
Read(context.Context) ([]opencdc.Record, error)
Ack(context.Context, []opencdc.Position) error
Teardown(context.Context) error
// TODO figure out if we want to handle these errors. This returns errors
// coming from the persister, which persists the connector asynchronously.
// Are we even interested in these errors in the pipeline? Sounds like
// something we could surface and handle globally in the runtime instead.
Errors() <-chan error
}
type SourceTask ¶
type SourceTask struct {
// contains filtered or unexported fields
}
func NewSourceTask ¶
func NewSourceTask( id string, source Source, logger log.CtxLogger, metrics ConnectorMetrics, ) *SourceTask
func (*SourceTask) GetSource ¶
func (t *SourceTask) GetSource() Source
func (*SourceTask) ID ¶
func (t *SourceTask) ID() string
type Task ¶
type Task interface {
// ID returns the identifier of this Task. Each Task in a pipeline must be
// uniquely identified by the ID.
ID() string
// Open opens the Task for processing. It is called once before the worker
// starts processing records.
Open(context.Context) error
// Close closes the Task. It is called once after the worker has stopped
// processing records.
Close(context.Context) error
// Do processes the given batch of records. It is called for each batch of
// records that the worker processes.
Do(context.Context, *Batch) error
}
Task is a unit of work that can be executed by a Worker. Each Task in a pipeline is executed sequentially, except for tasks related to different destinations, which can be executed in parallel.
type TaskNode ¶ added in v0.13.6
TaskNode represents a task in the pipeline. It contains the task itself and the next tasks to be executed after it.
func (*TaskNode) AppendToEnd ¶ added in v0.13.6
AppendToEnd adds a new task to the end of the pipeline. Note that this doesn't mean that the supplied task will be executed directly after this task. Rather, it means that it will be executed after all tasks in the linked list are executed.
If any task node in the list has more than 1 next task, the function will return an error, as it would be ambiguous where to append the task.
If the task was appended successfully, it returns the created TaskNode.
func (*TaskNode) HasNext ¶ added in v0.13.6
HasNext returns true if the task has at least one next task.
func (*TaskNode) IsFirst ¶ added in v0.13.6
IsFirst returns true if this task is the first task in the pipeline.
type Worker ¶
type Worker struct {
Source Source
FirstTask *TaskNode
DLQ *DLQ
// contains filtered or unexported fields
}
Worker collects the tasks that need to be executed in a pipeline for a specific source. It processes records from the source through the tasks until it is stopped. The worker is responsible for coordinating tasks and acking/nacking records.
Batches are processed in the following way:
- The first task is always a source task which reads a batch of records from the source. The batch is then passed to the next task.
- Any task between the source and the destination can process the batch by updating the records or their status (see RecordStatus). If a record in the batch is marked as filtered, the next task will skip processing it and consider it as already processed. If a record is marked as nacked, the record will be sent to the DLQ. If a record is marked as retry, the record will be reprocessed by the same task (relevant if a task processed only part of the batch, experienced an error and skipped the rest).
- The last task is always a destination task which writes the batch of records to the destination. The batch is then acked.
Note that if a task marks a record in the middle of a batch as nacked, the batch is split into sub-batches. The records that were successfully processed continue to the next task (and ideally to the end of the pipeline), because Conduit provides ordering guarantees. Only once the records before the nacked record are end-to-end processed, will the nacked record be sent to the DLQ. The rest of the records are processed as a sub-batch, and the same rules apply to them.
func (*Worker) Do ¶
Do processes records from the source until the worker is stopped. It returns no error if the worker is stopped gracefully.