kstate

package
v0.0.0-...-8f0a81e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrKeyNotFound is the sentinel error reporting that a key is absent from a
// store. Match it with errors.Is rather than by comparing error strings.
// NOTE(review): which store operations return it is not visible here — confirm.
var ErrKeyNotFound = errors.New("store: key not found")

Functions

func GetStore

func GetStore[S any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (S, error)

GetStore retrieves any store from the processor context and type-asserts it. Use this for custom store types. Returns an error if the store is not found or has the wrong type.

Example:

store, err := kstate.GetStore[*MyCustomStore](ctx, "custom-store")

func InitBatchKeyValueStoreRef

func InitBatchKeyValueStoreRef[K comparable, V any, Kout, Vout any](
	r *BatchKeyValueStoreRef[K, V],
	ctx kprocessor.ProcessorContext[Kout, Vout],
	name string,
) error

InitBatchKeyValueStoreRef initializes using any ProcessorContext type.

func InitKeyValueStoreRef

func InitKeyValueStoreRef[K comparable, V any, Kout, Vout any](
	r *KeyValueStoreRef[K, V],
	ctx kprocessor.ProcessorContext[Kout, Vout],
	name string,
) error

InitKeyValueStoreRef initializes using any ProcessorContext type (for generic processors).

func InitWindowedStoreRef

func InitWindowedStoreRef[K comparable, V any, Kout, Vout any](
	r *WindowedStoreRef[K, V],
	ctx kprocessor.ProcessorContext[Kout, Vout],
	name string,
) error

InitWindowedStoreRef initializes using any ProcessorContext type.

func MapIter

func MapIter[K, V any](
	seq iter.Seq2[[]byte, []byte],
	keyDeserializer kserde.Deserializer[K],
	valueDeserializer kserde.Deserializer[V],
) iter.Seq2[K, V]

MapIter transforms a byte iterator into a typed iterator. Uses provided deserializers to convert keys and values.

func MustGetStore

func MustGetStore[S any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) S

MustGetStore retrieves any store and type-asserts it, panicking on failure.

Types

type BatchKeyValueStore

// BatchKeyValueStore is a KeyValueStore that additionally supports
// multi-key operations.
type BatchKeyValueStore[K comparable, V any] interface {
	// KeyValueStore is embedded, so every single-key operation is available too.
	KeyValueStore[K, V]

	// SetBatch stores multiple key-value pairs atomically.
	SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error

	// GetBatch retrieves the values for the given keys.
	// NOTE(review): how missing keys are represented in the result is not
	// specified here — confirm against implementations.
	GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)

	// DeleteBatch removes multiple keys atomically.
	DeleteBatch(ctx context.Context, keys []K) error
}

BatchKeyValueStore extends KeyValueStore with batch operations. It is kstreams-specific (not in Kafka Streams).

func GetBatchKeyValueStore

func GetBatchKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (BatchKeyValueStore[K, V], error)

GetBatchKeyValueStore retrieves a typed BatchKeyValueStore from the processor context. Returns an error if the store is not found or has the wrong type. Handles stores wrapped in adapters.

func MustGetBatchKeyValueStore

func MustGetBatchKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) BatchKeyValueStore[K, V]

MustGetBatchKeyValueStore retrieves a typed BatchKeyValueStore, panicking on failure.

type BatchKeyValueStoreRef

type BatchKeyValueStoreRef[K comparable, V any] struct {
	// contains filtered or unexported fields
}

BatchKeyValueStoreRef provides type-safe access to a BatchKeyValueStore. Use this for high-throughput scenarios with batch operations.

func (*BatchKeyValueStoreRef[K, V]) Delete

func (r *BatchKeyValueStoreRef[K, V]) Delete(ctx context.Context, key K) error

Delete removes a key.

func (*BatchKeyValueStoreRef[K, V]) DeleteBatch

func (r *BatchKeyValueStoreRef[K, V]) DeleteBatch(ctx context.Context, keys []K) error

DeleteBatch removes multiple keys.

func (*BatchKeyValueStoreRef[K, V]) Get

func (r *BatchKeyValueStoreRef[K, V]) Get(ctx context.Context, key K) (V, bool, error)

Get retrieves a value by key.

func (*BatchKeyValueStoreRef[K, V]) GetBatch

func (r *BatchKeyValueStoreRef[K, V]) GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)

GetBatch retrieves multiple values by keys.

func (*BatchKeyValueStoreRef[K, V]) Init

Init initializes the batch store reference from the processor context.

func (*BatchKeyValueStoreRef[K, V]) Name

func (r *BatchKeyValueStoreRef[K, V]) Name() string

Name returns the store name.

func (*BatchKeyValueStoreRef[K, V]) Set

func (r *BatchKeyValueStoreRef[K, V]) Set(ctx context.Context, key K, value V) error

Set stores a key-value pair.

func (*BatchKeyValueStoreRef[K, V]) SetBatch

func (r *BatchKeyValueStoreRef[K, V]) SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error

SetBatch stores multiple key-value pairs efficiently.

func (*BatchKeyValueStoreRef[K, V]) Store

func (r *BatchKeyValueStoreRef[K, V]) Store() BatchKeyValueStore[K, V]

Store returns the underlying store.

type BatchingKeyValueStoreRestoreCallback

type BatchingKeyValueStoreRestoreCallback[K comparable, V any] struct {
	// contains filtered or unexported fields
}

BatchingKeyValueStoreRestoreCallback uses batch operations if available. It falls back to individual operations if the store doesn't support batching.

func NewBatchingKeyValueStoreRestoreCallback

func NewBatchingKeyValueStoreRestoreCallback[K comparable, V any](
	store KeyValueStore[K, V],
	keyDecoder func([]byte) (K, error),
	valueDecoder func([]byte) (V, error),
) *BatchingKeyValueStoreRestoreCallback[K, V]

NewBatchingKeyValueStoreRestoreCallback creates a batching restore callback

func (*BatchingKeyValueStoreRestoreCallback[K, V]) Restore

func (r *BatchingKeyValueStoreRestoreCallback[K, V]) Restore(key, value []byte) error

Restore restores a single record

func (*BatchingKeyValueStoreRestoreCallback[K, V]) RestoreBatch

func (r *BatchingKeyValueStoreRestoreCallback[K, V]) RestoreBatch(records []*kgo.Record) error

RestoreBatch restores using batch operations if the store supports the BatchKeyValueStore interface.

type ChangeloggingBatchKeyValueStore

type ChangeloggingBatchKeyValueStore[K comparable, V any] struct {
	*ChangeloggingKeyValueStore[K, V]
	// contains filtered or unexported fields
}

ChangeloggingBatchKeyValueStore adds batch operation logging support. It is only used if the inner store implements BatchKeyValueStore.

func NewChangeloggingBatchKeyValueStore

func NewChangeloggingBatchKeyValueStore[K comparable, V any](
	inner BatchKeyValueStore[K, V],
	keyEncoder func(K) ([]byte, error),
	valueEncoder func(V) ([]byte, error),
) *ChangeloggingBatchKeyValueStore[K, V]

NewChangeloggingBatchKeyValueStore creates a changelog-enabled batch store

func (*ChangeloggingBatchKeyValueStore[K, V]) DeleteBatch

func (c *ChangeloggingBatchKeyValueStore[K, V]) DeleteBatch(ctx context.Context, keys []K) error

DeleteBatch deletes batch from store, then logs tombstones to changelog

func (*ChangeloggingBatchKeyValueStore[K, V]) GetBatch

func (c *ChangeloggingBatchKeyValueStore[K, V]) GetBatch(ctx context.Context, keys []K) ([]kprocessor.KV[K, V], error)

GetBatch reads from inner store (no logging)

func (*ChangeloggingBatchKeyValueStore[K, V]) SetBatch

func (c *ChangeloggingBatchKeyValueStore[K, V]) SetBatch(ctx context.Context, entries []kprocessor.KV[K, V]) error

SetBatch writes the batch to the store, then logs each entry to the changelog. Note: it logs each entry individually (not as a single batch to Kafka). This matches Kafka Streams behavior (ChangeLoggingKeyValueBytesStore.putAll).

type ChangeloggingKeyValueStore

type ChangeloggingKeyValueStore[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ChangeloggingKeyValueStore wraps a KeyValueStore to log all changes to a changelog topic. It matches Kafka Streams' ChangeLoggingKeyValueBytesStore.

Pattern: Decorator

  • Delegates reads to inner store (no logging)
  • Intercepts writes to log changes (dual-write)

Dual-write order (critical):

  1. Write to inner store FIRST
  2. Log to changelog topic SECOND

This ensures that if changelog write fails, we can retry without duplicating store writes.

Tombstone handling:

  • Delete(key) logs with nil value (tombstone in Kafka)
  • Tombstones trigger log compaction to remove deleted keys

func NewChangeloggingKeyValueStore

func NewChangeloggingKeyValueStore[K comparable, V any](
	inner KeyValueStore[K, V],
	keyEncoder func(K) ([]byte, error),
	valueEncoder func(V) ([]byte, error),
) *ChangeloggingKeyValueStore[K, V]

NewChangeloggingKeyValueStore creates a changelog-enabled wrapper around a store

Parameters:

  • inner: The underlying store (e.g., PebbleStore)
  • keyEncoder: Serializer for keys (e.g., StringSerializer)
  • valueEncoder: Serializer for values (e.g., JSONSerializer)

The encoders must match the decoders used in the RestoreCallback!

func (*ChangeloggingKeyValueStore[K, V]) All

func (c *ChangeloggingKeyValueStore[K, V]) All(ctx context.Context) iter.Seq2[K, V]

All reads from inner store (no logging)

func (*ChangeloggingKeyValueStore[K, V]) Close

func (c *ChangeloggingKeyValueStore[K, V]) Close() error

Close closes inner store

func (*ChangeloggingKeyValueStore[K, V]) Delete

func (c *ChangeloggingKeyValueStore[K, V]) Delete(ctx context.Context, key K) error

Delete removes the key from the store, then logs a tombstone to the changelog. Matches ChangeLoggingKeyValueBytesStore.delete().

func (*ChangeloggingKeyValueStore[K, V]) Flush

func (c *ChangeloggingKeyValueStore[K, V]) Flush(ctx context.Context) error

Flush flushes inner store (changelog is flushed separately by task)

func (*ChangeloggingKeyValueStore[K, V]) Get

func (c *ChangeloggingKeyValueStore[K, V]) Get(ctx context.Context, key K) (V, bool, error)

Get reads from the inner store (no changelog logging). Read operations don't modify state, so no logging is needed.

func (*ChangeloggingKeyValueStore[K, V]) Init

Init initializes the store and captures the processor context. The context is needed for logging changes via context.LogChange().

func (*ChangeloggingKeyValueStore[K, V]) Name

func (c *ChangeloggingKeyValueStore[K, V]) Name() string

Name returns the store name (delegates to inner)

func (*ChangeloggingKeyValueStore[K, V]) Persistent

func (c *ChangeloggingKeyValueStore[K, V]) Persistent() bool

Persistent delegates to the inner store. Matches Kafka Streams' ChangeLoggingKeyValueBytesStore.persistent().

func (*ChangeloggingKeyValueStore[K, V]) Range

func (c *ChangeloggingKeyValueStore[K, V]) Range(ctx context.Context, from, to K) iter.Seq2[K, V]

Range reads from inner store (no logging)

func (*ChangeloggingKeyValueStore[K, V]) Set

func (c *ChangeloggingKeyValueStore[K, V]) Set(ctx context.Context, key K, value V) error

Set writes to the store, then logs to the changelog. Matches ChangeLoggingKeyValueBytesStore.put().

type KeyValueStore

// KeyValueStore is a typed key-value state store built on top of StateStore.
type KeyValueStore[K comparable, V any] interface {
	// StateStore is embedded for the lifecycle methods shared by all stores.
	StateStore

	// Get retrieves a value by key.
	//   - Returns (value, true, nil) if the key is found.
	//   - Returns (zero, false, nil) if the key is not found.
	//   - Returns (zero, false, err) on error.
	Get(ctx context.Context, key K) (V, bool, error)

	// Set stores a key-value pair.
	Set(ctx context.Context, key K, value V) error

	// Delete removes a key.
	Delete(ctx context.Context, key K) error

	// Range returns an iterator over the half-open key range [from, to):
	// from is included, to is excluded.
	Range(ctx context.Context, from, to K) iter.Seq2[K, V]

	// All returns an iterator over every key-value pair in the store.
	All(ctx context.Context) iter.Seq2[K, V]
}

KeyValueStore is a key-value state store interface. It matches Kafka Streams' org.apache.kafka.streams.state.KeyValueStore.

func GetKeyValueStore

func GetKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (KeyValueStore[K, V], error)

GetKeyValueStore retrieves a typed KeyValueStore from the processor context. Returns an error if the store is not found or has the wrong type. Handles stores wrapped in adapters (common in kstreams architecture).

Example:

func (p *MyProcessor) Init(ctx kprocessor.ProcessorContext[string, int]) error {
    store, err := kstate.GetKeyValueStore[string, int64](ctx, "my-store")
    if err != nil {
        return fmt.Errorf("get store: %w", err)
    }
    p.store = store
    return nil
}

func MustGetKeyValueStore

func MustGetKeyValueStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) KeyValueStore[K, V]

MustGetKeyValueStore retrieves a typed KeyValueStore, panicking on failure. Use this in Init() where failure should be fatal.

Example:

func (p *MyProcessor) Init(ctx kprocessor.ProcessorContext[string, int]) error {
    p.store = kstate.MustGetKeyValueStore[string, int64](ctx, "my-store")
    return nil
}

type KeyValueStoreRef

type KeyValueStoreRef[K comparable, V any] struct {
	// contains filtered or unexported fields
}

KeyValueStoreRef provides type-safe access to a KeyValueStore. Use this as a processor field to declare store dependencies at compile time.

Example:

type MyProcessor struct {
    counter kstate.KeyValueStoreRef[string, int64]
}

func (p *MyProcessor) Init(ctx kprocessor.ProcessorContext[string, int]) error {
    return p.counter.Init(ctx, "counter-store")
}

func (p *MyProcessor) Process(ctx context.Context, key string, value int) error {
    count, _, err := p.counter.Get(ctx, key)
    if err != nil {
        return fmt.Errorf("get count: %w", err)
    }
    return p.counter.Set(ctx, key, count+1)
}

func (*KeyValueStoreRef[K, V]) All

func (r *KeyValueStoreRef[K, V]) All(ctx context.Context) iter.Seq2[K, V]

All iterates over all entries. Panics if not initialized.

func (*KeyValueStoreRef[K, V]) Delete

func (r *KeyValueStoreRef[K, V]) Delete(ctx context.Context, key K) error

Delete removes a key. Panics if not initialized.

func (*KeyValueStoreRef[K, V]) Get

func (r *KeyValueStoreRef[K, V]) Get(ctx context.Context, key K) (V, bool, error)

Get retrieves a value by key. Panics if not initialized.

func (*KeyValueStoreRef[K, V]) Init

func (r *KeyValueStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error

Init initializes the store reference from the processor context. Must be called in the processor's Init method before using the store.

func (*KeyValueStoreRef[K, V]) Name

func (r *KeyValueStoreRef[K, V]) Name() string

Name returns the store name.

func (*KeyValueStoreRef[K, V]) Range

func (r *KeyValueStoreRef[K, V]) Range(ctx context.Context, from, to K) iter.Seq2[K, V]

Range iterates over a key range. Panics if not initialized.

func (*KeyValueStoreRef[K, V]) Set

func (r *KeyValueStoreRef[K, V]) Set(ctx context.Context, key K, value V) error

Set stores a key-value pair. Panics if not initialized.

func (*KeyValueStoreRef[K, V]) Store

func (r *KeyValueStoreRef[K, V]) Store() KeyValueStore[K, V]

Store returns the underlying store (for advanced use cases).

type KeyValueStoreRestoreCallback

type KeyValueStoreRestoreCallback[K comparable, V any] struct {
	// contains filtered or unexported fields
}

KeyValueStoreRestoreCallback is the default restore callback for KeyValueStore. It handles deserialization and applies records to the store.

Tombstone handling:

  • record.Value == nil → store.Delete(key)
  • record.Value != nil → store.Set(key, value)

func NewKeyValueStoreRestoreCallback

func NewKeyValueStoreRestoreCallback[K comparable, V any](
	store KeyValueStore[K, V],
	keyDecoder func([]byte) (K, error),
	valueDecoder func([]byte) (V, error),
) *KeyValueStoreRestoreCallback[K, V]

NewKeyValueStoreRestoreCallback creates a restore callback for a KeyValueStore

func (*KeyValueStoreRestoreCallback[K, V]) Restore

func (r *KeyValueStoreRestoreCallback[K, V]) Restore(key, value []byte) error

Restore restores a single record

func (*KeyValueStoreRestoreCallback[K, V]) RestoreBatch

func (r *KeyValueStoreRestoreCallback[K, V]) RestoreBatch(records []*kgo.Record) error

RestoreBatch restores a batch of records. It is more efficient than calling Restore() for each record individually.

type NoOpRestoreListener

// NoOpRestoreListener is an empty struct; its On* methods are documented as a
// no-op implementation of StateRestoreListener.
type NoOpRestoreListener struct{}

NoOpRestoreListener is a no-op implementation of StateRestoreListener. It is used when no monitoring is needed.

func (*NoOpRestoreListener) OnBatchRestored

func (n *NoOpRestoreListener) OnBatchRestored(topicPartition checkpoint.TopicPartition, storeName string, batchEndOffset int64, numRestored int64)

func (*NoOpRestoreListener) OnRestoreEnd

func (n *NoOpRestoreListener) OnRestoreEnd(topicPartition checkpoint.TopicPartition, storeName string, totalRestored int64)

func (*NoOpRestoreListener) OnRestoreStart

func (n *NoOpRestoreListener) OnRestoreStart(topicPartition checkpoint.TopicPartition, storeName string, startingOffset, endingOffset int64)

type StateRestoreCallback

// StateRestoreCallback applies changelog records to a state store, either one
// record at a time or in batches.
type StateRestoreCallback interface {
	// Restore restores a single record (called if RestoreBatch not implemented).
	// A nil value indicates a tombstone (deletion).
	// NOTE(review): this interface requires both methods, so confirm when the
	// restorer actually takes the per-record path.
	Restore(key, value []byte) error

	// RestoreBatch restores a batch of records for efficiency.
	// Implementations should check for tombstones (record.Value == nil).
	RestoreBatch(records []*kgo.Record) error
}

StateRestoreCallback defines how to restore records from a changelog topic into a state store. It matches Kafka Streams' org.apache.kafka.streams.processor.StateRestoreCallback.

Implementations should handle:

  • Normal records: Apply put(key, value) to store
  • Tombstone records (nil value): Apply delete(key) to store

Two restoration modes:

  • Per-record: Restore() called for each record individually (legacy, slower)
  • Batched: RestoreBatch() called with batches of records (modern, faster)

type StateRestoreListener

// StateRestoreListener receives callbacks at the start of a restoration, after
// each restored batch, and at the end of the restoration.
type StateRestoreListener interface {
	// OnRestoreStart is called when restoration begins.
	//   - startingOffset: first offset to restore (usually checkpoint offset + 1).
	//   - endingOffset: EXCLUSIVE ending offset (high water mark, one past the
	//     last offset to restore). Matches Kafka Streams semantics: restoration
	//     completes when reaching endingOffset - 1.
	OnRestoreStart(topicPartition checkpoint.TopicPartition, storeName string, startingOffset, endingOffset int64)

	// OnBatchRestored is called after each batch is applied to the store.
	//   - batchEndOffset: last offset in the batch.
	//   - numRestored: number of records in this batch.
	OnBatchRestored(topicPartition checkpoint.TopicPartition, storeName string, batchEndOffset int64, numRestored int64)

	// OnRestoreEnd is called when restoration completes.
	//   - totalRestored: total number of records restored.
	OnRestoreEnd(topicPartition checkpoint.TopicPartition, storeName string, totalRestored int64)
}

StateRestoreListener provides callbacks for monitoring restoration progress. It matches Kafka Streams' org.apache.kafka.streams.processor.StateRestoreListener.

Used for:

  • Progress monitoring (e.g., "Restored 1000/5000 records")
  • Metrics collection
  • Timeout detection (fail if restoration takes too long)

Note: Must be stateless or provide internal synchronization (called from multiple goroutines)

type StateStore

// StateStore declares the naming and lifecycle methods (Init, Flush, Close)
// plus the Persistent flag.
type StateStore interface {
	// Name returns the store name.
	Name() string

	// Init initializes the store with the processor context.
	Init(ctx kprocessor.ProcessorContextInternal) error

	// Flush persists any cached data.
	Flush(ctx context.Context) error

	// Close closes the store.
	Close() error

	// Persistent returns true if the store persists data to disk and false
	// for in-memory stores.
	// CRITICAL: only persistent stores should be checkpointed.
	// Matches Kafka Streams' StateStore.persistent().
	Persistent() bool
}

StateStore is the base interface for all state stores. It matches Kafka Streams' org.apache.kafka.streams.processor.StateStore.

type StateStoreGetter

// StateStoreGetter exposes a StateStore held by the implementing value.
type StateStoreGetter interface {
	// GetStateStore returns the StateStore held by the implementation.
	// NOTE(review): presumably the wrapped (underlying) store of an adapter —
	// confirm against the adapter implementations.
	GetStateStore() StateStore
}

StateStoreGetter is implemented by adapters that wrap a StateStore. This allows type-safe extraction of the underlying store without conflicting interface methods (kprocessor.Store vs kstate.StateStore).

type StoreBackend

// StoreBackend is a StateStore whose keys and values are raw byte slices.
type StoreBackend interface {
	// StateStore is embedded for the shared lifecycle methods.
	StateStore
	// Set stores the value v under the key k.
	Set(k, v []byte) error
	// Get returns the value stored under k.
	// NOTE(review): the result for a missing key is not specified here
	// (possibly ErrKeyNotFound) — confirm against the implementations.
	Get(k []byte) ([]byte, error)
	// Delete removes the key k.
	Delete(k []byte) error
	// Range iterates over the key-value pairs between lower and upper.
	// NOTE(review): bound inclusivity is not specified here; presumably
	// [lower, upper) like KeyValueStore.Range — confirm.
	Range(lower, upper []byte) iter.Seq2[[]byte, []byte]
	// All iterates over every key-value pair in the store.
	All() iter.Seq2[[]byte, []byte]
}

StoreBackend is the low-level byte-oriented store interface. Implemented by pebble and s3 stores.

type StoreBuilder

// StoreBuilder configures and builds a state store of type T.
// The With* methods return a StoreBuilder[T] so that calls can be chained.
type StoreBuilder[T StateStore] interface {
	// WithChangelogEnabled enables the changelog topic for this store.
	// config contains Kafka topic-level configuration (optional, can be nil).
	//
	// Common config keys:
	//   - "retention.ms": changelog retention time in milliseconds
	//   - "cleanup.policy": "compact" or "delete"
	//   - "segment.ms": time in milliseconds after which Kafka rolls the
	//     active log segment, even if the segment is not full
	//
	// Default (if config is nil): uses application-level defaults.
	WithChangelogEnabled(config map[string]string) StoreBuilder[T]

	// WithChangelogDisabled disables the changelog topic for this store.
	// Use it for stores that don't need durability (e.g., remote stores like S3, BigTable).
	WithChangelogDisabled() StoreBuilder[T]

	// Build constructs the state store with the configured options.
	// If changelog is enabled, it wraps the store in ChangeloggingKeyValueStore.
	// Returns an error if the configuration is invalid or store initialization fails.
	Build() (T, error)

	// Name returns the store name.
	Name() string

	// LogConfig returns the changelog topic configuration.
	// Returns nil if changelog is disabled.
	LogConfig() map[string]string

	// ChangelogEnabled reports whether changelog is enabled for this store.
	ChangelogEnabled() bool

	// RestoreCallback returns the callback for restoring this store from its changelog.
	// Returns nil if changelog is disabled.
	RestoreCallback() StateRestoreCallback
}

StoreBuilder provides a fluent API for configuring and building state stores. It matches Kafka Streams' org.apache.kafka.streams.state.StoreBuilder interface.

Key configuration options:

  • Changelog logging (enabled/disabled per store)
  • Changelog topic configuration (retention, cleanup policy, etc.)

Example usage:

builder := pebble.NewKeyValueStoreBuilder[string, User]("user-store", "/tmp/state")
builder.WithChangelogEnabled(map[string]string{
    "retention.ms": "86400000",  // 1 day
    "cleanup.policy": "compact",
})
store, err := builder.Build()
if err != nil {
    // handle invalid configuration or store initialization failure
}

type TypeErasedStoreBuilder

// TypeErasedStoreBuilder is a non-generic store builder interface: it has the
// same accessors as StoreBuilder[T], but builds a plain StateStore.
type TypeErasedStoreBuilder interface {
	// BuildStateStore constructs the state store (type-erased version of Build).
	BuildStateStore() (StateStore, error)

	// Name returns the store name.
	Name() string

	// LogConfig returns the changelog topic configuration.
	LogConfig() map[string]string

	// ChangelogEnabled reports whether changelog is enabled.
	ChangelogEnabled() bool

	// RestoreCallback returns the restore callback.
	RestoreCallback() StateRestoreCallback
}

TypeErasedStoreBuilder is a non-generic interface for store builders. This provides type safety when storing builders in maps (avoids using 'any'). All StoreBuilder[T] implementations must also implement this interface.

type WindowedStore

// WindowedStore is a state store whose entries are addressed by key and
// timestamp.
// NOTE(review): timestamps are int64 with no unit stated here (presumably
// epoch milliseconds as in Kafka) — confirm.
type WindowedStore[K comparable, V any] interface {
	// StateStore is embedded for the shared lifecycle methods.
	StateStore

	// Set stores a value for a key at a specific timestamp.
	Set(ctx context.Context, key K, value V, timestamp int64) error

	// Get retrieves the value for a key at a specific timestamp.
	Get(ctx context.Context, key K, timestamp int64) (V, bool, error)

	// Fetch retrieves the values for a key within a time range.
	// NOTE(review): whether timeFrom/timeTo are inclusive is not specified
	// here — confirm against the implementations.
	Fetch(ctx context.Context, key K, timeFrom, timeTo int64) iter.Seq2[K, V]

	// Delete removes the value for a key at a specific timestamp.
	Delete(ctx context.Context, key K, timestamp int64) error
}

WindowedStore is a windowed state store interface Matches Kafka Streams' org.apache.kafka.streams.state.WindowStore

func GetWindowedStore

func GetWindowedStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) (WindowedStore[K, V], error)

GetWindowedStore retrieves a typed WindowedStore from the processor context. Returns an error if the store is not found or has the wrong type. Handles stores wrapped in adapters.

func MustGetWindowedStore

func MustGetWindowedStore[K comparable, V any, Kout, Vout any](ctx kprocessor.ProcessorContext[Kout, Vout], name string) WindowedStore[K, V]

MustGetWindowedStore retrieves a typed WindowedStore, panicking on failure.

type WindowedStoreRef

type WindowedStoreRef[K comparable, V any] struct {
	// contains filtered or unexported fields
}

WindowedStoreRef provides type-safe access to a WindowedStore.

func (*WindowedStoreRef[K, V]) Init

func (r *WindowedStoreRef[K, V]) Init(ctx kprocessor.ProcessorContext[any, any], name string) error

Init initializes the windowed store reference from the processor context.

func (*WindowedStoreRef[K, V]) Name

func (r *WindowedStoreRef[K, V]) Name() string

Name returns the store name.

func (*WindowedStoreRef[K, V]) Store

func (r *WindowedStoreRef[K, V]) Store() WindowedStore[K, V]

Store returns the underlying store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL