kvstore

Published: Mar 26, 2026 License: Apache-2.0 Imports: 17 Imported by: 0


KV Store

Kafka-backed key-value store with read-your-own-writes semantics. It uses Kafka as a write-ahead log and go-memdb for local reads. Single-partition topic; eventually consistent across replicas; your own writes block until visible.

Architecture

                            Kafka Topic (single partition)
                           +-----------------------------+
                           |  [0] [1] [2] [3] [4] ...    |
                           +-----------------------------+
                                 ^              |
                                 |              | consume
                          produce|              v
    +------------------------+   |    +------------------+
    |        Client          |   |    |  Consumer Loop   |
    |                        |---+    |                  |
    |  Put(k,v) ------------>|        |  for record :=   |
    |    |                   |        |    storage.Set() |
    |    | blocks until      |        |    notify()      |
    |    v offset consumed   |        +--------+---------+
    |                        |                 |
    |  Get(k) --+            |                 v
    |           |            |        +------------------+
    +-----------+------------+        |  Local Storage   |
                |                     |    (go-memdb)    |
                +-------------------->|                  |
                      direct read     +------------------+

Write path: Put() produces to Kafka, blocks until consumer processes that offset.

Read path: Get() reads directly from local memdb. No network round-trip.

Consistency: Read-your-own-writes guaranteed. Cross-client eventually consistent.

Why

You need a simple KV store but don't want to run another database. You already have Kafka. This uses a single-partition topic as the source of truth with local in-memory reads.

Each client produces to Kafka and consumes into local memory. Writes block until your consumer catches up, so you read what you write. Other clients eventually see it when their consumers catch up.

Not linearizable. Not for strong consistency.

Basic Usage

storage, _ := memdb.New()
client, _ := kv.NewClient(ctx, "my-topic", storage,
    kv.WithBrokers("localhost:9092"),
)
defer client.Close()

// Writes block until visible in reads
client.Put(ctx, []byte("key"), []byte("value"))

// Reads work immediately - the write is already visible
value, _ := client.Get(ctx, []byte("key"))

// Delete also blocks
client.Delete(ctx, []byte("key"))

Typed Client with JSON

type User struct {
    Email string `json:"email"`
    Name  string `json:"name"`
}

storage, _ := memdb.New()
client, _ := kv.NewClient(ctx, "users", storage,
    kv.WithBrokers("localhost:9092"),
)

users := kv.NewResourceClient[User](client, kv.JSON[User]())

// Put/Get with structs
users.Put(ctx, []byte("user:1"), User{Email: "[email protected]", Name: "Alice"})
user, _ := users.Get(ctx, []byte("user:1"))

// Range queries by prefix
for item, err := range users.Range(ctx, kv.Prefix("user:"), kv.QueryOptions{Limit: 100}) {
    if err != nil {
        break
    }
    fmt.Printf("%s: %s\n", item.Key, item.Value.Name)
}

Typed Client with Protobuf

import pb "example.com/proto/users"

storage, _ := memdb.New()
client, _ := kv.NewClient(ctx, "accounts", storage,
    kv.WithBrokers("localhost:9092"),
)

serde, _ := kv.Proto(func() *pb.Account {
    return &pb.Account{}
})
accounts := kv.NewResourceClient[*pb.Account](client, serde)

accounts.Put(ctx, []byte("acct:1"), &pb.Account{Name: "Acme Corp"})
account, _ := accounts.Get(ctx, []byte("acct:1"))

Batch Operations

Batch writes sync once on the highest offset. Much faster than individual Puts.

// Individual puts: ~15ms each
client.Put(ctx, []byte("key1"), value1)
client.Put(ctx, []byte("key2"), value2)

// Batch: ~0.16ms per op for 100 items
client.Batch().
    Put([]byte("key1"), value1).
    Put([]byte("key2"), value2).
    Put([]byte("key3"), value3).
    Delete([]byte("old-key")).
    Execute(ctx)

Range Queries

Prefix scans or arbitrary ranges.

// Prefix scan
// Name the loop variable something other than "kv" to avoid
// shadowing the package inside the loop body
for item, err := range client.Range(ctx, kv.Prefix("user:"), kv.QueryOptions{Limit: 100}) {
    // Returns user:1, user:2, etc
}

// Arbitrary range (start inclusive, end exclusive)
constraint := kv.KeyConstraint{Start: []byte("a"), End: []byte("z")}
for item, err := range client.Range(ctx, constraint, kv.QueryOptions{}) {
    // All keys from "a" (inclusive) up to "z" (exclusive)
}

Constraints

  • Single partition only (enforced at topic creation)
  • Entire dataset must fit in memory
  • Write throughput limited by single partition (~10K ops/sec)
  • No disk persistence - rebuilt from Kafka on restart
  • No unique constraints enforced
  • No automatic eviction or TTL

Testing

# Full tests (needs Docker for testcontainers)
go test -v ./...

# Unit tests only
go test -short -v ./...

# With race detector
go test -race -v ./...

Configuration

One topic per resource type. Create separate clients for different types.

// Users
usersStorage, _ := memdb.New()
users, _ := kv.NewClient(ctx, "users-topic", usersStorage,
    kv.WithBrokers("localhost:9092"),
    kv.WithReplicationFactor(3), // optional
)

// Products
productsStorage, _ := memdb.New()
products, _ := kv.NewClient(ctx, "products-topic", productsStorage,
    kv.WithBrokers("localhost:9092"),
)

Documentation


Constants

This section is empty.

Variables

var ErrClientClosed = errors.New("client is closed")

ErrClientClosed is returned when operations are performed on a closed client.

var ErrNotFound = errors.New("key not found")

ErrNotFound is returned when a key does not exist.

Functions

func WrapOnDelete

func WrapOnDelete[T any](fn func(key []byte)) func(key []byte)

WrapOnDelete creates a typed OnDelete callback. This is a convenience wrapper for symmetry with WrapOnSet.

func WrapOnSet

func WrapOnSet[T any](serde Serde[T], fn func(key []byte, value T)) func(key, value []byte)

WrapOnSet creates a raw OnSet callback that deserializes values before calling the typed callback. Deserialization errors are silently ignored since hooks fire after successful storage operations, and the same serde is used for storage - a deserialization failure here indicates a bug.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch accumulates Put and Delete operations for batch execution.

func (*Batch) Delete

func (b *Batch) Delete(key []byte) *Batch

Delete adds a delete operation to the batch.

func (*Batch) Execute

func (b *Batch) Execute(ctx context.Context) error

Execute produces all operations to Kafka and waits for the highest offset to be visible in this client's reads. Returns the first error encountered.

func (*Batch) Put

func (b *Batch) Put(key []byte, value []byte) *Batch

Put adds a put operation to the batch.

type BatchStorage

type BatchStorage interface {
	Storage
	// ApplyBatch applies a batch of set and delete operations atomically.
	// For each item: nil Value means delete, non-nil means set.
	ApplyBatch(ctx context.Context, items []KeyValue) error
}

BatchStorage is an optional interface for storage backends that support batch operations. Implementations can provide this for better performance during bulk inserts (e.g., bootstrap/restore).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a KV store backed by Kafka. Writes go through Kafka and block until visible in reads. Each topic should contain a single resource type.

func NewClient

func NewClient(ctx context.Context, topic string, storage Storage, opts ...ClientOption) (*Client, error)

NewClient creates a new KV store client. Each topic should contain a single resource type with its own indexes.

The provided context is used only for initialization (topic creation, metadata fetches, and bootstrap sync). It is not retained after NewClient returns - the client manages its own internal context for the consumer goroutine, which is cancelled when Close is called.

The caller MUST call Close() to release resources when done with the client. Cancelling the provided context does not shut down the client.

NewClient blocks until the consumer has caught up to the current high watermark, ensuring reads are consistent immediately after the client is created. For topics with large amounts of data, this may take some time.

func (*Client) Batch

func (c *Client) Batch() *Batch

Batch creates a new batch for executing multiple operations atomically. The batch syncs only once after all operations complete, waiting for the highest offset.

func (*Client) Close

func (c *Client) Close() error

Close shuts down the client.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, key []byte) error

Delete removes a key. Blocks until the delete is visible in this client's reads.

func (*Client) Get

func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error)

Get retrieves a value by key from local storage. Returns ErrNotFound if the key does not exist. May return stale data if this client's consumer is lagging or if other clients wrote recently.

func (*Client) Put

func (c *Client) Put(ctx context.Context, key []byte, value []byte) error

Put stores a key-value pair. Blocks until the write is visible in this client's reads.

func (*Client) Range

func (c *Client) Range(ctx context.Context, constraint KeyConstraint, opts QueryOptions) iter.Seq2[KeyValue, error]

Range iterates over keys in the given range from local storage. Use Prefix() to create a KeyConstraint for prefix queries. May return stale data if this client's consumer is lagging or if other clients wrote recently.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption configures a Client.

func WithBrokers

func WithBrokers(brokers ...string) ClientOption

WithBrokers sets the Kafka broker addresses.

func WithKafkaOptions

func WithKafkaOptions(opts ...kgo.Opt) ClientOption

WithKafkaOptions passes additional options to the franz-go client.

func WithLogger

func WithLogger(logger *slog.Logger) ClientOption

WithLogger sets the logger for client operations. If not provided, defaults to slog.Default().

func WithOnDeleteHook

func WithOnDeleteHook(fn func(key []byte)) ClientOption

WithOnDeleteHook registers a callback that fires after each successful Delete operation in the consumer. This includes both bootstrap replay and live updates. The callback receives the key bytes.

func WithOnSetHook

func WithOnSetHook(fn func(key, value []byte)) ClientOption

WithOnSetHook registers a callback that fires after each successful Set operation in the consumer. This includes both bootstrap replay and live updates. The callback receives the key and value bytes.

func WithReplicationFactor

func WithReplicationFactor(rf int16) ClientOption

WithReplicationFactor sets the topic replication factor.

type JSONSerde

type JSONSerde[T any] struct{}

JSONSerde serializes values as JSON.

func (*JSONSerde[T]) Deserialize

func (*JSONSerde[T]) Deserialize(b []byte) (T, error)

Deserialize unmarshals JSON bytes to the value.

func (*JSONSerde[T]) Serialize

func (*JSONSerde[T]) Serialize(v T) ([]byte, error)

Serialize marshals the value to JSON.

type KeyConstraint

type KeyConstraint struct {
	Start []byte // inclusive, nil = beginning
	End   []byte // exclusive, nil = end
}

KeyConstraint defines bounds for a key range scan.

func Prefix

func Prefix(p string) KeyConstraint

Prefix creates a KeyConstraint that matches all keys with the given prefix.

type KeyValue

type KeyValue struct {
	Key   []byte
	Value []byte
}

KeyValue represents a key-value pair.

type ProtoOption

type ProtoOption func(*protoConfig)

ProtoOption configures a ProtoSerde.

func WithSchemaRegistry

func WithSchemaRegistry(
	srClient *sr.Client,
	subject string,
	schemaContent string,
	opts ...SchemaRegistryOption,
) ProtoOption

WithSchemaRegistry configures Schema Registry support for ProtoSerde. This wraps protobuf payloads in the Confluent wire format for schema governance. The schema is registered immediately at initialization time.

Parameters:

  • srClient: Schema Registry client
  • subject: Subject name (typically "{topic}-value")
  • schemaContent: Proto file content as string
  • opts: Optional references for proto imports

Example:

serde, err := kvstore.Proto(
    func() *pb.MyMessage { return &pb.MyMessage{} },
    kvstore.WithSchemaRegistry(srClient, "topic-value", protoSchema),
)

type ProtoSerde

type ProtoSerde[T proto.Message] struct {
	// contains filtered or unexported fields
}

ProtoSerde serializes values as protobuf, optionally with Schema Registry wire format.

func (*ProtoSerde[T]) Deserialize

func (s *ProtoSerde[T]) Deserialize(b []byte) (T, error)

Deserialize unmarshals protobuf bytes to the message. If Schema Registry is configured, decodes Confluent wire format first.

On an unknown schema ID (e.g. after a schema evolution), the ID is validated against Schema Registry via SchemaByID, then registered locally for subsequent decodes. The actual deserialization always uses proto.Unmarshal with the compiled Go type -- unlike Avro, protobuf's wire format is self-describing (field numbers + wire types), so the schema from SR is not needed at decode time. SR's role for protobuf is governance (compatibility checks at registration) and validation (confirming the ID is legitimate), not driving deserialization.

func (*ProtoSerde[T]) Serialize

func (s *ProtoSerde[T]) Serialize(v T) ([]byte, error)

Serialize marshals the protobuf message. If Schema Registry is configured, wraps with Confluent wire format.

type QueryOptions

type QueryOptions struct {
	Limit int
}

QueryOptions configures queries.

type ResourceClient

type ResourceClient[T any] struct {
	// contains filtered or unexported fields
}

ResourceClient wraps Client with typed operations using a Serde.

func NewResourceClient

func NewResourceClient[T any](client *Client, serde Serde[T]) *ResourceClient[T]

NewResourceClient creates a typed client wrapping the given bytes client.

func (*ResourceClient[T]) Batch

func (r *ResourceClient[T]) Batch() *TypedBatch[T]

Batch creates a new typed batch for bulk operations.

func (*ResourceClient[T]) Delete

func (r *ResourceClient[T]) Delete(ctx context.Context, key []byte) error

Delete removes a key. Blocks until the delete is visible in this client's reads.

func (*ResourceClient[T]) Get

func (r *ResourceClient[T]) Get(ctx context.Context, key []byte) (T, error)

Get retrieves a value by key from local storage. Returns ErrNotFound if the key does not exist. May return stale data if this client's consumer is lagging or if other clients wrote recently.

func (*ResourceClient[T]) Put

func (r *ResourceClient[T]) Put(ctx context.Context, key []byte, value T) error

Put stores a value. Blocks until the write is visible in this client's reads.

func (*ResourceClient[T]) Range

func (r *ResourceClient[T]) Range(ctx context.Context, constraint KeyConstraint, opts QueryOptions) iter.Seq2[TypedKV[T], error]

Range iterates over keys in the given range from local storage. Use Prefix() for prefix queries. May return stale data if this client's consumer is lagging or if other clients wrote recently. Stops iteration on first storage or deserialization error. Corrupt data indicates a real problem that must be fixed, not worked around.

func (*ResourceClient[T]) Raw

func (r *ResourceClient[T]) Raw() *Client

Raw returns the underlying untyped Client.

type SchemaRegistryOption

type SchemaRegistryOption func(*srConfig)

SchemaRegistryOption configures Schema Registry behavior.

func WithMessageName

func WithMessageName(name string) SchemaRegistryOption

WithMessageName specifies which message in the proto file is the serialized payload. The message index is resolved by scanning the proto content for top-level message declarations. This avoids hardcoding indices that break when messages are reordered.

If not specified, the first message (index 0) is used.

Example:

kvstore.WithSchemaRegistry(srClient, "topic-value", protoSchema,
    kvstore.WithMessageName("LLMProvider"),
)

func WithSchemaReferences

func WithSchemaReferences(refs []sr.SchemaReference) SchemaRegistryOption

WithSchemaReferences specifies proto import dependencies.

Example:

WithSchemaReferences([]sr.SchemaReference{
    {Name: "common.proto", Subject: "topic-common", Version: 1},
})

type Serde

type Serde[T any] interface {
	Serialize(T) ([]byte, error)
	Deserialize([]byte) (T, error)
}

Serde handles serialization and deserialization of values.

func JSON

func JSON[T any]() Serde[T]

JSON returns a JSON serde for type T.

func NewSchemaRegistrySerde deprecated

func NewSchemaRegistrySerde[T proto.Message](
	srClient *sr.Client,
	subject string,
	schemaContent string,
	factory func() T,
	opts ...SchemaRegistryOption,
) (Serde[T], error)

NewSchemaRegistrySerde creates a protobuf serde with Schema Registry support.

Deprecated: Use Proto with WithSchemaRegistry instead.

func Proto

func Proto[T proto.Message](factory func() T, opts ...ProtoOption) (Serde[T], error)

Proto returns a protobuf serde for type T. The factory function must return a new instance of the proto message.

Example basic usage:

serde, err := kvstore.Proto(func() *pb.MyMessage { return &pb.MyMessage{} })

Example with Schema Registry:

serde, err := kvstore.Proto(
    func() *pb.MyMessage { return &pb.MyMessage{} },
    kvstore.WithSchemaRegistry(srClient, "topic-value", protoSchema),
)

type Storage

type Storage interface {
	Get(ctx context.Context, key []byte) ([]byte, error)
	Set(ctx context.Context, key []byte, value []byte) error
	Delete(ctx context.Context, key []byte) error
	Range(ctx context.Context, c KeyConstraint, opts QueryOptions) iter.Seq2[KeyValue, error]
}

Storage is the interface for key-value storage backends. All methods accept a context to support cancellation and timeouts, particularly important for remote storage implementations.

type TypedBatch

type TypedBatch[T any] struct {
	// contains filtered or unexported fields
}

TypedBatch accumulates typed Put and Delete operations.

func (*TypedBatch[T]) Delete

func (b *TypedBatch[T]) Delete(key []byte) *TypedBatch[T]

Delete adds a delete operation to the batch.

func (*TypedBatch[T]) Execute

func (b *TypedBatch[T]) Execute(ctx context.Context) error

Execute produces all operations and waits for visibility in this client's reads.

func (*TypedBatch[T]) Put

func (b *TypedBatch[T]) Put(key []byte, value T) *TypedBatch[T]

Put adds a put operation to the batch.

type TypedKV

type TypedKV[T any] struct {
	Key   []byte
	Value T
}

TypedKV is a typed key-value pair.
