storage

package
v0.0.0-...-3dcced5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: MIT Imports: 9 Imported by: 0

README

storage

Pluggable storage backend with layered architecture.

Overview

The storage package provides a flexible, layered storage system for OOO servers:

  • Layered Architecture: Memory + optional persistent layer
  • Event Broadcasting: Notify subscribers of changes
  • Sharded Channels: Per-key ordering for concurrent operations
  • Locking: Path-level locks for atomic operations

Architecture

┌─────────────────────────────────────────────┐
│                  Layered                    │
│  ┌─────────────────────────────────────┐   │
│  │          Memory Layer               │   │ ◀── Fast reads
│  │  (map[string]meta.Object + RWMutex) │   │
│  └─────────────────────────────────────┘   │
│                    │                        │
│                    │ write-through          │
│                    ▼                        │
│  ┌─────────────────────────────────────┐   │
│  │       Persistent Layer (optional)   │   │ ◀── Durable writes
│  │  (ko, badger, sqlite, etc.)         │   │
│  └─────────────────────────────────────┘   │
└─────────────────────────────────────────────┘
                    │
                    │ Events
                    ▼
            ┌───────────────┐
            │ ShardedChan   │ ◀── Per-key ordering
            └───────────────┘

Usage

Memory-Only Storage
storage := storage.New(storage.LayeredConfig{
    Memory: storage.NewMemoryLayer(),
})
With Persistent Layer
storage := storage.New(storage.LayeredConfig{
    Memory:     storage.NewMemoryLayer(),
    Persistent: myPersistentAdapter,
})
Basic Operations
// Set
index, err := store.Set("users/123", jsonData)

// Get
obj, err := store.Get("users/123")

// Get list (glob)
objs, err := store.GetN("users/*", 0, 100)

// Delete
err := store.Del("users/123")
With Options
storage := storage.New(storage.LayeredConfig{
    Memory: storage.NewMemoryLayer(),
}, storage.Options{
    AfterWrite: func(key string) {
        log.Printf("Written: %s", key)
    },
    BeforeRead: func(key string) {
        log.Printf("Reading: %s", key)
    },
})
Watching Changes
store.Watch(func(event storage.Event) {
    switch event.Operation {
    case "set":
        fmt.Printf("Set: %s\n", event.Key)
    case "del":
        fmt.Printf("Deleted: %s\n", event.Key)
    }
})
Locking
err := store.Lock("users/123")
defer store.Unlock("users/123")

// Atomic operations on users/123

Types

Type Description
Layered Main storage implementation
LayeredConfig Configuration with memory + persistent layers
Options Callbacks and settings
Event Storage change event
ShardedChan Per-key ordered event channels
MemoryLayer In-memory storage layer
Adapter Interface for persistent backends

Event Structure

type Event struct {
    Key       string       // The affected key
    Operation string       // "set" or "del"
    Object    *meta.Object // The object (nil for delete)
}

Errors

Error Description
ErrNotFound Key does not exist
ErrInvalidPath Invalid key format
ErrGlobNotAllowed Glob used where not allowed
ErrGlobRequired Glob required but not provided
ErrNotActive Storage not started

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound           = errors.New("storage: not found")
	ErrInvalidPath        = errors.New("storage: invalid path")
	ErrInvalidPattern     = errors.New("storage: invalid pattern, glob required")
	ErrInvalidRange       = errors.New("storage: invalid range")
	ErrInvalidLimit       = errors.New("storage: invalid limit")
	ErrInvalidStorageData = errors.New("storage: invalid data (empty)")
	ErrGlobNotAllowed     = errors.New("storage: glob pattern not allowed for this operation")
	ErrGlobRequired       = errors.New("storage: glob pattern required for this operation")
	ErrCantLockGlob       = errors.New("storage: can't lock a glob pattern path")
	ErrLockNotFound       = errors.New("storage: lock not found, can't unlock")
	ErrNoop               = errors.New("storage: no operation performed")
	ErrAllLayersNil       = errors.New("storage: all storage layers are nil")
	ErrNotActive          = errors.New("storage: storage is not active")
)

Errors

Functions

func WatchStorageNoop

func WatchStorageNoop(dataStore Database)

WatchStorageNoop drains all events from a storage's sharded watcher channels. Use this for extra storages that are not directly hooked to a server.

func WatchWithCallback

func WatchWithCallback(dataStore Database, callback func(Event))

WatchWithCallback starts goroutines that watch all sharded channels and call the provided callback for each event. Use this for external storages that need to trigger sync on storage events.

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter wraps a storage.Database to provide compatibility with ooo.Database interface This allows the new storage package to be used with the existing ooo server

func NewAdapter

func NewAdapter(db Database) *Adapter

NewAdapter creates a new adapter wrapping a storage.Database

func (*Adapter) Active

func (a *Adapter) Active() bool

Active returns whether the storage is active

func (*Adapter) Clear

func (a *Adapter) Clear()

Clear removes all data

func (*Adapter) Close

func (a *Adapter) Close()

Close shuts down the storage

func (*Adapter) Del

func (a *Adapter) Del(key string) error

Del deletes a key

func (*Adapter) DelSilent

func (a *Adapter) DelSilent(key string) error

DelSilent deletes a key without broadcasting

func (*Adapter) Get

func (a *Adapter) Get(key string) (meta.Object, error)

Get retrieves a single value

func (*Adapter) GetAndLock

func (a *Adapter) GetAndLock(key string) (meta.Object, error)

GetAndLock retrieves a value and locks the key

func (*Adapter) GetList

func (a *Adapter) GetList(path string) ([]meta.Object, error)

GetList retrieves values matching a pattern

func (*Adapter) GetListDescending

func (a *Adapter) GetListDescending(path string) ([]meta.Object, error)

GetListDescending retrieves values matching a pattern in descending order

func (*Adapter) GetN

func (a *Adapter) GetN(path string, limit int) ([]meta.Object, error)

GetN retrieves N values matching a pattern

func (*Adapter) GetNAscending

func (a *Adapter) GetNAscending(path string, limit int) ([]meta.Object, error)

GetNAscending retrieves N values matching a pattern in ascending order

func (*Adapter) GetNRange

func (a *Adapter) GetNRange(path string, limit int, from, to int64) ([]meta.Object, error)

GetNRange retrieves N values in a time range

func (*Adapter) Keys

func (a *Adapter) Keys() ([]string, error)

Keys returns all keys

func (*Adapter) KeysRange

func (a *Adapter) KeysRange(path string, from, to int64) ([]string, error)

KeysRange returns keys in a time range

func (*Adapter) Push

func (a *Adapter) Push(path string, data json.RawMessage) (string, error)

Push stores data under a new key

func (*Adapter) Set

func (a *Adapter) Set(key string, data json.RawMessage) (string, error)

Set stores a value

func (*Adapter) SetAndUnlock

func (a *Adapter) SetAndUnlock(key string, data json.RawMessage) (string, error)

SetAndUnlock stores a value and unlocks the key

func (*Adapter) SetWithMeta

func (a *Adapter) SetWithMeta(key string, data json.RawMessage, created, updated int64) (string, error)

SetWithMeta stores a value with metadata

func (*Adapter) Start

func (a *Adapter) Start(opt Options) error

Start initializes the storage

func (*Adapter) Underlying

func (a *Adapter) Underlying() Database

Underlying returns the underlying database

func (*Adapter) Unlock

func (a *Adapter) Unlock(key string) error

Unlock unlocks a key

func (*Adapter) WatchSharded

func (a *Adapter) WatchSharded() *ShardedChan

WatchSharded returns the sharded storage channel

type Database

type Database interface {
	Active() bool
	Start(Options) error
	Close()
	Keys() ([]string, error)
	KeysRange(path string, from, to int64) ([]string, error)
	Get(key string) (meta.Object, error)
	GetAndLock(key string) (meta.Object, error)
	GetList(path string) ([]meta.Object, error)
	GetListDescending(path string) ([]meta.Object, error)
	GetN(path string, limit int) ([]meta.Object, error)
	GetNAscending(path string, limit int) ([]meta.Object, error)
	GetNRange(path string, limit int, from, to int64) ([]meta.Object, error)
	Set(key string, data json.RawMessage) (string, error)
	Push(path string, data json.RawMessage) (string, error)
	SetWithMeta(key string, data json.RawMessage, created, updated int64) (string, error)
	SetAndUnlock(key string, data json.RawMessage) (string, error)
	Unlock(key string) error
	Del(key string) error
	DelSilent(key string) error
	Clear()
	WatchSharded() *ShardedChan
}

Database interface to be implemented by storages This is the full interface that the layered storage implements

type EmbeddedLayer

type EmbeddedLayer interface {
	Layer
	// Load reads all data from persistent storage (called during initialization)
	Load() (map[string]*meta.Object, error)
}

EmbeddedLayer is the interface for embedded storage backends (leveldb, etc.) Implementations should handle their own persistence

type EmbeddedWrapper

type EmbeddedWrapper struct {
	// contains filtered or unexported fields
}

EmbeddedWrapper wraps an EmbeddedLayer to provide Layer interface with caching disabled This is used when the embedded layer is the only layer or when caching is handled elsewhere

func NewEmbeddedWrapper

func NewEmbeddedWrapper(backend EmbeddedLayer) *EmbeddedWrapper

NewEmbeddedWrapper creates a wrapper around an embedded layer

func (*EmbeddedWrapper) Active

func (e *EmbeddedWrapper) Active() bool

Active returns whether the layer is active

func (*EmbeddedWrapper) Clear

func (e *EmbeddedWrapper) Clear()

Clear removes all data

func (*EmbeddedWrapper) Close

func (e *EmbeddedWrapper) Close()

Close shuts down the embedded layer

func (*EmbeddedWrapper) Del

func (e *EmbeddedWrapper) Del(key string) error

Del deletes a key

func (*EmbeddedWrapper) Get

func (e *EmbeddedWrapper) Get(key string) (meta.Object, error)

Get retrieves a single value by exact key

func (*EmbeddedWrapper) GetList

func (e *EmbeddedWrapper) GetList(path string) ([]meta.Object, error)

GetList retrieves all values matching a glob pattern

func (*EmbeddedWrapper) Keys

func (e *EmbeddedWrapper) Keys() ([]string, error)

Keys returns all keys

func (*EmbeddedWrapper) Load

func (e *EmbeddedWrapper) Load() (map[string]*meta.Object, error)

Load reads all data from persistent storage

func (*EmbeddedWrapper) Set

func (e *EmbeddedWrapper) Set(key string, obj *meta.Object) error

Set stores a value

func (*EmbeddedWrapper) Start

func (e *EmbeddedWrapper) Start(opt LayerOptions) error

Start initializes the embedded layer

type Event

type Event struct {
	Key       string
	Operation string
	Object    *meta.Object
}

Event an operation event

type EventCallback

type EventCallback func(event Event)

EventCallback is a callback type for storage events

type Layer

type Layer interface {
	// Active returns whether the layer is active
	Active() bool
	// Start initializes the layer
	Start(opt LayerOptions) error
	// Close shuts down the layer
	Close()
	// Get retrieves a single value by exact key
	Get(key string) (meta.Object, error)
	// GetList retrieves all values matching a glob pattern
	GetList(path string) ([]meta.Object, error)
	// Set stores a value
	Set(key string, obj *meta.Object) error
	// Del deletes a key
	Del(key string) error
	// Keys returns all keys
	Keys() ([]string, error)
	// Clear removes all data
	Clear()
}

Layer is the interface for individual storage layers (memory, embedded, sql) This is a simpler interface focused on raw storage operations

type LayerOptions

type LayerOptions struct {
	// SkipLoadMemory when true, skips loading data from embedded layer into memory on startup
	SkipLoadMemory bool
}

LayerOptions configuration for individual storage layers

type Layered

type Layered struct {
	// contains filtered or unexported fields
}

Layered is a multi-layer storage that coordinates between memory and embedded layers

func New

func New(cfg LayeredConfig) *Layered

NewLayered creates a new layered storage

func (*Layered) Active

func (l *Layered) Active() bool

Active returns whether the storage is active

func (*Layered) Clear

func (l *Layered) Clear()

Clear removes all data from all layers

func (*Layered) Close

func (l *Layered) Close()

Close shuts down all layers

func (*Layered) Del

func (l *Layered) Del(path string) error

Del deletes a key from all layers

func (*Layered) DelSilent

func (l *Layered) DelSilent(path string) error

DelSilent deletes a key from all layers without broadcasting

func (*Layered) Get

func (l *Layered) Get(path string) (meta.Object, error)

Get retrieves a single value by exact key Checks layers from fastest to slowest, populates faster layers on cache miss

func (*Layered) GetAndLock

func (l *Layered) GetAndLock(path string) (meta.Object, error)

GetAndLock retrieves a single value and locks the key mutex

func (*Layered) GetList

func (l *Layered) GetList(path string) ([]meta.Object, error)

GetList retrieves list of values matching a glob pattern (ascending order)

func (*Layered) GetListDescending

func (l *Layered) GetListDescending(path string) ([]meta.Object, error)

GetListDescending retrieves list of values matching a glob pattern (descending order)

func (*Layered) GetN

func (l *Layered) GetN(path string, limit int) ([]meta.Object, error)

GetN get last N elements of a path related value(s)

func (*Layered) GetNAscending

func (l *Layered) GetNAscending(path string, limit int) ([]meta.Object, error)

GetNAscending get first N elements of a path related value(s)

func (*Layered) GetNRange

func (l *Layered) GetNRange(path string, limit int, from, to int64) ([]meta.Object, error)

GetNRange get last N elements in a time range

func (*Layered) Keys

func (l *Layered) Keys() ([]string, error)

Keys returns all keys from all layers

func (*Layered) KeysRange

func (l *Layered) KeysRange(path string, from, to int64) ([]string, error)

KeysRange list keys in a path and time range

func (*Layered) Push

func (l *Layered) Push(path string, data json.RawMessage) (string, error)

Push stores data under a new key generated from a glob pattern path

func (*Layered) Set

func (l *Layered) Set(path string, data json.RawMessage) (string, error)

Set stores a value in all layers

func (*Layered) SetAndUnlock

func (l *Layered) SetAndUnlock(path string, data json.RawMessage) (string, error)

SetAndUnlock sets a value and unlocks the key mutex

func (*Layered) SetWithMeta

func (l *Layered) SetWithMeta(path string, data json.RawMessage, created, updated int64) (string, error)

SetWithMeta set entries with metadata created/updated values

func (*Layered) Start

func (l *Layered) Start(opt Options) error

Start initializes all layers and populates caches

func (*Layered) Unlock

func (l *Layered) Unlock(path string) error

Unlock unlocks a key mutex

func (*Layered) WatchSharded

func (l *Layered) WatchSharded() *ShardedChan

WatchSharded returns the sharded storage channel

type LayeredConfig

type LayeredConfig struct {
	// Memory layer (fastest) - optional
	Memory *MemoryLayer
	// MemoryOptions configuration for memory layer
	MemoryOptions LayerOptions

	// Embedded layer (medium speed) - optional
	Embedded EmbeddedLayer
	// EmbeddedOptions configuration for embedded layer
	EmbeddedOptions LayerOptions
}

LayeredConfig configuration for the layered storage

type MemoryLayer

type MemoryLayer struct {
	// contains filtered or unexported fields
}

MemoryLayer is an in-memory storage layer

func NewMemoryLayer

func NewMemoryLayer() *MemoryLayer

NewMemoryLayer creates a new memory layer

func (*MemoryLayer) Active

func (m *MemoryLayer) Active() bool

Active returns whether the layer is active

func (*MemoryLayer) Clear

func (m *MemoryLayer) Clear()

Clear removes all data

func (*MemoryLayer) Close

func (m *MemoryLayer) Close()

Close shuts down the memory layer

func (*MemoryLayer) Del

func (m *MemoryLayer) Del(k string) error

Del deletes a key

func (*MemoryLayer) Get

func (m *MemoryLayer) Get(k string) (meta.Object, error)

Get retrieves a single value by exact key

func (*MemoryLayer) GetList

func (m *MemoryLayer) GetList(path string) ([]meta.Object, error)

GetList retrieves all values matching a glob pattern

func (*MemoryLayer) Keys

func (m *MemoryLayer) Keys() ([]string, error)

Keys returns all keys

func (*MemoryLayer) Len

func (m *MemoryLayer) Len() int

Len returns the number of entries (for testing)

func (*MemoryLayer) Set

func (m *MemoryLayer) Set(k string, obj *meta.Object) error

Set stores a value

func (*MemoryLayer) Start

func (m *MemoryLayer) Start(opt LayerOptions) error

Start initializes the memory layer

type Options

type Options struct {
	NoBroadcastKeys []string
	BeforeRead      func(key string)
	AfterWrite      func(key string)
	Workers         int
}

Options for the storage instance

type ShardedChan

type ShardedChan struct {
	// contains filtered or unexported fields
}

ShardedChan manages multiple channels for per-key ordering.

func NewShardedChan

func NewShardedChan(shardCount int) *ShardedChan

NewShardedChan creates a new sharded storage channel with the given number of shards.

func (*ShardedChan) Close

func (s *ShardedChan) Close()

Close closes all shard channels.

func (*ShardedChan) Count

func (s *ShardedChan) Count() int

Count returns the number of shards.

func (*ShardedChan) Send

func (s *ShardedChan) Send(event Event)

Send routes an event to the appropriate shard based on key hash.

func (*ShardedChan) Shard

func (s *ShardedChan) Shard(index int) StorageChan

Shard returns the channel for a specific shard index.

func (*ShardedChan) ShardFor

func (s *ShardedChan) ShardFor(key string) int

ShardFor returns the shard index for a given key using FNV-1a hash.

type StorageChan

type StorageChan chan Event

StorageChan an operation events channel

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL