Documentation
¶
Index ¶
- Variables
- func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout)
- func ConsumeBatch(ctx context.Context, recv loki.LogsBatchReceiver, f *loki.Fanout)
- func Drain(recv loki.LogsReceiver, f func())
- func Reconcile[Key comparable, Input any](logger log.Logger, s *Scheduler[Key], it iter.Seq[Input], ...)
- type DebugSource
- type KeyFn
- type Scheduler
- func (s *Scheduler[Key]) Contains(k Key) bool
- func (s *Scheduler[Key]) Len() int
- func (s *Scheduler[Key]) Reset()
- func (s *Scheduler[Key]) ScheduleSource(source Source[Key])
- func (s *Scheduler[Key]) Sources() iter.Seq[Source[Key]]
- func (s *Scheduler[Key]) Stop()
- func (s *Scheduler[Key]) StopSource(source Source[Key])
- type Source
- type SourceFactoryFn
- type SourceWithRetry
Constants ¶
This section is empty.
Variables ¶
var ErrSkip = errors.New("skip source")
ErrSkip is used to indicate that a particular source should not be scheduled.
Functions ¶
func Consume ¶
Consume continuously reads log entries from recv and forwards them to the fanout f. It runs until ctx is cancelled or an error occurs while sending to the fanout.
This function is typically used in component Run methods to handle the forwarding of log entries from a component's internal handler to downstream receivers. The fanout allows entries to be sent to multiple receivers concurrently.
func ConsumeBatch ¶
ConsumeBatch continuously reads batches of log entries from recv and forwards them to the fanout f. It runs until ctx is cancelled or an error occurs while sending to the fanout.
This function is typically used in component Run methods to handle the forwarding of log entries from a component's internal handler to downstream receivers. The fanout allows entries to be sent to multiple receivers concurrently.
func Drain ¶
func Drain(recv loki.LogsReceiver, f func())
Drain consumes log entries from recv in a background goroutine while f executes. This prevents deadlocks that can occur when stopping components that may still be sending entries to the receiver channel. The draining goroutine will continue consuming entries until f returns, at which point the context is cancelled and the goroutine exits.
This is typically used during component shutdown to drain any remaining entries from a receiver channel while performing cleanup operations.
func Reconcile ¶
func Reconcile[Key comparable, Input any]( logger log.Logger, s *Scheduler[Key], it iter.Seq[Input], keyFn KeyFn[Key, Input], sourceFactoryFn SourceFactoryFn[Key, Input], )
Reconcile synchronizes the scheduler's set of running sources with a desired state. It iterates over inputs, creates sources for new items, and stops sources that are no longer needed.
Types ¶
type DebugSource ¶
type DebugSource interface {
DebugInfo() any
}
DebugSource is an optional interface with debug information.
type KeyFn ¶
type KeyFn[Key comparable, Input any] func(Input) Key
KeyFn extracts a comparable key of type Key from an input value of type Input. The key is used to uniquely identify sources in the scheduler.
type Scheduler ¶
type Scheduler[Key comparable] struct { // contains filtered or unexported fields }
Scheduler manages the lifecycle of sources. It is not safe for concurrent use: callers must ensure proper synchronization when accessing or modifying Scheduler and its sources from multiple goroutines.
func NewScheduler ¶
func NewScheduler[Key comparable]() *Scheduler[Key]
func (*Scheduler[Key]) Reset ¶
func (s *Scheduler[Key]) Reset()
Reset will stop all running sources and wait for them to finish and reset Scheduler to a usable state.
func (*Scheduler[Key]) ScheduleSource ¶
ScheduleSource will register and run the provided source in a goroutine. If a source with the same key already exists it will do nothing.
func (*Scheduler[Key]) Stop ¶
func (s *Scheduler[Key]) Stop()
Stop will stop all running sources and wait for them to finish. Scheduler should not be reused after Stop is called.
func (*Scheduler[Key]) StopSource ¶
StopSource will unregister provided source and cancel it without waiting for it to stop.
type Source ¶
type Source[Key comparable] interface { // Run should start the source. // It should run until there is no more work or context is canceled. Run(ctx context.Context) // Key is used to uniquely identify the source. Key() Key }
type SourceFactoryFn ¶
type SourceFactoryFn[Key comparable, Input any] func(Key, Input) (Source[Key], error)
SourceFactoryFn creates a Source[Key] from a key and input value. It returns the created source (or nil if creation failed or should be skipped) and an error. Return ErrSkip to indicate that the source should not be scheduled without logging an error.
type SourceWithRetry ¶
type SourceWithRetry[Key comparable] struct { // contains filtered or unexported fields }
SourceWithRetry is used to wrap another source and apply retries when running.
func NewSourceWithRetry ¶
func NewSourceWithRetry[Key comparable](source Source[Key], config backoff.Config) *SourceWithRetry[Key]
func (*SourceWithRetry[Key]) DebugInfo ¶
func (s *SourceWithRetry[Key]) DebugInfo() any
func (*SourceWithRetry[Key]) Key ¶
func (s *SourceWithRetry[Key]) Key() Key
func (*SourceWithRetry[Key]) Run ¶
func (s *SourceWithRetry[Key]) Run(ctx context.Context)
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
Package kubernetes implements the loki.source.kubernetes component.
|
Package kubernetes implements the loki.source.kubernetes component. |
|
kubetail
Package kubetail implements a log file tailer using the Kubernetes API.
|
Package kubetail implements a log file tailer using the Kubernetes API. |
|
Package kubernetes_events implements the loki.source.kubernetes_events component.
|
Package kubernetes_events implements the loki.source.kubernetes_events component. |
|
win_eventlog
Package win_eventlog Input plugin to collect Windows Event Log messages
|
Package win_eventlog Input plugin to collect Windows Event Log messages |