funnel

package
v0.14.0 Latest
Published: Jun 13, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Example (SimpleStream)
ctx, killAll := context.WithCancel(context.Background())
defer killAll()

logger := newLogger()
ctrl := gomockCtrl(logger)

batchCount := 10
batchSize := 1

dlq := NewDLQ(
	"dlq",
	noopDLQDestination(ctrl),
	logger,
	&NoOpConnectorMetrics{},
	1,
	0,
)
srcTask := NewSourceTask(
	"generator",
	generatorSource(ctrl, logger, "generator", batchSize, batchCount),
	logger,
	&NoOpConnectorMetrics{},
)
destTask := NewDestinationTask(
	"printer",
	printerDestination(ctrl, logger, "printer", batchSize),
	logger,
	&NoOpConnectorMetrics{},
)

destTaskNode := &TaskNode{Task: destTask}
srcTaskNode := &TaskNode{Task: srcTask, Next: []*TaskNode{destTaskNode}}

w, err := NewWorker(
	srcTaskNode,
	dlq,
	logger,
	noop.Timer{},
)
if err != nil {
	panic(err)
}

err = w.Open(ctx)
if err != nil {
	panic(err)
}

var wg csync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	err := w.Do(ctx)
	if err != nil {
		panic(err)
	}
}()

// stop node after 150ms, which should be enough to process the 10 messages
time.AfterFunc(150*time.Millisecond, func() { _ = w.Stop(ctx) })

if err := wg.WaitTimeout(ctx, time.Second); err != nil {
	killAll()
} else {
	logger.Info(ctx).Msg("finished successfully")
}

err = w.Close(ctx)
if err != nil {
	panic(err)
}
Output:

DBG opening source component=task:source connector_id=generator
DBG source open component=task:source connector_id=generator
DBG opening destination component=task:destination connector_id=printer
DBG destination open component=task:destination connector_id=printer
DBG opening destination component=task:destination connector_id=dlq
DBG destination open component=task:destination connector_id=dlq
DBG got record node_id=printer position=generator/0
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/1
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/2
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/3
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/4
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/5
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/6
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/7
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/8
DBG received ack node_id=generator
DBG got record node_id=printer position=generator/9
DBG received ack node_id=generator
INF finished successfully

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch represents a batch of records that are processed together. It keeps track of the status of each record in the batch, and provides methods to update the status of records.

func NewBatch

func NewBatch(records []opencdc.Record) *Batch

func (*Batch) Ack added in v0.13.6

func (b *Batch) Ack(i int, j ...int)

Ack marks the record at index i as acked. If a second index is provided, all records between i (included) and j (excluded) are marked as acked. If multiple indices are provided, the method panics. Records are marked as acked by default, so this method is only useful when reprocessing records marked to be retried.

Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked as acked.

func (*Batch) ActiveRecords

func (b *Batch) ActiveRecords() []opencdc.Record

ActiveRecords returns the records that are not filtered.

func (*Batch) Filter

func (b *Batch) Filter(i int, j ...int)

Filter marks the record at index i as filtered out. If a second index is provided, all records between i (included) and j (excluded) are marked as filtered. If multiple indices are provided, the method panics. Once a record is filtered, it is kept in the batch so as not to split the batch into multiple batches. It's not returned in the active records and its status can't be changed anymore; however, it is included when acking the batch.

Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked as filtered again.

func (*Batch) HasActiveRecords added in v0.13.6

func (b *Batch) HasActiveRecords() bool

HasActiveRecords returns true if the batch has any records that are not filtered out.

func (*Batch) Nack

func (b *Batch) Nack(i int, errs ...error)

Nack marks the record at index i as nacked. If multiple errors are provided, they are assigned to the records starting at index i.

Note that the index i points to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked as nacked.

func (*Batch) Retry

func (b *Batch) Retry(i int, j ...int)

Retry marks the record at index i to be retried. If a second index is provided, all records between i (included) and j (excluded) are marked to be retried. If multiple indices are provided, the method panics.

Note that the indices i and j point to the slice returned by ActiveRecords. If the slice contains filtered records, the filtered records are skipped and not marked to be retried.

func (*Batch) SetRecords

func (b *Batch) SetRecords(i int, recs []opencdc.Record)

SetRecords replaces the records in the batch starting at index i with the provided records. If recs contains n records, indices i to i+n-1 are replaced.

func (*Batch) SplitRecord added in v0.14.0

func (b *Batch) SplitRecord(i int, recs []opencdc.Record)

SplitRecord splits the record at index i into the provided records. The provided records replace the record at index i, and any subsequent records in the batch are shifted to the right.

type ConnectorMetrics added in v0.13.5

type ConnectorMetrics interface {
	Observe(records []opencdc.Record, start time.Time)
}

func NewDLQMetrics added in v0.13.5

func NewDLQMetrics(pipelineName string, plugin string) ConnectorMetrics

type ConnectorMetricsImpl added in v0.13.5

type ConnectorMetricsImpl struct {
	// contains filtered or unexported fields
}

func NewConnectorMetrics added in v0.13.5

func NewConnectorMetrics(pipelineName, pluginName string, connType connector.Type) ConnectorMetricsImpl

func (ConnectorMetricsImpl) Observe added in v0.13.5

func (m ConnectorMetricsImpl) Observe(records []opencdc.Record, start time.Time)

type DLQ

type DLQ struct {
	// contains filtered or unexported fields
}

func NewDLQ

func NewDLQ(
	id string,
	destination Destination,
	logger log.CtxLogger,
	metrics ConnectorMetrics,

	windowSize int,
	windowNackThreshold int,
) *DLQ

func (*DLQ) Ack

func (d *DLQ) Ack(_ context.Context, batch *Batch)

func (*DLQ) Close

func (d *DLQ) Close(ctx context.Context) error

func (*DLQ) ID

func (d *DLQ) ID() string

func (*DLQ) Nack

func (d *DLQ) Nack(ctx context.Context, batch *Batch, taskID string) (int, error)

func (*DLQ) Open

func (d *DLQ) Open(ctx context.Context) error

type Destination

type Destination interface {
	ID() string
	Open(context.Context) error
	Write(context.Context, []opencdc.Record) error
	Ack(context.Context) ([]connector.DestinationAck, error)
	Teardown(context.Context) error
	// TODO figure out if we want to handle these errors. This returns errors
	//  coming from the persister, which persists the connector asynchronously.
	//  Are we even interested in these errors in the pipeline? Sounds like
	//  something we could surface and handle globally in the runtime instead.
	Errors() <-chan error
}

type DestinationTask

type DestinationTask struct {
	// contains filtered or unexported fields
}

func NewDestinationTask

func NewDestinationTask(
	id string,
	destination Destination,
	logger log.CtxLogger,
	metrics ConnectorMetrics,
) *DestinationTask

func (*DestinationTask) Close

func (t *DestinationTask) Close(ctx context.Context) error

func (*DestinationTask) Do

func (t *DestinationTask) Do(ctx context.Context, batch *Batch) error

func (*DestinationTask) ID

func (t *DestinationTask) ID() string

func (*DestinationTask) Open

func (t *DestinationTask) Open(ctx context.Context) error

type NoOpConnectorMetrics added in v0.13.5

type NoOpConnectorMetrics struct{}

func (NoOpConnectorMetrics) Observe added in v0.13.5

func (NoOpConnectorMetrics) Observe([]opencdc.Record, time.Time)

type NoOpProcessorMetrics added in v0.13.5

type NoOpProcessorMetrics struct{}

func (NoOpProcessorMetrics) Observe added in v0.13.5

func (m NoOpProcessorMetrics) Observe(int, time.Time)

type Processor

type Processor interface {
	// Open configures and opens a processor plugin
	Open(ctx context.Context) error
	Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord
	// Teardown tears down a processor plugin.
	// In case of standalone plugins, that means stopping the WASM module.
	Teardown(context.Context) error
}

type ProcessorMetrics added in v0.13.5

type ProcessorMetrics interface {
	Observe(recordsNum int, start time.Time)
}

type ProcessorMetricsImpl added in v0.13.5

type ProcessorMetricsImpl struct {
	// contains filtered or unexported fields
}

func NewProcessorMetrics added in v0.13.5

func NewProcessorMetrics(pipelineName, plugin string) ProcessorMetricsImpl

func (ProcessorMetricsImpl) Observe added in v0.13.5

func (m ProcessorMetricsImpl) Observe(recordsNum int, start time.Time)

type ProcessorTask

type ProcessorTask struct {
	// contains filtered or unexported fields
}

func NewProcessorTask

func NewProcessorTask(
	id string,
	processor Processor,
	logger log.CtxLogger,
	metrics ProcessorMetrics,
) *ProcessorTask

func (*ProcessorTask) Close

func (t *ProcessorTask) Close(ctx context.Context) error

func (*ProcessorTask) Do

func (t *ProcessorTask) Do(ctx context.Context, b *Batch) error

Do processes a batch of records using the processor plugin. It returns an error if the processor fails to process the records, or if the processor returns an invalid number of records.

If the batch contains filtered records, the processor will only process the active records. For instance:

  • Consider a batch with 5 records, 2 of which are filtered. A represents the active records, and F represents the filtered records: [A, A, F, A, F]
  • The processor will only process the active records, so it will receive [A, A, A].
  • The processor will return the processed records, which will be [X, X, X], where X represents the processed records. The records are either processed, filtered or errored.
  • When marking the records in the batch as processed, filtered or errored, the ProcessorTask takes into account the indices of the filtered records, so it marks the correct records in the batch. In the example that we used, the processor will mark the batch as [X, X, F, X, F], leaving the filtered records as is.

func (*ProcessorTask) ID

func (t *ProcessorTask) ID() string

func (*ProcessorTask) Open

func (t *ProcessorTask) Open(ctx context.Context) error

type RecordFlag

type RecordFlag int
const (
	RecordFlagAck    RecordFlag = iota // ack
	RecordFlagNack                     // nack
	RecordFlagRetry                    // retry
	RecordFlagFilter                   // filter
)

func (RecordFlag) String

func (i RecordFlag) String() string

type RecordStatus

type RecordStatus struct {
	Flag  RecordFlag
	Error error
}

RecordStatus holds the status of a record in a batch. The flag indicates the status of the record, and the error is set if the record was nacked.

type Source

type Source interface {
	ID() string
	Open(context.Context) error
	Read(context.Context) ([]opencdc.Record, error)
	Ack(context.Context, []opencdc.Position) error
	Teardown(context.Context) error
	// TODO figure out if we want to handle these errors. This returns errors
	//  coming from the persister, which persists the connector asynchronously.
	//  Are we even interested in these errors in the pipeline? Sounds like
	//  something we could surface and handle globally in the runtime instead.
	Errors() <-chan error
}

type SourceTask

type SourceTask struct {
	// contains filtered or unexported fields
}

func NewSourceTask

func NewSourceTask(
	id string,
	source Source,
	logger log.CtxLogger,
	metrics ConnectorMetrics,
) *SourceTask

func (*SourceTask) Close

func (t *SourceTask) Close(context.Context) error

func (*SourceTask) Do

func (t *SourceTask) Do(ctx context.Context, b *Batch) error

func (*SourceTask) GetSource

func (t *SourceTask) GetSource() Source

func (*SourceTask) ID

func (t *SourceTask) ID() string

func (*SourceTask) Open

func (t *SourceTask) Open(ctx context.Context) error

type Task

type Task interface {
	// ID returns the identifier of this Task. Each Task in a pipeline must be
	// uniquely identified by the ID.
	ID() string

	// Open opens the Task for processing. It is called once before the worker
	// starts processing records.
	Open(context.Context) error
	// Close closes the Task. It is called once after the worker has stopped
	// processing records.
	Close(context.Context) error
	// Do processes the given batch of records. It is called for each batch of
	// records that the worker processes.
	Do(context.Context, *Batch) error
}

Task is a unit of work that can be executed by a Worker. Each Task in a pipeline is executed sequentially, except for tasks related to different destinations, which can be executed in parallel.

type TaskNode added in v0.13.6

type TaskNode struct {
	Task Task
	Next []*TaskNode
	// contains filtered or unexported fields
}

TaskNode represents a task in the pipeline. It contains the task itself and the next tasks to be executed after it.

func (*TaskNode) AppendToEnd added in v0.13.6

func (t *TaskNode) AppendToEnd(next ...*TaskNode) error

AppendToEnd adds a new task to the end of the pipeline. Note that this doesn't mean that the supplied task will be executed directly after this task. Rather, it means that it will be executed after all tasks in the linked list are executed.

If any task node in the list has more than 1 next task, the function will return an error, as it would be ambiguous where to append the task.

If the tasks are appended successfully, the function returns nil.

func (*TaskNode) HasNext added in v0.13.6

func (t *TaskNode) HasNext() bool

HasNext returns true if the task has at least one next task.

func (*TaskNode) IsFirst added in v0.13.6

func (t *TaskNode) IsFirst() bool

IsFirst returns true if this task is the first task in the pipeline.

func (*TaskNode) TaskNodes added in v0.13.6

func (t *TaskNode) TaskNodes() iter.Seq[*TaskNode]

TaskNodes returns an iterator over the task nodes in the pipeline. It iterates the task nodes in the order they are defined in the pipeline, depth-first.

func (*TaskNode) Tasks added in v0.13.6

func (t *TaskNode) Tasks() iter.Seq[Task]

Tasks returns an iterator over the tasks in the pipeline. It iterates the tasks in the order they are defined in the pipeline, depth-first.

type Worker

type Worker struct {
	Source    Source
	FirstTask *TaskNode
	DLQ       *DLQ
	// contains filtered or unexported fields
}

Worker collects the tasks that need to be executed in a pipeline for a specific source. It processes records from the source through the tasks until it is stopped. The worker is responsible for coordinating tasks and acking/nacking records.

Batches are processed in the following way:

  • The first task is always a source task which reads a batch of records from the source. The batch is then passed to the next task.
  • Any task between the source and the destination can process the batch by updating the records or their status (see RecordStatus). If a record in the batch is marked as filtered, the next task will skip processing it and consider it as already processed. If a record is marked as nacked, the record will be sent to the DLQ. If a record is marked as retry, the record will be reprocessed by the same task (relevant if a task processed only part of the batch, experienced an error and skipped the rest).
  • The last task is always a destination task which writes the batch of records to the destination. The batch is then acked.

Note that if a task marks a record in the middle of a batch as nacked, the batch is split into sub-batches. The records that were successfully processed continue to the next task (and ideally to the end of the pipeline), because Conduit provides ordering guarantees. Only once the records before the nacked record are end-to-end processed, will the nacked record be sent to the DLQ. The rest of the records are processed as a sub-batch, and the same rules apply to them.

func NewWorker

func NewWorker(
	firstTask *TaskNode,
	dlq *DLQ,
	logger log.CtxLogger,
	timer metrics.Timer,
) (*Worker, error)

func (*Worker) Ack

func (w *Worker) Ack(ctx context.Context, batch *Batch) error

func (*Worker) Close

func (w *Worker) Close(ctx context.Context) error

func (*Worker) Do

func (w *Worker) Do(ctx context.Context) error

Do processes records from the source until the worker is stopped. It returns no error if the worker is stopped gracefully.

func (*Worker) Nack

func (w *Worker) Nack(ctx context.Context, batch *Batch, taskID string) error

func (*Worker) Open

func (w *Worker) Open(ctx context.Context) (err error)

Open opens the worker for processing. It opens all tasks and the DLQ. If any task fails to open, the worker is not opened and the error is returned. Once a worker is opened, it can start processing records. The worker should be closed using Close after it is no longer needed.

func (*Worker) Stop

func (w *Worker) Stop(ctx context.Context) error

Stop stops the worker from processing more records. It does not stop the current batch from being processed. If a batch is currently being processed, the method will block and trigger the stop after the batch is processed.
