Documentation ¶
Index ¶
- Variables
- func WrapOnDelete[T any](fn func(key []byte)) func(key []byte)
- func WrapOnSet[T any](serde Serde[T], fn func(key []byte, value T)) func(key, value []byte)
- type Batch
- type BatchStorage
- type Client
- func (c *Client) Batch() *Batch
- func (c *Client) Close() error
- func (c *Client) Delete(ctx context.Context, key []byte) error
- func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error)
- func (c *Client) Put(ctx context.Context, key []byte, value []byte) error
- func (c *Client) Range(ctx context.Context, constraint KeyConstraint, opts QueryOptions) iter.Seq2[KeyValue, error]
- type ClientOption
- func WithBrokers(brokers ...string) ClientOption
- func WithKafkaOptions(opts ...kgo.Opt) ClientOption
- func WithLogger(logger *slog.Logger) ClientOption
- func WithOnDeleteHook(fn func(key []byte)) ClientOption
- func WithOnSetHook(fn func(key, value []byte)) ClientOption
- func WithReplicationFactor(rf int16) ClientOption
- type JSONSerde
- type KeyConstraint
- type KeyValue
- type ProtoOption
- type ProtoSerde
- type QueryOptions
- type ResourceClient
- func (r *ResourceClient[T]) Batch() *TypedBatch[T]
- func (r *ResourceClient[T]) Delete(ctx context.Context, key []byte) error
- func (r *ResourceClient[T]) Get(ctx context.Context, key []byte) (T, error)
- func (r *ResourceClient[T]) Put(ctx context.Context, key []byte, value T) error
- func (r *ResourceClient[T]) Range(ctx context.Context, constraint KeyConstraint, opts QueryOptions) iter.Seq2[TypedKV[T], error]
- func (r *ResourceClient[T]) Raw() *Client
- type SchemaRegistryOption
- type Serde
- type Storage
- type TypedBatch
- type TypedKV
Constants ¶
This section is empty.
Variables ¶
var ErrClientClosed = errors.New("client is closed")
ErrClientClosed is returned when operations are performed on a closed client.
var ErrNotFound = errors.New("key not found")
ErrNotFound is returned when a key does not exist.
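Both variables are sentinel errors, so callers would normally match them with errors.Is rather than comparing strings. The sketch below redeclares ErrNotFound locally so that it compiles on its own; lookup and isNotFound are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the sentinel declared above so the sketch is self-contained.
var ErrNotFound = errors.New("key not found")

// lookup is a hypothetical stand-in for a Get call that wraps the sentinel with context.
func lookup(m map[string]string, key string) (string, error) {
	v, ok := m[key]
	if !ok {
		return "", fmt.Errorf("lookup %q: %w", key, ErrNotFound)
	}
	return v, nil
}

// isNotFound matches the sentinel even when it has been wrapped with %w.
func isNotFound(err error) bool {
	return errors.Is(err, ErrNotFound)
}

func main() {
	_, err := lookup(map[string]string{"a": "1"}, "b")
	if isNotFound(err) {
		fmt.Println("missing key, falling back to a default")
	}
}
```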
Functions ¶
func WrapOnDelete ¶
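func WrapOnDelete[T any](fn func(key []byte)) func(key []byte)
WrapOnDelete creates a typed OnDelete callback. The key is passed through as raw bytes; this is a convenience wrapper for symmetry with WrapOnSet.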
func WrapOnSet ¶
func WrapOnSet[T any](serde Serde[T], fn func(key []byte, value T)) func(key, value []byte)
WrapOnSet creates a raw OnSet callback that deserializes each value before calling the typed callback. Deserialization errors are silently ignored: hooks fire only after a successful storage operation, and the same serde is used for storage, so a deserialization failure here indicates a bug.
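A minimal sketch of how such an adapter could be written, not the package's actual implementation. The Serde interface shape is an assumption based on the ProtoSerde method signatures; intSerde, wrapOnSet and sumHook are hypothetical names used only here.

```go
package main

import (
	"fmt"
	"strconv"
)

// Serde is an assumed shape for the package's Serde[T] interface.
type Serde[T any] interface {
	Serialize(v T) ([]byte, error)
	Deserialize(b []byte) (T, error)
}

// intSerde is a hypothetical serde that stores ints as decimal text.
type intSerde struct{}

func (intSerde) Serialize(v int) ([]byte, error) { return []byte(strconv.Itoa(v)), nil }

func (intSerde) Deserialize(b []byte) (int, error) { return strconv.Atoi(string(b)) }

// wrapOnSet adapts a typed callback to the raw func(key, value []byte) hook shape.
// Values that fail to deserialize are dropped without calling fn.
func wrapOnSet[T any](serde Serde[T], fn func(key []byte, value T)) func(key, value []byte) {
	return func(key, value []byte) {
		v, err := serde.Deserialize(value)
		if err != nil {
			return
		}
		fn(key, v)
	}
}

// sumHook feeds raw values through a wrapped hook and returns the sum of the
// values that decoded successfully.
func sumHook(raw ...string) int {
	total := 0
	hook := wrapOnSet[int](intSerde{}, func(_ []byte, v int) { total += v })
	for _, r := range raw {
		hook([]byte("k"), []byte(r))
	}
	return total
}

func main() {
	fmt.Println(sumHook("2", "oops", "40"))
}
```

The undecodable value is skipped rather than reported, which matches the "silently ignored" behavior described above.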
Types ¶
type Batch ¶
type Batch struct {
	// contains filtered or unexported fields
}
Batch accumulates Put and Delete operations for batch execution.
type BatchStorage ¶
type BatchStorage interface {
	Storage
	// ApplyBatch applies a batch of set and delete operations atomically.
	// For each item: nil Value means delete, non-nil means set.
	ApplyBatch(ctx context.Context, items []KeyValue) error
}
BatchStorage is an optional interface for storage backends that support batch operations. Implementations can provide this for better performance during bulk inserts (e.g., bootstrap/restore).
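The nil-means-delete convention can be sketched with an in-memory backend. This is an illustration under stated assumptions: the KeyValue field names (Key, Value) are inferred from the ApplyBatch comment, and memStore and runBatch are hypothetical. A single mutex makes the whole batch appear atomic to other users of the same store.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// KeyValue is an assumed shape; the real type's fields are not shown in this documentation.
type KeyValue struct {
	Key   []byte
	Value []byte
}

// memStore is a hypothetical in-memory backend used only for this sketch.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// ApplyBatch follows the documented convention: nil Value deletes, non-nil sets.
// The lock is held for the whole batch so no reader sees a partial result.
func (m *memStore) ApplyBatch(ctx context.Context, items []KeyValue) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, it := range items {
		if it.Value == nil {
			delete(m.data, string(it.Key))
			continue
		}
		m.data[string(it.Key)] = it.Value
	}
	return nil
}

// runBatch seeds one key, applies a mixed batch, and returns the resulting contents.
func runBatch() (map[string]string, error) {
	m := &memStore{data: map[string][]byte{"a": []byte("1")}}
	err := m.ApplyBatch(context.Background(), []KeyValue{
		{Key: []byte("a")},                     // nil Value: delete
		{Key: []byte("b"), Value: []byte("2")}, // non-nil Value: set
	})
	out := make(map[string]string, len(m.data))
	for k, v := range m.data {
		out[k] = string(v)
	}
	return out, err
}

func main() {
	fmt.Println(runBatch())
}
```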
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client is a KV store backed by Kafka. Writes are produced to Kafka and block until they are visible in this client's reads. Each topic should contain a single resource type.
func NewClient ¶
func NewClient(ctx context.Context, topic string, storage Storage, opts ...ClientOption) (*Client, error)
NewClient creates a new KV store client. Each topic should contain a single resource type with its own indexes.
The provided context is used only for initialization (topic creation, metadata fetches, and bootstrap sync). It is not retained after NewClient returns - the client manages its own internal context for the consumer goroutine, which is cancelled when Close is called.
The caller MUST call Close() to release resources when done with the client. Cancelling the provided context does not shut down the client.
NewClient blocks until the consumer has caught up to the current high watermark, ensuring reads are consistent immediately after the client is created. For topics with large amounts of data, this may take some time.
func (*Client) Batch ¶
func (c *Client) Batch() *Batch
Batch creates a new batch for executing multiple operations atomically. The batch syncs only once: after all operations have been produced, it waits for the highest resulting offset to become visible.
func (*Client) Delete ¶
func (c *Client) Delete(ctx context.Context, key []byte) error
Delete removes a key. Blocks until the delete is visible in this client's reads.
func (*Client) Get ¶
func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error)
Get retrieves a value by key from local storage. Returns ErrNotFound if the key does not exist. May return stale data if this client's consumer is lagging or if other clients wrote recently.
func (*Client) Put ¶
func (c *Client) Put(ctx context.Context, key []byte, value []byte) error
Put stores a key-value pair. Blocks until the write is visible in this client's reads.
func (*Client) Range ¶
func (c *Client) Range(ctx context.Context, constraint KeyConstraint, opts QueryOptions) iter.Seq2[KeyValue, error]
Range iterates over keys in the given range from local storage. Use Prefix() to create a KeyConstraint for prefix queries. May return stale data if this client's consumer is lagging or if other clients wrote recently.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption configures a Client.
func WithBrokers ¶
func WithBrokers(brokers ...string) ClientOption
WithBrokers sets the Kafka broker addresses.
func WithKafkaOptions ¶
func WithKafkaOptions(opts ...kgo.Opt) ClientOption
WithKafkaOptions passes additional options to the franz-go client.
func WithLogger ¶
func WithLogger(logger *slog.Logger) ClientOption
WithLogger sets the logger for client operations. If not provided, defaults to slog.Default().
func WithOnDeleteHook ¶
func WithOnDeleteHook(fn func(key []byte)) ClientOption
WithOnDeleteHook registers a callback that fires after each successful Delete operation in the consumer. This includes both bootstrap replay and live updates. The callback receives the key bytes.
func WithOnSetHook ¶
func WithOnSetHook(fn func(key, value []byte)) ClientOption
WithOnSetHook registers a callback that fires after each successful Set operation in the consumer. This includes both bootstrap replay and live updates. The callback receives the key and value bytes.
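Because both hooks also fire during bootstrap replay, handlers are easiest to reason about when they are idempotent. The sketch below shows one way a derived in-memory view might be kept in step with the store; keySet and its methods are hypothetical names, and only the two callback shapes come from the signatures above.

```go
package main

import (
	"fmt"
	"sync"
)

// keySet is a hypothetical derived view that tracks which keys are currently live.
type keySet struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

func newKeySet() *keySet {
	return &keySet{keys: make(map[string]struct{})}
}

// OnSet has the func(key, value []byte) shape expected by WithOnSetHook.
// Repeated sets of the same key leave the view unchanged.
func (s *keySet) OnSet(key, _ []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.keys[string(key)] = struct{}{}
}

// OnDelete has the func(key []byte) shape expected by WithOnDeleteHook.
// Deleting an absent key is a no-op.
func (s *keySet) OnDelete(key []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.keys, string(key))
}

// Len reports how many keys are currently tracked.
func (s *keySet) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.keys)
}

func main() {
	idx := newKeySet()
	idx.OnSet([]byte("a"), []byte("1"))
	idx.OnSet([]byte("a"), []byte("2")) // a replayed or repeated set is harmless
	idx.OnSet([]byte("b"), []byte("3"))
	idx.OnDelete([]byte("a"))
	fmt.Println(idx.Len())
}
```

Method values such as idx.OnSet and idx.OnDelete already have the required function types, so they could be passed to the hook options directly.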
func WithReplicationFactor ¶
func WithReplicationFactor(rf int16) ClientOption
WithReplicationFactor sets the topic replication factor.
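The options above follow the functional-options pattern implied by the ClientOption type. A minimal sketch of that pattern follows; the clientConfig fields and the default values in buildConfig are assumptions made for illustration, since the real struct is unexported and its defaults are not documented here.

```go
package main

import "fmt"

// clientConfig's fields are assumptions; the real struct is unexported.
type clientConfig struct {
	brokers           []string
	replicationFactor int16
}

// ClientOption has the same shape as the documented type.
type ClientOption func(*clientConfig)

// WithBrokers records the broker addresses on the config.
func WithBrokers(brokers ...string) ClientOption {
	return func(c *clientConfig) { c.brokers = brokers }
}

// WithReplicationFactor records the topic replication factor on the config.
func WithReplicationFactor(rf int16) ClientOption {
	return func(c *clientConfig) { c.replicationFactor = rf }
}

// buildConfig applies options in order over hypothetical defaults,
// so a later option overrides an earlier one.
func buildConfig(opts ...ClientOption) clientConfig {
	cfg := clientConfig{
		brokers:           []string{"localhost:9092"}, // assumed default
		replicationFactor: 1,                          // assumed default
	}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(WithBrokers("k1:9092", "k2:9092"), WithReplicationFactor(3))
	fmt.Println(cfg.brokers, cfg.replicationFactor)
}
```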
type JSONSerde ¶
type JSONSerde[T any] struct{}
JSONSerde serializes values as JSON.
func (*JSONSerde[T]) Deserialize ¶
Deserialize unmarshals JSON bytes to the value.
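For orientation, a JSON-backed serde can be sketched in a few lines with encoding/json. This is a stand-in written for illustration, not the package's JSONSerde; the method set is assumed to match the Serialize and Deserialize signatures shown for ProtoSerde, and user and roundTrip are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonSerde is a stand-in for illustration; it is not the package's JSONSerde.
type jsonSerde[T any] struct{}

// Serialize marshals the value to JSON bytes.
func (jsonSerde[T]) Serialize(v T) ([]byte, error) {
	return json.Marshal(v)
}

// Deserialize unmarshals JSON bytes into a fresh value of type T.
func (jsonSerde[T]) Deserialize(b []byte) (T, error) {
	var v T
	err := json.Unmarshal(b, &v)
	return v, err
}

// user is a hypothetical resource type.
type user struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

// roundTrip serializes and then deserializes a user.
func roundTrip(u user) (user, error) {
	s := jsonSerde[user]{}
	b, err := s.Serialize(u)
	if err != nil {
		return user{}, err
	}
	return s.Deserialize(b)
}

func main() {
	got, err := roundTrip(user{Name: "ada", Age: 36})
	fmt.Println(got, err)
}
```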
type KeyConstraint ¶
type KeyConstraint struct {
	Start []byte // inclusive, nil = beginning
	End   []byte // exclusive, nil = end
}
KeyConstraint defines bounds for a key range scan.
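The half-open interval semantics can be made concrete with a small membership check. The sketch assumes keys are ordered bytewise (bytes.Compare), which this documentation does not state; contains is a hypothetical helper.

```go
package main

import (
	"bytes"
	"fmt"
)

// KeyConstraint mirrors the documented struct.
type KeyConstraint struct {
	Start []byte // inclusive, nil = beginning
	End   []byte // exclusive, nil = end
}

// contains reports whether key falls in [Start, End), assuming bytewise ordering.
func contains(c KeyConstraint, key []byte) bool {
	if c.Start != nil && bytes.Compare(key, c.Start) < 0 {
		return false
	}
	if c.End != nil && bytes.Compare(key, c.End) >= 0 {
		return false
	}
	return true
}

func main() {
	c := KeyConstraint{Start: []byte("b"), End: []byte("d")}
	for _, k := range []string{"a", "b", "c", "d"} {
		fmt.Println(k, contains(c, []byte(k)))
	}
}
```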
func Prefix ¶
func Prefix(p string) KeyConstraint
Prefix creates a KeyConstraint that matches all keys with the given prefix.
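One common way to turn a prefix into a [Start, End) pair is to use the prefix as Start and its bytewise successor as End. Whether Prefix is implemented this way is an assumption; prefixConstraint below is a hypothetical re-implementation for illustration.

```go
package main

import "fmt"

// KeyConstraint mirrors the documented struct.
type KeyConstraint struct {
	Start []byte // inclusive, nil = beginning
	End   []byte // exclusive, nil = end
}

// prefixConstraint returns the range covering every key that starts with p.
// End is the smallest byte string greater than all such keys: trailing 0xFF
// bytes are dropped and the last remaining byte is incremented. If nothing
// remains (empty or all-0xFF prefix), End is nil, meaning unbounded.
func prefixConstraint(p string) KeyConstraint {
	start := []byte(p)
	end := make([]byte, len(start))
	copy(end, start)
	for len(end) > 0 && end[len(end)-1] == 0xFF {
		end = end[:len(end)-1]
	}
	if len(end) == 0 {
		return KeyConstraint{Start: start, End: nil}
	}
	end[len(end)-1]++
	return KeyConstraint{Start: start, End: end}
}

func main() {
	c := prefixConstraint("user:")
	fmt.Printf("%q %q\n", c.Start, c.End)
}
```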
type ProtoOption ¶
type ProtoOption func(*protoConfig)
ProtoOption configures a ProtoSerde.
func WithSchemaRegistry ¶
func WithSchemaRegistry(
	srClient *sr.Client,
	subject string,
	schemaContent string,
	opts ...SchemaRegistryOption,
) ProtoOption
WithSchemaRegistry configures Schema Registry support for ProtoSerde. This wraps protobuf payloads with Confluent wire format for schema governance. Schema is registered immediately at initialization time.
Parameters:
- srClient: Schema Registry client
- subject: Subject name (typically "{topic}-value")
- schemaContent: Proto file content as string
- opts: Optional references for proto imports
Example:
serde, err := kvstore.Proto(
	func() *pb.MyMessage { return &pb.MyMessage{} },
	kvstore.WithSchemaRegistry(srClient, "topic-value", protoSchema),
)
type ProtoSerde ¶
ProtoSerde serializes values as protobuf, optionally with Schema Registry wire format.
func (*ProtoSerde[T]) Deserialize ¶
func (s *ProtoSerde[T]) Deserialize(b []byte) (T, error)
Deserialize unmarshals protobuf bytes to the message. If Schema Registry is configured, decodes Confluent wire format first.
On an unknown schema ID (e.g. after a schema evolution), the ID is validated against Schema Registry via SchemaByID, then registered locally for subsequent decodes.
The actual deserialization always uses proto.Unmarshal with the compiled Go type. Unlike Avro, which needs the writer's schema to decode, protobuf's wire format tags every field with its field number and wire type, so the schema from SR is not needed at decode time. SR's role for protobuf is governance (compatibility checks at registration) and validation (confirming the ID is legitimate), not driving deserialization.
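The Confluent wire format starts with a fixed 5-byte header: a zero magic byte followed by a big-endian 4-byte schema ID. The sketch below parses only that header; splitHeader is a hypothetical helper, not this package's decoder. For protobuf payloads the remaining bytes begin with a message-index list ahead of the message itself, which this sketch leaves unparsed.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitHeader parses the 5-byte framing prefix and returns the schema ID
// together with the bytes that follow it.
func splitHeader(b []byte) (schemaID uint32, rest []byte, err error) {
	if len(b) < 5 {
		return 0, nil, errors.New("payload shorter than 5-byte header")
	}
	if b[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%02x", b[0])
	}
	return binary.BigEndian.Uint32(b[1:5]), b[5:], nil
}

func main() {
	// Magic byte 0, schema ID 42, then two payload bytes.
	id, rest, err := splitHeader([]byte{0, 0, 0, 0, 42, 0x08, 0x01})
	fmt.Println(id, len(rest), err)
}
```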
func (*ProtoSerde[T]) Serialize ¶
func (s *ProtoSerde[T]) Serialize(v T) ([]byte, error)
Serialize marshals the protobuf message. If Schema Registry is configured, wraps with Confluent wire format.
type ResourceClient ¶
type ResourceClient[T any] struct {
	// contains filtered or unexported fields
}
ResourceClient wraps Client with typed operations using a Serde.
func NewResourceClient ¶
func NewResourceClient[T any](client *Client, serde Serde[T]) *ResourceClient[T]
NewResourceClient creates a typed client wrapping the given bytes client.
func (*ResourceClient[T]) Batch ¶
func (r *ResourceClient[T]) Batch() *TypedBatch[T]
Batch creates a new typed batch for bulk operations.
func (*ResourceClient[T]) Delete ¶
func (r *ResourceClient[T]) Delete(ctx context.Context, key []byte) error
Delete removes a key. Blocks until the delete is visible in this client's reads.
func (*ResourceClient[T]) Get ¶
func (r *ResourceClient[T]) Get(ctx context.Context, key []byte) (T, error)
Get retrieves a value by key from local storage. Returns ErrNotFound if the key does not exist. May return stale data if this client's consumer is lagging or if other clients wrote recently.
func (*ResourceClient[T]) Put ¶
func (r *ResourceClient[T]) Put(ctx context.Context, key []byte, value T) error
Put stores a value. Blocks until the write is visible in this client's reads.
func (*ResourceClient[T]) Range ¶
func (r *ResourceClient[T]) Range(ctx context.Context, constraint KeyConstraint, opts QueryOptions) iter.Seq2[TypedKV[T], error]
Range iterates over keys in the given range from local storage. Use Prefix() for prefix queries. May return stale data if this client's consumer is lagging or if other clients wrote recently. Stops iteration on first storage or deserialization error. Corrupt data indicates a real problem that must be fixed, not worked around.
func (*ResourceClient[T]) Raw ¶
func (r *ResourceClient[T]) Raw() *Client
Raw returns the underlying untyped Client.
type SchemaRegistryOption ¶
type SchemaRegistryOption func(*srConfig)
SchemaRegistryOption configures Schema Registry behavior.
func WithMessageName ¶
func WithMessageName(name string) SchemaRegistryOption
WithMessageName specifies which message in the proto file is the serialized payload. The message index is resolved by scanning the proto content for top-level message declarations. This avoids hardcoding indices that break when messages are reordered.
If not specified, the first message (index 0) is used.
Example:
kvstore.WithSchemaRegistry(srClient, "topic-value", protoSchema,
	kvstore.WithMessageName("LLMProvider"),
)
func WithSchemaReferences ¶
func WithSchemaReferences(refs []sr.SchemaReference) SchemaRegistryOption
WithSchemaReferences specifies proto import dependencies.
Example:
WithSchemaReferences([]sr.SchemaReference{
	{Name: "common.proto", Subject: "topic-common", Version: 1},
})
type Serde ¶
Serde handles serialization and deserialization of values.
func NewSchemaRegistrySerde (deprecated)
func NewSchemaRegistrySerde[T proto.Message](
	srClient *sr.Client,
	subject string,
	schemaContent string,
	factory func() T,
	opts ...SchemaRegistryOption,
) (Serde[T], error)
NewSchemaRegistrySerde creates a protobuf serde with Schema Registry support.
Deprecated: Use Proto with WithSchemaRegistry instead.
func Proto ¶
func Proto[T proto.Message](factory func() T, opts ...ProtoOption) (Serde[T], error)
Proto returns a protobuf serde for type T. The factory function must return a new instance of the proto message.
Example basic usage:
serde, err := kvstore.Proto(func() *pb.MyMessage { return &pb.MyMessage{} })
Example with Schema Registry:
serde, err := kvstore.Proto(
	func() *pb.MyMessage { return &pb.MyMessage{} },
	kvstore.WithSchemaRegistry(srClient, "topic-value", protoSchema),
)
type Storage ¶
type Storage interface {
	Get(ctx context.Context, key []byte) ([]byte, error)
	Set(ctx context.Context, key []byte, value []byte) error
	Delete(ctx context.Context, key []byte) error
	Range(ctx context.Context, c KeyConstraint, opts QueryOptions) iter.Seq2[KeyValue, error]
}
Storage is the interface for key-value storage backends. All methods accept a context to support cancellation and timeouts, particularly important for remote storage implementations.
type TypedBatch ¶
type TypedBatch[T any] struct {
	// contains filtered or unexported fields
}
TypedBatch accumulates typed Put and Delete operations.
func (*TypedBatch[T]) Delete ¶
func (b *TypedBatch[T]) Delete(key []byte) *TypedBatch[T]
Delete adds a delete operation to the batch.
func (*TypedBatch[T]) Execute ¶
func (b *TypedBatch[T]) Execute(ctx context.Context) error
Execute produces all operations and waits for visibility in this client's reads.
func (*TypedBatch[T]) Put ¶
func (b *TypedBatch[T]) Put(key []byte, value T) *TypedBatch[T]
Put adds a put operation to the batch.