source

package
v1.13.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrSkip = errors.New("skip source")

ErrSkip is used to indicate that a particular source should not be scheduled.

Functions

func Consume

func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout)

Consume continuously reads log entries from recv and forwards them to the fanout f. It runs until ctx is cancelled or an error occurs while sending to the fanout.

This function is typically used in component Run methods to handle the forwarding of log entries from a component's internal handler to downstream receivers. The fanout allows entries to be sent to multiple receivers concurrently.

func ConsumeBatch

func ConsumeBatch(ctx context.Context, recv loki.LogsBatchReceiver, f *loki.Fanout)

ConsumeBatch continuously reads batches of log entries from recv and forwards them to the fanout f. It runs until ctx is cancelled or an error occurs while sending to the fanout.

This function is typically used in component Run methods to handle the forwarding of log entries from a component's internal handler to downstream receivers. The fanout allows entries to be sent to multiple receivers concurrently.

func Drain

func Drain(recv loki.LogsReceiver, f func())

Drain consumes log entries from recv in a background goroutine while f executes. This prevents deadlocks that can occur when stopping components that may still be sending entries to the receiver channel. The draining goroutine will continue consuming entries until f returns, at which point the context is cancelled and the goroutine exits.

This is typically used during component shutdown to drain any remaining entries from a receiver channel while performing cleanup operations.

func Reconcile

func Reconcile[Key comparable, Input any](
	logger log.Logger,
	s *Scheduler[Key],
	it iter.Seq[Input],
	keyFn KeyFn[Key, Input],
	sourceFactoryFn SourceFactoryFn[Key, Input],
)

Reconcile synchronizes the scheduler's set of running sources with a desired state. It iterates over inputs, creates sources for new items, and stops sources that are no longer needed.

Types

type DebugSource

type DebugSource interface {
	DebugInfo() any
}

DebugSource is an optional interface with debug information.

type KeyFn

type KeyFn[Key comparable, Input any] func(Input) Key

KeyFn extracts a comparable key of type Key from an input value of type Input. The key is used to uniquely identify sources in the scheduler.

type Scheduler

type Scheduler[Key comparable] struct {
	// contains filtered or unexported fields
}

Scheduler manages the lifecycle of sources. It is not safe for concurrent use: callers must ensure proper synchronization when accessing or modifying Scheduler and its sources from multiple goroutines.

func NewScheduler

func NewScheduler[Key comparable]() *Scheduler[Key]

func (*Scheduler[Key]) Contains

func (s *Scheduler[Key]) Contains(k Key) bool

Contains returns true if a source with provided k already exists.

func (*Scheduler[Key]) Len

func (s *Scheduler[Key]) Len() int

Len returns number of scheduled sources

func (*Scheduler[Key]) Reset

func (s *Scheduler[Key]) Reset()

Reset will stop all running sources and wait for them to finish and reset Scheduler to a usable state.

func (*Scheduler[Key]) ScheduleSource

func (s *Scheduler[Key]) ScheduleSource(source Source[Key])

ScheduleSource will register and run the provided source in a goroutine. If a source with the same key already exists it will do nothing.

func (*Scheduler[Key]) Sources

func (s *Scheduler[Key]) Sources() iter.Seq[Source[Key]]

Sources returns an iterator of all scheduled sources.

func (*Scheduler[Key]) Stop

func (s *Scheduler[Key]) Stop()

Stop will stop all running sources and wait for them to finish. Scheduler should not be reused after Stop is called.

func (*Scheduler[Key]) StopSource

func (s *Scheduler[Key]) StopSource(source Source[Key])

StopSource will unregister provided source and cancel it without waiting for it to stop.

type Source

type Source[Key comparable] interface {
	// Run should start the source.
	// It should run until there is no more work or context is canceled.
	Run(ctx context.Context)
	// Key is used to uniquely identify the source.
	Key() Key
}

type SourceFactoryFn

type SourceFactoryFn[Key comparable, Input any] func(Key, Input) (Source[Key], error)

SourceFactoryFn creates a Source[Key] from a key and input value. It returns the created source (or nil if creation failed or should be skipped) and an error. Return ErrSkip to indicate that the source should not be scheduled without logging an error.

type SourceWithRetry

type SourceWithRetry[Key comparable] struct {
	// contains filtered or unexported fields
}

SourceWithRetry is used to wrap another source and apply retries when running.

func NewSourceWithRetry

func NewSourceWithRetry[Key comparable](source Source[Key], config backoff.Config) *SourceWithRetry[Key]

func (*SourceWithRetry[Key]) DebugInfo

func (s *SourceWithRetry[Key]) DebugInfo() any

func (*SourceWithRetry[Key]) Key

func (s *SourceWithRetry[Key]) Key() Key

func (*SourceWithRetry[Key]) Run

func (s *SourceWithRetry[Key]) Run(ctx context.Context)

Directories

Path Synopsis
api
internal
Package kubernetes implements the loki.source.kubernetes component.
Package kubernetes implements the loki.source.kubernetes component.
kubetail
Package kubetail implements a log file tailer using the Kubernetes API.
Package kubetail implements a log file tailer using the Kubernetes API.
Package kubernetes_events implements the loki.source.kubernetes_events component.
Package kubernetes_events implements the loki.source.kubernetes_events component.
win_eventlog
Package win_eventlog Input plugin to collect Windows Event Log messages
Package win_eventlog Input plugin to collect Windows Event Log messages

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL