Documentation
¶
Index ¶
- Variables
- func GetStore[S any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (S, error)
- func InitBatchKeyValueStoreRef[K comparable, V any, Kout, Vout any](r *BatchKeyValueStoreRef[K, V], ctx kprocessor.ProcessorContext[Kout, Vout], ...) error
- func InitKeyValueStoreRef[K comparable, V any, Kout, Vout any](r *KeyValueStoreRef[K, V], ctx kprocessor.ProcessorContext[Kout, Vout], ...) error
- func InitWindowedStoreRef[K comparable, V any, Kout, Vout any](r *WindowedStoreRef[K, V], ctx kprocessor.ProcessorContext[Kout, Vout], ...) error
- func MapIter[K, V any](seq iter.Seq2[[]byte, []byte], keyDeserializer kserde.Deserializer[K], ...) iter.Seq2[K, V]
- func MustGetStore[S any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) S
- type BatchKeyValueStore
- func GetBatchKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (BatchKeyValueStore[K, V], error)
- func MustGetBatchKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) BatchKeyValueStore[K, V]
- type BatchKeyValueStoreRef
- func (r *BatchKeyValueStoreRef[K, V]) Delete(ctx context.Context, key K) error
- func (r *BatchKeyValueStoreRef[K, V]) DeleteBatch(ctx context.Context, keys []K) error
- func (r *BatchKeyValueStoreRef[K, V]) Get(ctx context.Context, key K) (V, bool, error)
- func (r *BatchKeyValueStoreRef[K, V]) GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)
- func (r *BatchKeyValueStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error
- func (r *BatchKeyValueStoreRef[K, V]) Name() string
- func (r *BatchKeyValueStoreRef[K, V]) Set(ctx context.Context, key K, value V) error
- func (r *BatchKeyValueStoreRef[K, V]) SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error
- func (r *BatchKeyValueStoreRef[K, V]) Store() BatchKeyValueStore[K, V]
- type BatchingKeyValueStoreRestoreCallback
- type ChangeloggingBatchKeyValueStore
- func (c *ChangeloggingBatchKeyValueStore[K, V]) DeleteBatch(ctx context.Context, keys []K) error
- func (c *ChangeloggingBatchKeyValueStore[K, V]) GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)
- func (c *ChangeloggingBatchKeyValueStore[K, V]) SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error
- type ChangeloggingKeyValueStore
- func (c *ChangeloggingKeyValueStore[K, V]) All(ctx context.Context) iter.Seq2[K, V]
- func (c *ChangeloggingKeyValueStore[K, V]) Close() error
- func (c *ChangeloggingKeyValueStore[K, V]) Delete(ctx context.Context, key K) error
- func (c *ChangeloggingKeyValueStore[K, V]) Flush(ctx context.Context) error
- func (c *ChangeloggingKeyValueStore[K, V]) Get(ctx context.Context, key K) (V, bool, error)
- func (c *ChangeloggingKeyValueStore[K, V]) Init(ctx kprocessor.ProcessorContextInternal) error
- func (c *ChangeloggingKeyValueStore[K, V]) Name() string
- func (c *ChangeloggingKeyValueStore[K, V]) Persistent() bool
- func (c *ChangeloggingKeyValueStore[K, V]) Range(ctx context.Context, from, to K) iter.Seq2[K, V]
- func (c *ChangeloggingKeyValueStore[K, V]) Set(ctx context.Context, key K, value V) error
- type KeyValueStore
- type KeyValueStoreRef
- func (r *KeyValueStoreRef[K, V]) All(ctx context.Context) iter.Seq2[K, V]
- func (r *KeyValueStoreRef[K, V]) Delete(ctx context.Context, key K) error
- func (r *KeyValueStoreRef[K, V]) Get(ctx context.Context, key K) (V, bool, error)
- func (r *KeyValueStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error
- func (r *KeyValueStoreRef[K, V]) Name() string
- func (r *KeyValueStoreRef[K, V]) Range(ctx context.Context, from, to K) iter.Seq2[K, V]
- func (r *KeyValueStoreRef[K, V]) Set(ctx context.Context, key K, value V) error
- func (r *KeyValueStoreRef[K, V]) Store() KeyValueStore[K, V]
- type KeyValueStoreRestoreCallback
- type NoOpRestoreListener
- func (n *NoOpRestoreListener) OnBatchRestored(topicPartition checkpoint.TopicPartition, storeName string, ...)
- func (n *NoOpRestoreListener) OnRestoreEnd(topicPartition checkpoint.TopicPartition, storeName string, ...)
- func (n *NoOpRestoreListener) OnRestoreStart(topicPartition checkpoint.TopicPartition, storeName string, ...)
- type StateRestoreCallback
- type StateRestoreListener
- type StateStore
- type StateStoreGetter
- type StoreBackend
- type StoreBuilder
- type TypeErasedStoreBuilder
- type WindowedStore
- type WindowedStoreRef
Constants ¶
This section is empty.
Variables ¶
var (
ErrKeyNotFound = errors.New("store: key not found")
)
Functions ¶
func GetStore ¶
func GetStore[S any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (S, error)
GetStore retrieves any store from the processor context and type-asserts it. Use this for custom store types. Returns error if store not found or wrong type.
Example:
store, err := kstate.GetStore[*MyCustomStore](ctx, "custom-store")
func InitBatchKeyValueStoreRef ¶
func InitBatchKeyValueStoreRef[K comparable, V any, Kout, Vout any]( r *BatchKeyValueStoreRef[K, V], ctx kprocessor.ProcessorContext[Kout, Vout], name string, ) error
InitBatchKeyValueStoreRef initializes using any ProcessorContext type.
func InitKeyValueStoreRef ¶
func InitKeyValueStoreRef[K comparable, V any, Kout, Vout any]( r *KeyValueStoreRef[K, V], ctx kprocessor.ProcessorContext[Kout, Vout], name string, ) error
InitFromAny initializes using any ProcessorContext type (for generic processors).
func InitWindowedStoreRef ¶
func InitWindowedStoreRef[K comparable, V any, Kout, Vout any]( r *WindowedStoreRef[K, V], ctx kprocessor.ProcessorContext[Kout, Vout], name string, ) error
InitWindowedStoreRef initializes using any ProcessorContext type.
func MapIter ¶
func MapIter[K, V any]( seq iter.Seq2[[]byte, []byte], keyDeserializer kserde.Deserializer[K], valueDeserializer kserde.Deserializer[V], ) iter.Seq2[K, V]
MapIter transforms a byte iterator into a typed iterator. Uses provided deserializers to convert keys and values.
func MustGetStore ¶
func MustGetStore[S any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) S
MustGetStore retrieves any store and type-asserts it, panicking on failure.
Types ¶
type BatchKeyValueStore ¶
type BatchKeyValueStore[K comparable, V any] interface { KeyValueStore[K, V] // SetBatch stores multiple key-value pairs atomically SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error // GetBatch retrieves multiple values by keys GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error) // DeleteBatch removes multiple keys atomically DeleteBatch(ctx context.Context, keys []K) error }
BatchKeyValueStore extends KeyValueStore with batch operations kstreams-specific (not in Kafka Streams)
func GetBatchKeyValueStore ¶
func GetBatchKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (BatchKeyValueStore[K, V], error)
GetBatchKeyValueStore retrieves a typed BatchKeyValueStore from the processor context. Returns an error if the store is not found or has the wrong type. Handles stores wrapped in adapters.
func MustGetBatchKeyValueStore ¶
func MustGetBatchKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) BatchKeyValueStore[K, V]
MustGetBatchKeyValueStore retrieves a typed BatchKeyValueStore, panicking on failure.
type BatchKeyValueStoreRef ¶
type BatchKeyValueStoreRef[K comparable, V any] struct { // contains filtered or unexported fields }
BatchKeyValueStoreRef provides type-safe access to a BatchKeyValueStore. Use this for high-throughput scenarios with batch operations.
func (*BatchKeyValueStoreRef[K, V]) Delete ¶
func (r *BatchKeyValueStoreRef[K, V]) Delete(ctx context.Context, key K) error
Delete removes a key.
func (*BatchKeyValueStoreRef[K, V]) DeleteBatch ¶
func (r *BatchKeyValueStoreRef[K, V]) DeleteBatch(ctx context.Context, keys []K) error
DeleteBatch removes multiple keys.
func (*BatchKeyValueStoreRef[K, V]) Get ¶
func (r *BatchKeyValueStoreRef[K, V]) Get(ctx context.Context, key K) (V, bool, error)
Get retrieves a value by key.
func (*BatchKeyValueStoreRef[K, V]) GetBatch ¶
func (r *BatchKeyValueStoreRef[K, V]) GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)
GetBatch retrieves multiple values by keys.
func (*BatchKeyValueStoreRef[K, V]) Init ¶
func (r *BatchKeyValueStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error
Init initializes the batch store reference from the processor context.
func (*BatchKeyValueStoreRef[K, V]) Name ¶
func (r *BatchKeyValueStoreRef[K, V]) Name() string
Name returns the store name.
func (*BatchKeyValueStoreRef[K, V]) Set ¶
func (r *BatchKeyValueStoreRef[K, V]) Set(ctx context.Context, key K, value V) error
Set stores a key-value pair.
func (*BatchKeyValueStoreRef[K, V]) SetBatch ¶
func (r *BatchKeyValueStoreRef[K, V]) SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error
SetBatch stores multiple key-value pairs efficiently.
func (*BatchKeyValueStoreRef[K, V]) Store ¶
func (r *BatchKeyValueStoreRef[K, V]) Store() BatchKeyValueStore[K, V]
Store returns the underlying store.
type BatchingKeyValueStoreRestoreCallback ¶
type BatchingKeyValueStoreRestoreCallback[K comparable, V any] struct { // contains filtered or unexported fields }
BatchingKeyValueStoreRestoreCallback uses batch operations if available Falls back to individual operations if store doesn't support batching
func NewBatchingKeyValueStoreRestoreCallback ¶
func NewBatchingKeyValueStoreRestoreCallback[K comparable, V any]( store KeyValueStore[K, V], keyDecoder func([]byte) (K, error), valueDecoder func([]byte) (V, error), ) *BatchingKeyValueStoreRestoreCallback[K, V]
NewBatchingKeyValueStoreRestoreCallback creates a batching restore callback
func (*BatchingKeyValueStoreRestoreCallback[K, V]) Restore ¶
func (r *BatchingKeyValueStoreRestoreCallback[K, V]) Restore(key, value []byte) error
Restore restores a single record
func (*BatchingKeyValueStoreRestoreCallback[K, V]) RestoreBatch ¶
func (r *BatchingKeyValueStoreRestoreCallback[K, V]) RestoreBatch(records []*kgo.Record) error
RestoreBatch restores using batch operations if the store supports BatchKeyValueStore interface
type ChangeloggingBatchKeyValueStore ¶
type ChangeloggingBatchKeyValueStore[K comparable, V any] struct { *ChangeloggingKeyValueStore[K, V] // contains filtered or unexported fields }
ChangeloggingBatchKeyValueStore adds batch operation logging support Only used if inner store implements BatchKeyValueStore
func NewChangeloggingBatchKeyValueStore ¶
func NewChangeloggingBatchKeyValueStore[K comparable, V any]( inner BatchKeyValueStore[K, V], keyEncoder func(K) ([]byte, error), valueEncoder func(V) ([]byte, error), ) *ChangeloggingBatchKeyValueStore[K, V]
NewChangeloggingBatchKeyValueStore creates a changelog-enabled batch store
func (*ChangeloggingBatchKeyValueStore[K, V]) DeleteBatch ¶
func (c *ChangeloggingBatchKeyValueStore[K, V]) DeleteBatch(ctx context.Context, keys []K) error
DeleteBatch deletes batch from store, then logs tombstones to changelog
func (*ChangeloggingBatchKeyValueStore[K, V]) GetBatch ¶
func (c *ChangeloggingBatchKeyValueStore[K, V]) GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)
GetBatch reads from inner store (no logging)
func (*ChangeloggingBatchKeyValueStore[K, V]) SetBatch ¶
func (c *ChangeloggingBatchKeyValueStore[K, V]) SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error
SetBatch writes batch to store, then logs each entry to changelog Note: Logs each entry individually (not as a single batch to Kafka) This matches Kafka Streams behavior (ChangeLoggingKeyValueBytesStore.putAll)
type ChangeloggingKeyValueStore ¶
type ChangeloggingKeyValueStore[K comparable, V any] struct { // contains filtered or unexported fields }
ChangeloggingKeyValueStore wraps a KeyValueStore to log all changes to a changelog topic Matches Kafka Streams' ChangeLoggingKeyValueBytesStore
Pattern: Decorator
- Delegates reads to inner store (no logging)
- Intercepts writes to log changes (dual-write)
Dual-write order (critical):
- Write to inner store FIRST
- Log to changelog topic SECOND
This ensures that if changelog write fails, we can retry without duplicating store writes.
Tombstone handling:
- Delete(key) logs with nil value (tombstone in Kafka)
- Tombstones trigger log compaction to remove deleted keys
func NewChangeloggingKeyValueStore ¶
func NewChangeloggingKeyValueStore[K comparable, V any]( inner KeyValueStore[K, V], keyEncoder func(K) ([]byte, error), valueEncoder func(V) ([]byte, error), ) *ChangeloggingKeyValueStore[K, V]
NewChangeloggingKeyValueStore creates a changelog-enabled wrapper around a store
Parameters:
- inner: The underlying store (e.g., PebbleStore)
- keyEncoder: Serializer for keys (e.g., StringSerializer)
- valueEncoder: Serializer for values (e.g., JSONSerializer)
The encoders must match the decoders used in the RestoreCallback!
func (*ChangeloggingKeyValueStore[K, V]) All ¶
func (c *ChangeloggingKeyValueStore[K, V]) All(ctx context.Context) iter.Seq2[K, V]
All reads from inner store (no logging)
func (*ChangeloggingKeyValueStore[K, V]) Close ¶
func (c *ChangeloggingKeyValueStore[K, V]) Close() error
Close closes inner store
func (*ChangeloggingKeyValueStore[K, V]) Delete ¶
func (c *ChangeloggingKeyValueStore[K, V]) Delete(ctx context.Context, key K) error
Delete removes from store, then logs tombstone to changelog Matches: ChangeLoggingKeyValueBytesStore.delete()
func (*ChangeloggingKeyValueStore[K, V]) Flush ¶
func (c *ChangeloggingKeyValueStore[K, V]) Flush(ctx context.Context) error
Flush flushes inner store (changelog is flushed separately by task)
func (*ChangeloggingKeyValueStore[K, V]) Get ¶
func (c *ChangeloggingKeyValueStore[K, V]) Get(ctx context.Context, key K) (V, bool, error)
Get reads from inner store (no changelog logging) Read operations don't modify state, so no logging needed
func (*ChangeloggingKeyValueStore[K, V]) Init ¶
func (c *ChangeloggingKeyValueStore[K, V]) Init(ctx kprocessor.ProcessorContextInternal) error
Init initializes the store and captures the processor context Context is needed for logging changes via context.LogChange()
func (*ChangeloggingKeyValueStore[K, V]) Name ¶
func (c *ChangeloggingKeyValueStore[K, V]) Name() string
Name returns the store name (delegates to inner)
func (*ChangeloggingKeyValueStore[K, V]) Persistent ¶
func (c *ChangeloggingKeyValueStore[K, V]) Persistent() bool
Persistent delegates to inner store Matches Kafka Streams' ChangeLoggingKeyValueBytesStore.persistent()
type KeyValueStore ¶
type KeyValueStore[K comparable, V any] interface { StateStore // Get retrieves a value by key // Returns (value, true, nil) if found // Returns (zero, false, nil) if not found // Returns (zero, false, err) on error Get(ctx context.Context, key K) (V, bool, error) // Set stores a key-value pair Set(ctx context.Context, key K, value V) error // Delete removes a key Delete(ctx context.Context, key K) error // Range returns an iterator over a range of keys [from, to) Range(ctx context.Context, from, to K) iter.Seq2[K, V] // All returns an iterator over all keys All(ctx context.Context) iter.Seq2[K, V] }
KeyValueStore is a key-value state store interface Matches Kafka Streams' org.apache.kafka.streams.state.KeyValueStore
func GetKeyValueStore ¶
func GetKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (KeyValueStore[K, V], error)
GetKeyValueStore retrieves a typed KeyValueStore from the processor context. Returns an error if the store is not found or has the wrong type. Handles stores wrapped in adapters (common in kstreams architecture).
Example:
func (p *MyProcessor) Init(ctx kprocessor.ProcessorContext[string, int]) error {
store, err := kstate.GetKeyValueStore[string, int64](ctx, "my-store")
if err != nil {
return fmt.Errorf("get store: %w", err)
}
p.store = store
return nil
}
func MustGetKeyValueStore ¶
func MustGetKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) KeyValueStore[K, V]
MustGetKeyValueStore retrieves a typed KeyValueStore, panicking on failure. Use this in Init() where failure should be fatal.
Example:
func (p *MyProcessor) Init(ctx kprocessor.ProcessorContext[string, int]) error {
p.store = kstate.MustGetKeyValueStore[string, int64](ctx, "my-store")
return nil
}
type KeyValueStoreRef ¶
type KeyValueStoreRef[K comparable, V any] struct { // contains filtered or unexported fields }
KeyValueStoreRef provides type-safe access to a KeyValueStore. Use this as a processor field to declare store dependencies at compile time.
Example:
type MyProcessor struct {
counter kstate.KeyValueStoreRef[string, int64]
}
func (p *MyProcessor) Init(ctx kprocessor.ProcessorContext[string, int]) error {
return p.counter.Init(ctx, "counter-store")
}
func (p *MyProcessor) Process(ctx context.Context, key string, value int) error {
count, _, _ := p.counter.Get(ctx, key)
return p.counter.Set(ctx, key, count+1)
}
func (*KeyValueStoreRef[K, V]) All ¶
func (r *KeyValueStoreRef[K, V]) All(ctx context.Context) iter.Seq2[K, V]
All iterates over all entries. Panics if not initialized.
func (*KeyValueStoreRef[K, V]) Delete ¶
func (r *KeyValueStoreRef[K, V]) Delete(ctx context.Context, key K) error
Delete removes a key. Panics if not initialized.
func (*KeyValueStoreRef[K, V]) Get ¶
func (r *KeyValueStoreRef[K, V]) Get(ctx context.Context, key K) (V, bool, error)
Get retrieves a value by key. Panics if not initialized.
func (*KeyValueStoreRef[K, V]) Init ¶
func (r *KeyValueStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error
Init initializes the store reference from the processor context. Must be called in the processor's Init method before using the store.
func (*KeyValueStoreRef[K, V]) Name ¶
func (r *KeyValueStoreRef[K, V]) Name() string
Name returns the store name.
func (*KeyValueStoreRef[K, V]) Range ¶
func (r *KeyValueStoreRef[K, V]) Range(ctx context.Context, from, to K) iter.Seq2[K, V]
Range iterates over a key range. Panics if not initialized.
func (*KeyValueStoreRef[K, V]) Set ¶
func (r *KeyValueStoreRef[K, V]) Set(ctx context.Context, key K, value V) error
Set stores a key-value pair. Panics if not initialized.
func (*KeyValueStoreRef[K, V]) Store ¶
func (r *KeyValueStoreRef[K, V]) Store() KeyValueStore[K, V]
Store returns the underlying store (for advanced use cases).
type KeyValueStoreRestoreCallback ¶
type KeyValueStoreRestoreCallback[K comparable, V any] struct { // contains filtered or unexported fields }
KeyValueStoreRestoreCallback is the default restore callback for KeyValueStore Handles deserialization and applies records to the store
Tombstone handling:
- record.Value == nil → store.Delete(key)
- record.Value != nil → store.Set(key, value)
func NewKeyValueStoreRestoreCallback ¶
func NewKeyValueStoreRestoreCallback[K comparable, V any]( store KeyValueStore[K, V], keyDecoder func([]byte) (K, error), valueDecoder func([]byte) (V, error), ) *KeyValueStoreRestoreCallback[K, V]
NewKeyValueStoreRestoreCallback creates a restore callback for a KeyValueStore
func (*KeyValueStoreRestoreCallback[K, V]) Restore ¶
func (r *KeyValueStoreRestoreCallback[K, V]) Restore(key, value []byte) error
Restore restores a single record
func (*KeyValueStoreRestoreCallback[K, V]) RestoreBatch ¶
func (r *KeyValueStoreRestoreCallback[K, V]) RestoreBatch(records []*kgo.Record) error
RestoreBatch restores a batch of records More efficient than calling Restore() for each record individually
type NoOpRestoreListener ¶
type NoOpRestoreListener struct{}
NoOpRestoreListener is a no-op implementation of StateRestoreListener Used when no monitoring is needed
func (*NoOpRestoreListener) OnBatchRestored ¶
func (n *NoOpRestoreListener) OnBatchRestored(topicPartition checkpoint.TopicPartition, storeName string, batchEndOffset int64, numRestored int64)
func (*NoOpRestoreListener) OnRestoreEnd ¶
func (n *NoOpRestoreListener) OnRestoreEnd(topicPartition checkpoint.TopicPartition, storeName string, totalRestored int64)
func (*NoOpRestoreListener) OnRestoreStart ¶
func (n *NoOpRestoreListener) OnRestoreStart(topicPartition checkpoint.TopicPartition, storeName string, startingOffset, endingOffset int64)
type StateRestoreCallback ¶
type StateRestoreCallback interface {
// Restore a single record (called if RestoreBatch not implemented)
// value = nil indicates tombstone (deletion)
Restore(key, value []byte) error
// RestoreBatch restores a batch of records for efficiency
// Implementations should check for tombstones (record.Value == nil)
RestoreBatch(records []*kgo.Record) error
}
StateRestoreCallback defines how to restore records from a changelog topic into a state store Matches Kafka Streams' org.apache.kafka.streams.processor.StateRestoreCallback
Implementations should handle:
- Normal records: Apply put(key, value) to store
- Tombstone records (nil value): Apply delete(key) to store
Two restoration modes:
- Per-record: Restore() called for each record individually (legacy, slower)
- Batched: RestoreBatch() called with batches of records (modern, faster)
type StateRestoreListener ¶
type StateRestoreListener interface {
// OnRestoreStart is called when restoration begins
// startingOffset: First offset to restore (usually checkpoint offset + 1)
// endingOffset: EXCLUSIVE ending offset (high water mark, one past the last offset to restore)
// Matches Kafka Streams semantics: restoration completes when reaching endingOffset - 1
OnRestoreStart(topicPartition checkpoint.TopicPartition, storeName string, startingOffset, endingOffset int64)
// OnBatchRestored is called after each batch is applied to the store
// batchEndOffset: Last offset in the batch
// numRestored: Number of records in this batch
OnBatchRestored(topicPartition checkpoint.TopicPartition, storeName string, batchEndOffset int64, numRestored int64)
// OnRestoreEnd is called when restoration completes
// totalRestored: Total number of records restored
OnRestoreEnd(topicPartition checkpoint.TopicPartition, storeName string, totalRestored int64)
}
StateRestoreListener provides callbacks for monitoring restoration progress Matches Kafka Streams' org.apache.kafka.streams.processor.StateRestoreListener
Used for:
- Progress monitoring (e.g., "Restored 1000/5000 records")
- Metrics collection
- Timeout detection (fail if restoration takes too long)
Note: Must be stateless or provide internal synchronization (called from multiple goroutines)
type StateStore ¶
type StateStore interface {
// Name returns the store name
Name() string
// Init initializes the store with processor context
Init(ctx kprocessor.ProcessorContextInternal) error
// Flush persists any cached data
Flush(ctx context.Context) error
// Close closes the store
Close() error
// Persistent returns true if the store persists data to disk
// Returns false for in-memory stores
// CRITICAL: Only persistent stores should be checkpointed
// Matches Kafka Streams' StateStore.persistent()
Persistent() bool
}
StateStore is the base interface for all state stores Matches Kafka Streams' org.apache.kafka.streams.processor.StateStore
type StateStoreGetter ¶
type StateStoreGetter interface {
GetStateStore() StateStore
}
StateStoreGetter is implemented by adapters that wrap a StateStore This allows type-safe extraction of the underlying store without conflicting interface methods (kprocessor.Store vs kstate.StateStore)
type StoreBackend ¶
type StoreBackend interface {
StateStore
Set(k, v []byte) error
Get(k []byte) ([]byte, error)
Delete(k []byte) error
Range(lower, upper []byte) iter.Seq2[[]byte, []byte]
All() iter.Seq2[[]byte, []byte]
}
StoreBackend is the low-level byte-oriented store interface. Implemented by pebble and s3 stores.
type StoreBuilder ¶
type StoreBuilder[T StateStore] interface { // WithChangelogEnabled enables changelog topic for this store // config contains Kafka topic-level configuration (optional, can be nil) // // Common config keys: // - "retention.ms": Changelog retention time in milliseconds // - "cleanup.policy": "compact" or "delete" // - "segment.ms": Segment size for time-based retention // // Default (if config is nil): Uses application-level defaults WithChangelogEnabled(config map[string]string) StoreBuilder[T] // WithChangelogDisabled disables changelog topic for this store // Use for stores that don't need durability (e.g., remote stores like S3, BigTable) WithChangelogDisabled() StoreBuilder[T] // Build constructs the state store with configured options // If changelog is enabled, wraps store in ChangeloggingKeyValueStore // Returns error if configuration is invalid or store initialization fails Build() (T, error) // Name returns the store name Name() string // LogConfig returns the changelog topic configuration // Returns nil if changelog is disabled LogConfig() map[string]string // ChangelogEnabled returns whether changelog is enabled for this store ChangelogEnabled() bool // RestoreCallback returns the callback for restoring this store from changelog // Returns nil if changelog is disabled RestoreCallback() StateRestoreCallback }
StoreBuilder provides a fluent API for configuring and building state stores Matches Kafka Streams' org.apache.kafka.streams.state.StoreBuilder interface
Key configuration options:
- Changelog logging (enabled/disabled per store)
- Changelog topic configuration (retention, cleanup policy, etc.)
Example usage:
builder := pebble.NewKeyValueStoreBuilder[string, User]("user-store", "/tmp/state")
builder.WithChangelogEnabled(map[string]string{
"retention.ms": "86400000", // 1 day
"cleanup.policy": "compact",
})
store := builder.Build()
type TypeErasedStoreBuilder ¶
type TypeErasedStoreBuilder interface {
// BuildStateStore constructs the state store (type-erased version)
BuildStateStore() (StateStore, error)
// Name returns the store name
Name() string
// LogConfig returns the changelog topic configuration
LogConfig() map[string]string
// ChangelogEnabled returns whether changelog is enabled
ChangelogEnabled() bool
// RestoreCallback returns the restore callback
RestoreCallback() StateRestoreCallback
}
TypeErasedStoreBuilder is a non-generic interface for store builders. This provides type safety when storing builders in maps (avoids using 'any'). All StoreBuilder[T] implementations must also implement this interface.
type WindowedStore ¶
type WindowedStore[K comparable, V any] interface { StateStore // Set stores a value for a key at a specific timestamp Set(ctx context.Context, key K, value V, timestamp int64) error // Get retrieves a value by key at a specific timestamp Get(ctx context.Context, key K, timestamp int64) (V, bool, error) // Fetch retrieves values for a key within a time range Fetch(ctx context.Context, key K, timeFrom, timeTo int64) iter.Seq2[K, V] // Delete removes a value for a key at a specific timestamp Delete(ctx context.Context, key K, timestamp int64) error }
WindowedStore is a windowed state store interface Matches Kafka Streams' org.apache.kafka.streams.state.WindowStore
func GetWindowedStore ¶
func GetWindowedStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (WindowedStore[K, V], error)
GetWindowedStore retrieves a typed WindowedStore from the processor context. Returns an error if the store is not found or has the wrong type. Handles stores wrapped in adapters.
func MustGetWindowedStore ¶
func MustGetWindowedStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) WindowedStore[K, V]
MustGetWindowedStore retrieves a typed WindowedStore, panicking on failure.
type WindowedStoreRef ¶
type WindowedStoreRef[K comparable, V any] struct { // contains filtered or unexported fields }
WindowedStoreRef provides type-safe access to a WindowedStore.
func (*WindowedStoreRef[K, V]) Init ¶
func (r *WindowedStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error
Init initializes the windowed store reference from the processor context.
func (*WindowedStoreRef[K, V]) Name ¶
func (r *WindowedStoreRef[K, V]) Name() string
Name returns the store name.
func (*WindowedStoreRef[K, V]) Store ¶
func (r *WindowedStoreRef[K, V]) Store() WindowedStore[K, V]
Store returns the underlying store.