Documentation
¶
Index ¶
- Variables
- type Config
- func (c *Config[Item]) Batches(batches int)
- func (c *Config[Item]) Buffer(buffer buffer.Buffer[Item])
- func (c *Config[Item]) Codec(codec codec.Codec[Item])
- func (c *Config[Item]) File(file *FileConfig)
- func (c *Config[Item]) FlushPushes(pushes int)
- func (c *Config[Item]) FlushSize(size int)
- func (c *Config[Item]) FlushTimeout(timeout time.Duration)
- func (c *Config[Item]) InternalErrorHandler(handler func(error))
- func (c *Config[Item]) ProcessErrorHandler(handler func(error))
- func (c *Config[Item]) Prometheus(config *PrometheusConfig)
- func (c *Config[Item]) RetryPolicy(policy retry.Policy)
- func (c *Config[Item]) Workers(workers int)
- type ConfigFunc
- type FileConfig
- type ProcessFunc
- type PrometheusConfig
- type Queue
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClosed is the error that is returned by Queue methods when it's already closed. ErrClosed = errors.New("queue is closed") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config[Item any] struct { // contains filtered or unexported fields }
Config contains configuration for a Queue.
func (*Config[Item]) Batches ¶
Batches sets the upper-limit for number of batches that each processing worker will try to retrieve at once.
Before passing the items into the ProcessFunc, the items will go through the worker's own instance of buffer (configured by Config.Buffer). This means that if the buffer implements some kind of aggregation (like buffer.Merging), this aggregation will be applied to all the items of the retrieved batches before they're passed to the ProcessFunc as a single batch.
Panics if the batches < 1.
func (*Config[Item]) Buffer ¶
Buffer sets the in-memory buffer which will be used by the internal workers for storing the items in-memory.
Panics if the buffer is nil.
func (*Config[Item]) Codec ¶
Codec sets the codec which will be used by the internal workers for turning buffers into bytes and vice versa.
Panics if the codec is nil.
func (*Config[Item]) File ¶
func (c *Config[Item]) File(file *FileConfig)
File configures the file for the SQLite database. If file is nil, SQLite database will be opened in-memory, making the queue not persistent.
Panics if file is blank or contains the `?` symbol.
func (*Config[Item]) FlushPushes ¶
FlushPushes sets the number of pushes to buffer after which it will be automatically flushed.
It can be different from the Config.FlushSize if the buffer performs some kind of aggregation, like buffer.Merging.
Panics if pushes < 0.
func (*Config[Item]) FlushSize ¶
FlushSize sets the size of the buffer after which it will be automatically flushed.
It can be different from the Config.FlushPushes if the buffer performs some kind of aggregation, like buffer.Merging.
Panics if size < 0.
func (*Config[Item]) FlushTimeout ¶
FlushTimeout sets the amount of time after which the buffer will be automatically flushed. Zero means no timeout.
Panics if the timeout is < 0.
func (*Config[Item]) InternalErrorHandler ¶
InternalErrorHandler sets the function that will be called in case of internal error.
If handler is nil, nothing will be called.
func (*Config[Item]) ProcessErrorHandler ¶
ProcessErrorHandler sets the function that will be called in case ProcessFunc returns error.
If handler is nil, nothing will be called.
func (*Config[Item]) Prometheus ¶
func (c *Config[Item]) Prometheus(config *PrometheusConfig)
Prometheus sets the PrometheusConfig that will be used to provide queue's metrics.
If config is nil, no metrics will be provided.
func (*Config[Item]) RetryPolicy ¶
RetryPolicy sets the retry policy for the ProcessFunc.
Panics if the policy is nil.
type ConfigFunc ¶
ConfigFunc is a function that configures a Queue.
type FileConfig ¶
type FileConfig struct {
// contains filtered or unexported fields
}
FileConfig is a config of the file used for the SQLite database.
An instance can be created only by the File function. The zero value is invalid.
func File ¶
func File(file string) *FileConfig
File returns a FileConfig with the provided file path.
The file path can be a relative or absolute path to a SQLite database file.
func (*FileConfig) Durable ¶
func (c *FileConfig) Durable(durable bool) *FileConfig
Durable makes each flush of the queue's buffer into the SQLite file durably synchronized to disk, which is disabled by default.
This is achieved by setting SQLite's PRAGMA synchronous to FULL instead of NORMAL. This setting is expected to make flushes slower, so enable it only if you know what you're doing. You can read more about it here:
type ProcessFunc ¶
type ProcessFunc[Item any] = func(ctx context.Context, queue *Queue[Item], batch iter.Seq[Item]) error
ProcessFunc is a function that is used by a Queue to process its data.
If non-nil error is returned, the operation will be retried based on the Config.RetryPolicy. If there are no attempts remaining, the data will be returned back into the queue.
In most cases, the upper limit of items in the batch is equal to Config.FlushSize × Config.Batches. Some batches may be larger if they were flushed by Queue.Flush or Queue.Close.
Provided context will be cancelled if Queue.Close is called.
Provided queue may be used to push elements back into the queue. This will work even when the queue is closing if you pass the provided context into the Queue.Push method.
Important Don't ever push elements back into the queue if your function can return an error. This will lead to data duplication.
type PrometheusConfig ¶
type PrometheusConfig struct {
// Namespace of the metrics.
Namespace string
// Subsystem of the metrics.
Subsystem string
// Options for the batches gauge.
Batches prometheus.GaugeOpts
// Options for the items gauge.
Items prometheus.GaugeOpts
// Options for the pushed items counter.
ItemsPushed prometheus.CounterOpts
// Options for the flushed items counter.
ItemsFlushed prometheus.CounterOpts
// Options for the processed items counter.
ItemsProcessed prometheus.CounterOpts
// Options for the process errors counter.
ProcessErrors prometheus.CounterOpts
// Options for the process duration histogram.
ProcessDuration prometheus.HistogramOpts
// contains filtered or unexported fields
}
PrometheusConfig is a config of the Prometheus metrics provided by the queue.
An instance can be created only by the Prometheus function. The zero value is invalid.
func Prometheus ¶
func Prometheus( registerer prometheus.Registerer, configFuncs ...func(c *PrometheusConfig), ) *PrometheusConfig
Prometheus returns a PrometheusConfig with the provided registerer. If registerer is nil, metrics will not be registered. Many default parameters can be configured by passing configuration functions.
type Queue ¶
type Queue[Item any] struct { // contains filtered or unexported fields }
Queue is a buffered, batched, and persistent queue for items of type Item.
Each opened queue expects to eventually be closed using the Queue.Close. Otherwise some data may be lost.
Items are pushed into the queue using the Queue.Push. Pushed items are stored in an in-memory buffer (configured by Config.Buffer) until either a size (configured by Config.FlushSize) or a time (configured by Config.FlushTimeout) limit is reached or Queue.Flush is called. When that happens, the buffer is encoded into bytes (configured by Config.Codec) and stored in a SQLite database (configured by Config.File).
Each stored batch, starting from the oldest, is then retrieved by a background worker, which will decode it back into a typed buffer and pass it into the queue's ProcessFunc. If the processing is finished without an error, the batch will be deleted and the worker will retrieve the next batch. Otherwise the worker tries again, according to the queue's retry.Policy (configured by Config.RetryPolicy). If there are no attempts remaining, the batch will stay in the SQLite database with the same priority, though it may receive a cooldown period if retry.Policy configured it.
The background worker may actually retrieve more than one batch at the same time. The upper limit of the number of batches is configured by Config.Batches. When that happens, the items from all batches will go through workers' own instance of a buffer configured by Config.Buffer. This means that if the buffer implements some kind of aggregation (like buffer.Merging), this aggregation will be applied to all the items of the retrieved batches before they're passed to the ProcessFunc as a single batch. If the processing succeeds, all the original batches will be deleted and the worker will retrieve the next set of batches. If the processing fails and there are no attempts remaining, all the original batches will stay in the SQLite database, just like with the one batch.
More than one background worker can process batches. The number of workers is configured by Config.Workers. Two workers will never retrieve the same batch.
func New ¶
func New[Item any]( processFunc ProcessFunc[Item], configFuncs ...ConfigFunc[Item], ) ( *Queue[Item], error, )
New creates a Queue with the provided ProcessFunc and a default Config, which may be changed with the provided list of ConfigFunc.
Default config:
- Config.File is set to nil (meaning the queue is fully in-memory)
- Config.FlushSize is set to 1
- Config.FlushPushes is set to 0
- Config.FlushTimeout is set to 0
- Config.Workers is set to 1
- Config.Batches is set to 1
- Config.Buffer is set to buffer.Appending
- Config.Codec is set to json.Codec
- Config.RetryPolicy is set to retry.Exponential with infinite attempts and interval from 1 second to 1 hour
- Config.Prometheus is set to nil
- Config.InternalErrorHandler is set to log the error using slog.Error (except in tests)
- Config.ProcessErrorHandler is set to log the error using slog.Error (except in tests)
func (*Queue[Item]) Close ¶
Close closes the queue.
It tries to do this gracefully by following these steps:
- Prevent new items from being pushed (except the items pushed from the ProcessFunc)
- Stop the process workers by cancelling their context, which is passed to the ProcessFunc
- Stop the push worker (the one that passes items into the in-memory buffer)
- Close the underlying SQLite database
It will return the ErrClosed on the subsequent calls.
func (*Queue[Item]) Flush ¶
Flush flushes the queue's in-memory buffer.
It may return an error if the provided context cancels before the flush is complete. It will return the ErrClosed if the queue is closed.
func (*Queue[Item]) Push ¶
Push pushes the item into the queue's in-memory buffer.
It may return an error if provided context cancels before the item is pushed. It will also return the ErrClosed if the queue is closed (except in cases when the method is called from the ProcessFunc while the queue is still closing).
Directories
¶
| Path | Synopsis |
|---|---|
|
This package contains the main Buffer interface and several implementations.
|
This package contains the main Buffer interface and several implementations. |
|
This package contains the main Codec interface and several implementations inside subpackages.
|
This package contains the main Codec interface and several implementations inside subpackages. |
|
msgp
module
|
|
|
This package contains the main Policy interface and several implementations.
|
This package contains the main Policy interface and several implementations. |