liq

package module
v0.1.0
Published: Feb 23, 2026 License: MIT Imports: 18 Imported by: 0

README

liq is a Go library that implements a generic, buffered, batched, and persistent queue.

Most of the queue's behavior is configurable:

  • Type of an item
  • How a batch of items is processed
  • How a batch of items is stored in memory (including aggregation)
  • When a batch of items is ready to be stored in a SQLite database
  • How a batch of items is encoded to bytes and vice versa
  • How many workers can process batches concurrently
  • How many batches can be retrieved for processing by a single worker
  • Which retry policy the workers follow when dealing with processing failures
  • How a SQLite database is opened:
    • In memory (no persistence)
    • In a file (with different levels of durability)
  • Optional Prometheus metrics

Installation

go get github.com/teenjuna/liq@latest

Examples

Sometimes it's easier to understand a thing by looking at some examples.

Let's imagine that we're implementing a system that receives many items that look like this:

type Item struct {
    ID    int
    Count int
}

The goal of the system is to apply Count increments for each unique ID. For the sake of simplicity, instead of actually applying the increments in, for example, a database, we'll just output them.

Simple

Let's start with a simple and minimal example. Here we create a liq.Queue[Item] without any additional configuration. By default, the queue is managed fully in memory, meaning there's no persistence. It flushes its buffer after each push, so each batch passed to the provided process function contains only one element. Each batch is internally encoded and decoded using the built-in JSON codec, which is one of several built-in codec implementations. If the process function fails, it is retried indefinitely with the default exponential backoff retry policy.

import (
	"context"
	"fmt"
	"iter"
	"os/signal"
	"slices"
	"syscall"

	"github.com/teenjuna/liq"
)

func main() {
    queue, err := liq.New(
        func(ctx context.Context, queue *liq.Queue[Item], batch iter.Seq[Item]) error {
            items := slices.Collect(batch)
            fmt.Println("Batch:", items)
            return nil
        },
    )
    if err != nil { /* ... */ }
    defer queue.Close()

    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    _ = queue.Push(ctx, Item{ID: 1, Count: 1})
    _ = queue.Push(ctx, Item{ID: 2, Count: 1})
    _ = queue.Push(ctx, Item{ID: 1, Count: 1})
    _ = queue.Push(ctx, Item{ID: 3, Count: 1})

    <-ctx.Done()
}

Here is the output of the program:

Batch: [{1 1}]
Batch: [{2 1}]
Batch: [{1 1}]
Batch: [{3 1}]
^C⏎ 

Such a simple configuration is mostly useful for tests. Let's look at a more involved example next.

Complex

This example is more interesting:

  • The queue is now backed by the file items.liq. Since the file is marked as .Durable(true), SQLite will flush the data to disk after each transaction.
  • Instead of flushing data to SQLite storage after each push, we first collect it in an in-memory buffer. Instead of the default buffer.Appending, which simply appends each pushed item to the end, we use buffer.Merging, which merges items by their ID on each push. The buffer flushes when any of the following conditions becomes true:
    • The buffer contains 100 elements
    • The buffer has received 1000 pushes (different from the "contains" condition because of the buffer.Merging behavior)
    • The buffer contains at least one item and hasn't been flushed for 1 second
  • Instead of the slower JSON codec, the queue uses another built-in implementation that utilizes the encoding/gob package
  • Flushed batches are processed by two concurrent workers instead of one
  • Each worker tries to process up to 10 batches at once (internally they're merged into one batch using the buffer.Merging behavior)
  • Workers use the same exponential backoff retry policy, but with different configuration:
    • Up to 5 attempts (instead of infinite)
    • Every subsequent interval grows by 1.5x instead of 2x
    • Every interval has a random jitter of up to 20% instead of up to 10%
    • If no attempts are left, the batch (or, more specifically, all the real batches it consists of) is returned to the queue with a one-minute cooldown, meaning that the workers will skip them for a minute
  • The queue registers its (slightly customized) Prometheus metrics with the provided registry

import (
	"context"
	"fmt"
	"iter"
	"os/signal"
	"slices"
	"syscall"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/teenjuna/liq"
	"github.com/teenjuna/liq/buffer"
	"github.com/teenjuna/liq/codec/gob"
	"github.com/teenjuna/liq/retry"
)

func main() {
    registry := prometheus.NewRegistry()

    queue, err := liq.New(
        func(ctx context.Context, queue *liq.Queue[Item], batch iter.Seq[Item]) error {
            items := slices.Collect(batch)
            fmt.Println("Batch:", items)
            return nil
        },
        func(c *liq.Config[Item]) {
            c.File(liq.File("items.liq").Durable(true))
            c.FlushSize(100)
            c.FlushPushes(1000)
            c.FlushTimeout(time.Second)
            c.Buffer(buffer.Merging(
                func(i Item) int {
                    return i.ID
                },
                func(i1, i2 Item) Item {
                    return Item{ID: i1.ID, Count: i1.Count + i2.Count}
                },
            ))
            c.Codec(gob.New[Item]())
            c.Workers(2)
            c.Batches(10)
            c.RetryPolicy(retry.
                Exponential(5, time.Second, time.Minute).
                WithBase(1.5).
                WithJitter(0.2).
                WithCooldown(time.Minute),
            )
            c.Prometheus(liq.Prometheus(registry, func(c *liq.PrometheusConfig) {
                c.ProcessDuration.Buckets = prometheus.ExponentialBuckets(10, 2, 5)
            }))
        },
    )
    if err != nil { /* ... */ }
    defer queue.Close()

    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    _ = queue.Push(ctx, Item{ID: 1, Count: 1})
    _ = queue.Push(ctx, Item{ID: 2, Count: 1})
    _ = queue.Push(ctx, Item{ID: 1, Count: 1})
    _ = queue.Push(ctx, Item{ID: 3, Count: 1})

    <-ctx.Done()
}

And here is the output:

Batch: [{3 1} {1 2} {2 1}]
^C⏎ 

This example is a bit silly because some of the options are overkill for the 4 pushes we do, but let's ignore that.

Architecture

Data flow

TODO: diagram

Concepts

Process function

TODO

Worker

TODO

Buffer

TODO

Codec

TODO

Retry policy

TODO

Metrics

TODO: description of available metrics

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrClosed is the error that is returned by Queue methods when it's already closed.
	ErrClosed = errors.New("queue is closed")
)

Functions

This section is empty.

Types

type Config

type Config[Item any] struct {
	// contains filtered or unexported fields
}

Config contains configuration for a Queue.

func (*Config[Item]) Batches

func (c *Config[Item]) Batches(batches int)

Batches sets the upper limit on the number of batches that each processing worker will try to retrieve at once.

Before passing the items into the ProcessFunc, the items will go through the worker's own instance of the buffer (configured by Config.Buffer). This means that if the buffer implements some kind of aggregation (like buffer.Merging), this aggregation will be applied to all the items of the retrieved batches before they're passed to the ProcessFunc as a single batch.

Panics if batches < 1.

func (*Config[Item]) Buffer

func (c *Config[Item]) Buffer(buffer buffer.Buffer[Item])

Buffer sets the in-memory buffer used by the internal workers for storing items.

Panics if the buffer is nil.

func (*Config[Item]) Codec

func (c *Config[Item]) Codec(codec codec.Codec[Item])

Codec sets the codec which will be used by the internal workers for turning buffers into bytes and vice versa.

Panics if the codec is nil.

func (*Config[Item]) File

func (c *Config[Item]) File(file *FileConfig)

File configures the file for the SQLite database. If file is nil, the SQLite database will be opened in memory, making the queue non-persistent.

Panics if file is blank or contains the `?` symbol.

func (*Config[Item]) FlushPushes

func (c *Config[Item]) FlushPushes(pushes int)

FlushPushes sets the number of pushes to the buffer after which it will be automatically flushed.

It can be different from the Config.FlushSize if the buffer performs some kind of aggregation, like buffer.Merging.

Panics if pushes < 0.

func (*Config[Item]) FlushSize

func (c *Config[Item]) FlushSize(size int)

FlushSize sets the size of the buffer after which it will be automatically flushed.

It can be different from the Config.FlushPushes if the buffer performs some kind of aggregation, like buffer.Merging.

Panics if size < 0.

func (*Config[Item]) FlushTimeout

func (c *Config[Item]) FlushTimeout(timeout time.Duration)

FlushTimeout sets the amount of time after which the buffer will be automatically flushed. Zero means no timeout.

Panics if the timeout is < 0.

func (*Config[Item]) InternalErrorHandler

func (c *Config[Item]) InternalErrorHandler(handler func(error))

InternalErrorHandler sets the function that will be called in case of internal error.

If handler is nil, nothing will be called.

func (*Config[Item]) ProcessErrorHandler

func (c *Config[Item]) ProcessErrorHandler(handler func(error))

ProcessErrorHandler sets the function that will be called in case ProcessFunc returns error.

If handler is nil, nothing will be called.

func (*Config[Item]) Prometheus

func (c *Config[Item]) Prometheus(config *PrometheusConfig)

Prometheus sets the PrometheusConfig that will be used to provide the queue's metrics.

If config is nil, no metrics will be provided.

func (*Config[Item]) RetryPolicy

func (c *Config[Item]) RetryPolicy(policy retry.Policy)

RetryPolicy sets the retry policy for the ProcessFunc.

Panics if the policy is nil.

func (*Config[Item]) Workers

func (c *Config[Item]) Workers(workers int)

Workers sets the number of processing workers managed by the queue.

Two workers will never retrieve the same batch at the same time.

Panics if workers < 1.

type ConfigFunc

type ConfigFunc[Item any] = func(c *Config[Item])

ConfigFunc is a function that configures a Queue.

type FileConfig

type FileConfig struct {
	// contains filtered or unexported fields
}

FileConfig is a config of the file used for the SQLite database.

An instance can be created only by the File function. The zero value is invalid.

func File

func File(file string) *FileConfig

File returns a FileConfig with the provided file path.

The file path can be a relative or absolute path to a SQLite database file.

func (*FileConfig) Durable

func (c *FileConfig) Durable(durable bool) *FileConfig

Durable controls whether each flush of the queue's buffer into the SQLite file is durably synchronized to disk. This is disabled by default.

This is achieved by setting SQLite's PRAGMA synchronous to FULL instead of NORMAL. This setting is expected to make flushes slower, so enable it only if you know what you're doing.

type ProcessFunc

type ProcessFunc[Item any] = func(ctx context.Context, queue *Queue[Item], batch iter.Seq[Item]) error

ProcessFunc is a function that is used by a Queue to process its data.

If a non-nil error is returned, the operation will be retried according to the Config.RetryPolicy. If there are no attempts remaining, the data will be returned to the queue.

In most cases, the upper limit of items in the batch is equal to Config.FlushSize × Config.Batches. Some batches may be larger if they were flushed by Queue.Flush or Queue.Close.

The provided context will be cancelled if Queue.Close is called.

The provided queue may be used to push elements back into the queue. This works even while the queue is closing, as long as you pass the provided context into the Queue.Push method.

Important

Never push elements back into the queue if your function can return an error; doing so will lead to data duplication.
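For example, a ProcessFunc that re-enqueues part of its batch might look like the following fragment. It matches the documented signature, but it is only a sketch (the split condition is hypothetical), and per the warning above it must never be combined with returning errors:

```go
func process(ctx context.Context, queue *liq.Queue[Item], batch iter.Seq[Item]) error {
	for item := range batch {
		if item.Count > 100 { // hypothetical condition: defer large items
			// Pass the provided ctx so the push works even while
			// the queue is closing.
			_ = queue.Push(ctx, item)
			continue
		}
		fmt.Println("processing:", item)
	}
	return nil // never return an error from a function that pushes back
}
```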

type PrometheusConfig

type PrometheusConfig struct {
	// Namespace of the metrics.
	Namespace string
	// Subsystem of the metrics.
	Subsystem string
	// Options for the batches gauge.
	Batches prometheus.GaugeOpts
	// Options for the items gauge.
	Items prometheus.GaugeOpts
	// Options for the pushed items counter.
	ItemsPushed prometheus.CounterOpts
	// Options for the flushed items counter.
	ItemsFlushed prometheus.CounterOpts
	// Options for the processed items counter.
	ItemsProcessed prometheus.CounterOpts
	// Options for the process errors counter.
	ProcessErrors prometheus.CounterOpts
	// Options for the process duration histogram.
	ProcessDuration prometheus.HistogramOpts
	// contains filtered or unexported fields
}

PrometheusConfig is a config of the Prometheus metrics provided by the queue.

An instance can be created only by the Prometheus function. The zero value is invalid.

func Prometheus

func Prometheus(
	registerer prometheus.Registerer,
	configFuncs ...func(c *PrometheusConfig),
) *PrometheusConfig

Prometheus returns a PrometheusConfig with the provided registerer. If registerer is nil, metrics will not be registered. Many default parameters can be configured by passing configuration functions.

type Queue

type Queue[Item any] struct {
	// contains filtered or unexported fields
}

Queue is a buffered, batched, and persistent queue for items of type Item.

Each opened queue is expected to eventually be closed using Queue.Close; otherwise some data may be lost.

Items are pushed into the queue using the Queue.Push. Pushed items are stored in an in-memory buffer (configured by Config.Buffer) until either a size (configured by Config.FlushSize) or a time (configured by Config.FlushTimeout) limit is reached or Queue.Flush is called. When that happens, the buffer is encoded into bytes (configured by Config.Codec) and stored in a SQLite database (configured by Config.File).

Each stored batch, starting from the oldest, is then retrieved by a background worker, which will decode it back into a typed buffer and pass it into the queue's ProcessFunc. If the processing is finished without an error, the batch will be deleted and the worker will retrieve the next batch. Otherwise the worker tries again, according to the queue's retry.Policy (configured by Config.RetryPolicy). If there are no attempts remaining, the batch will stay in the SQLite database with the same priority, though it may receive a cooldown period if retry.Policy configured it.

The background worker may actually retrieve more than one batch at the same time. The upper limit of the number of batches is configured by Config.Batches. When that happens, the items from all batches will go through the worker's own instance of a buffer configured by Config.Buffer. This means that if the buffer implements some kind of aggregation (like buffer.Merging), this aggregation will be applied to all the items of the retrieved batches before they're passed to the ProcessFunc as a single batch. If the processing succeeds, all the original batches will be deleted and the worker will retrieve the next set of batches. If the processing fails and there are no attempts remaining, all the original batches will stay in the SQLite database, just like with the one batch.

More than one background worker can process batches. The number of workers is configured by Config.Workers. Two workers will never retrieve the same batch.

func New

func New[Item any](
	processFunc ProcessFunc[Item],
	configFuncs ...ConfigFunc[Item],
) (
	*Queue[Item],
	error,
)

New creates a Queue with the provided ProcessFunc and a default Config, which may be changed with the provided list of ConfigFunc.

Default config:

func (*Queue[Item]) Close

func (q *Queue[Item]) Close() error

Close closes the queue.

It tries to do this gracefully by following these steps:

  1. Prevent new items from being pushed (except the items pushed from the ProcessFunc)
  2. Stop the process workers by cancelling their context, which is passed to the ProcessFunc
  3. Stop the push worker (the one that passes items into the in-memory buffer)
  4. Close the underlying SQLite database

It returns ErrClosed on subsequent calls.

func (*Queue[Item]) Flush

func (q *Queue[Item]) Flush(ctx context.Context) error

Flush flushes the queue's in-memory buffer.

It may return an error if the provided context is cancelled before the flush is complete. It returns ErrClosed if the queue is closed.

func (*Queue[Item]) Push

func (q *Queue[Item]) Push(ctx context.Context, item Item) error

Push pushes the item into the queue's in-memory buffer.

It may return an error if the provided context is cancelled before the item is pushed. It will also return ErrClosed if the queue is closed (except when the method is called from the ProcessFunc while the queue is still closing).

Directories

Path Synopsis
buffer: This package contains the main Buffer interface and several implementations.
codec: This package contains the main Codec interface and several implementations inside subpackages.
codec/gob
codec/msgp (module)
retry: This package contains the main Policy interface and several implementations.
