db

package
v0.1.41 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2026 License: LGPL-2.1 Imports: 14 Imported by: 0

README

db Package

The db package is the persistence layer for the Migration Engine. It owns the DuckDB file, schema, and read/write paths used by pkg/queue and pkg/migration.

Traversal and copy current status for each node are derived from append-only src_status_events / dst_status_events (latest event per id via arg_max), not from columns on src_nodes / dst_nodes (those tables store path, depth, type, size, etc.).

Seal: SealLevel bulk-appends node rows and writes per-depth stats; when Options.SealBuffer is set, seal payloads are flushed asynchronously (seal_buffer.go).


Overview

  • db.go – Open/close, schema init, SealLevel, AddNodeDeletions, checkpoint, RunWrite / WriteSession, optional SealBuffer.
  • seal_buffer.go – Buffers seal jobs; flush on interval, thresholds, Stop/Flush.
  • writer.go – Transactional Writer: status events, appender inserts, stats, review deltas, subtree ops, logs, queue_stats, task_errors.
  • schema.go – DDL: node tables, status event tables, stats (universal key/count, e.g. review aggregates), migrations (lifecycle row per migration when using manager), src_stats/dst_stats, logs, queue_stats, task_errors.
  • queries.go – Read helpers: nodes, keyset lists, merged review, children, counts.
  • stats.go – Review snapshot, stats keys, copy/traversal counts from events.
  • constants.go, nodestate.go, logs.go, seeding.go, indexes.go – Supporting types and indexes (including status-event indexes).

Tables (high level)

Table Purpose
src_nodes, dst_nodes Node metadata (path, depth, type, size, …). Status from events.
src_status_events, dst_status_events Append-only status history; SRC includes copy_status.
src_stats, dst_stats Per-depth keyed counts (traversal/copy buckets).
stats Global key → count (canonical review counters, etc.).
migrations One row per logical migration (id, name, phase, JSON blobs) when using MigrationManager.
logs Buffered log rows.
queue_stats Queue metrics JSON.
task_errors Task error records.

Write paths

  1. Seal – Queue completes a round/level → SealLevel (sync or via SealBuffer flush).
  2. TransactionalRunWrite(ctx, fn)WithTx(Writer) for status events, review mutations, deletes, logs, etc.

Read paths

  • GetDB() / GetDBForPulls(queueType) – Single connection (MaxOpenConns(1)).
  • Keyset pulls, GetNodeByID, merged review queries, stats readers—see queries.go / stats.go.

Concurrency

  • writeMu serializes writes; checkpoint uses checkpointMu where applicable.

File layout

pkg/db/
├── db.go           # Open, schema, SealLevel, RunWrite, SealBuffer hookup
├── seal_buffer.go
├── writer.go
├── schema.go
├── queries.go
├── stats.go
├── constants.go
├── nodestate.go
├── logs.go
├── seeding.go
├── indexes.go
├── developer_note.md
└── README.md

Integration

  • pkg/queue – Pulls via queries; persists via SealLevel and writers.
  • pkg/migrationMigrationManager / domain Migration; review and lifecycle use the same *db.DB.

Summary

  • Single DuckDB connection for operational queries.
  • Status = latest row per node in status event tables.
  • Seal + SealBuffer for bulk level writes; stats holds universal review aggregates.

Documentation

Index

Constants

View Source
const (
	StatusPending            = "pending"
	StatusSuccessful         = "successful"
	StatusFailed             = "failed"
	StatusNotOnSrc           = "not_on_src" // DST only
	StatusExcluded           = "excluded"
	StatusExclusionInherited = "exclusion_inherited" // bulk subtree exclusion
)

Traversal status values (event-derived). Stats keys use: pending, successful, failed, not_on_src (DST only).

View Source
const (
	CopyStatusPending           = "pending"
	CopyStatusInProgress        = "in_progress" // transient; not stored in stats
	CopyStatusSuccessful        = "successful"
	CopyStatusFailed            = "failed"
	CopyStatusSkipped           = "skipped"
	CopyStatusExcludedExplicit  = "excluded_explicit"  // user excluded this node from copy
	CopyStatusExcludedInherited = "excluded_inherited" // excluded by parent folder
	CopyStatusExcluded          = "excluded"           // display value for UI (both explicit and inherited)
)

Copy status values (src_nodes only). Stats keys use: pending, successful, failed (no in_progress in stats).

View Source
const (
	NodeTypeFolder = "folder"
	NodeTypeFile   = "file"
)

Node type values.

View Source
const (

	// TableMigrations is the migrations lifecycle table name (used by schema and migration store).
	TableMigrations = "migrations"
	// TableMigrationEnvelope holds one row per migration DB: 32-byte envelope master key for Sylos-FS credentials.
	TableMigrationEnvelope = "migration_envelope"
	// TableFSCredentialBinding holds per-side connection id, optional creds file path, and serialized root folder for API rehydration.
	TableFSCredentialBinding = "fs_credential_binding"
)
View Source
const (
	ReviewKeyTraversalPending      = "traversal/pending"
	ReviewKeyTraversalPendingRetry = "traversal/pending_retry"
	ReviewKeyTraversalSuccessful   = "traversal/successful"
	ReviewKeyTraversalFailed       = "traversal/failed"
	ReviewKeyCopyPending           = "copy/pending"
	ReviewKeyCopySuccessful        = "copy/successful"
	ReviewKeyCopyFailed            = "copy/failed"
	ReviewKeyExcluded              = "excluded"
	ReviewKeyFolders               = "folders"
	ReviewKeyFiles                 = "files"
	ReviewKeySizeSrc               = "size_src"
	ReviewKeySizeDst               = "size_dst"
)

Universal stats table keys for canonical review stats (tableStats). Namespaced as traversal/*, copy/*, and flat aggregates.

View Source
const StatsKeyCompleted = "completed"

StatsKeyCompleted is the stats key for completed count at a depth (written at seal).

View Source
const StatsKeyExpected = "expected"

StatsKeyExpected is the stats key for expected count at a depth (set at round start).

Variables

This section is empty.

Functions

func BatchGetChildrenIDsByParentIDs

func BatchGetChildrenIDsByParentIDs(d *DB, table string, parentIDs []string) (map[string][]string, error)

BatchGetChildrenIDsByParentIDs returns map[parentID][]childID for the given table and parent ids.

func BatchGetDstIDsFromSrcIDs

func BatchGetDstIDsFromSrcIDs(d *DB, srcIDs []string) (map[string]string, error)

BatchGetDstIDsFromSrcIDs returns map[srcID]dstID by resolving SRC id->path then DST path->id in two queries (no per-item reads).

func BatchGetNodeMeta

func BatchGetNodeMeta(d *DB, table string, ids []string) (map[string]NodeMeta, error)

BatchGetNodeMeta returns meta (id, depth, type, traversal_status, copy_status) for the given ids. Status from latest events.

func BatchGetNodesByID

func BatchGetNodesByID(d *DB, table string, ids []string) (map[string]*NodeState, error)

BatchGetNodesByID returns nodes by id for the given table in one query. Status from latest events.

func BatchInsertNodes

func BatchInsertNodes(d *DB, ops []InsertOperation) error

BatchInsertNodes inserts nodes via the writer.

func CopyStatusForDisplay added in v0.1.41

func CopyStatusForDisplay(status string) string

CopyStatusForDisplay returns the copy_status to show in the UI. Internal excluded_explicit and excluded_inherited both become "excluded".

func CountExcluded

func CountExcluded(d *DB, table string) (int, error)

CountExcluded returns the number of SRC nodes with copy_status in (excluded_explicit, excluded_inherited). Exclusion is SRC-only; returns 0 for DST.

func CountExcludedInSubtree

func CountExcludedInSubtree(d *DB, table, rootPath string) (int, error)

CountExcludedInSubtree returns the number of SRC nodes in the subtree with copy_status in (excluded_explicit, excluded_inherited). Exclusion is SRC-only; returns 0 for DST.

func CountMergedReviewRows added in v0.1.41

func CountMergedReviewRows(d *DB, f ReviewFilter) (int, error)

CountMergedReviewRows returns the number of merged rows matching the filter.

func CountNodes

func CountNodes(d *DB, table string) (int, error)

CountNodes returns the total number of nodes in the given table (src_nodes or dst_nodes). Live table count, not from stats. Uses pull conn so we see appender-written data.

func DeterministicNodeID

func DeterministicNodeID(queueType, nodeType, path string) string

DeterministicNodeID returns a stable id from (queueType, nodeType, path) for race-safe deduplication.

func DropNodeTableIndexes added in v0.1.41

func DropNodeTableIndexes(db *DB, table string) error

DropNodeTableIndexes drops the non-primary indexes on the given node table (e.g. "src_nodes", "dst_nodes"). Call before a bulk phase to avoid index maintenance cost during inserts.

func DropStatusEventTableIndexes added in v0.1.41

func DropStatusEventTableIndexes(db *DB, table string) error

DropStatusEventTableIndexes drops indexes on the given status event table. Call before a bulk phase.

func DstStatusEventAppendRowArgs added in v0.1.41

func DstStatusEventAppendRowArgs(e *StatusEvent) []any

DstStatusEventAppendRowArgs returns column values for one row in dst_status_events (id, traversal_status, event_time, depth) for appender.

func EnsureNodeTableIndexes

func EnsureNodeTableIndexes(db *DB, table string) error

EnsureNodeTableIndexes creates stable lookup indexes on the given node table (e.g. "src_nodes", "dst_nodes") for path_hash, parent_path_hash, and depth.

Idempotent for create/drop operations.

func EnsureStatusEventTableIndexes added in v0.1.41

func EnsureStatusEventTableIndexes(db *DB, table string) error

EnsureStatusEventTableIndexes creates indexes on the given status event table for id and (id, event_time) to support "latest event per node" queries. Idempotent.

func GenerateLogID

func GenerateLogID() string

GenerateLogID returns a unique log id (UUID).

func GetAllLevels

func GetAllLevels(d *DB, table string) ([]int, error)

GetAllLevels returns distinct depth values for the table, sorted.

func GetChildrenIDsByParentID

func GetChildrenIDsByParentID(d *DB, table, parentID string, limit int) ([]string, error)

GetChildrenIDsByParentID returns child ids for the given parent_id (up to limit).

func GetDstIDFromSrcID

func GetDstIDFromSrcID(d *DB, srcParentID string) (string, error)

GetDstIDFromSrcID returns the DST node id that has the same path as the given SRC node id (join by path).

func GetDstIDToSrcPath

func GetDstIDToSrcPath(d *DB, dstIDs []string) (map[string]string, error)

GetDstIDToSrcPath returns for each DST id the path (which is the join key = "src path").

func GetSrcChildrenGroupedByParentPath

func GetSrcChildrenGroupedByParentPath(d *DB, parentPaths []string) (map[string][]*NodeState, error)

GetSrcChildrenGroupedByParentPath returns SRC nodes grouped by parent_path. Status from latest events.

func GetTotalFileSizes added in v0.1.41

func GetTotalFileSizes(d *DB) (srcTotal, dstTotal int64, err error)

GetTotalFileSizes returns the sum of size across src_nodes and dst_nodes (for API totalFileSize).

func InsertRootNode

func InsertRootNode(d *DB, table string, state *NodeState) error

InsertRootNode inserts the root node (path "/", depth 0) as metadata only, then emits initial status event(s).

func MergedReviewQueryBase added in v0.1.41

func MergedReviewQueryBase() string

MergedReviewQueryBase returns the WITH clause for the merged src+dst review view (status from events). Use with " SELECT ... FROM merged" + where. merged includes parent_path_hash for indexed child lookups.

func NodeStateAppendRowArgs

func NodeStateAppendRowArgs(n *NodeState) []any

NodeStateAppendRowArgs returns the column values for one NodeState (metadata only) in table order for use with duckdb.Appender.AppendRow.

func NormalizeRootRelativePath added in v0.1.41

func NormalizeRootRelativePath(path string) string

NormalizeRootRelativePath returns a root-relative path with no "//" so SRC/DST path_hash joins match. Root is "/"; children are "/name", "/name/child". Collapses any "//" to "/".

func NormalizeSubtreeRootPathForPropagation added in v0.1.41

func NormalizeSubtreeRootPathForPropagation(path string) string

NormalizeSubtreeRootPathForPropagation normalizes a failed folder path so subtree updates match src_nodes.path (root-relative slashes, trim trailing slash except "/").

func PathHash added in v0.1.41

func PathHash(path string) string

PathHash returns a deterministic 32-char hex hash of path for use as an index key.

func SrcStatusEventAppendRowArgs added in v0.1.41

func SrcStatusEventAppendRowArgs(e *StatusEvent) []any

SrcStatusEventAppendRowArgs returns column values for one row in src_status_events (id, traversal_status, copy_status, event_time, depth) for appender.

func StatsKeyCopyStatus

func StatsKeyCopyStatus(status string) string

StatsKeyCopyStatus returns the src_stats key for copy status (src_nodes only). Valid statuses: pending, successful, failed.

func StatsKeyTraversalStatus

func StatsKeyTraversalStatus(status string) string

StatsKeyTraversalStatus returns the src_stats/dst_stats key for traversal status. Valid statuses: pending, successful, failed, not_on_src (DST only).

Types

type BatchInsertOperation

type BatchInsertOperation struct {
	Operations []InsertOperation
}

BatchInsertOperation is a batch of node inserts.

type CopyStatusBucketsSubtreeNotExcluded added in v0.1.41

type CopyStatusBucketsSubtreeNotExcluded struct {
	Pending    int64
	Failed     int64
	Successful int64
	Skipped    int64
	InProgress int64
}

CopyStatusBucketsSubtreeNotExcluded holds counts of current SRC copy_status among nodes in the subtree that are not yet excluded (exclude propagation runs on this set).

type CopyStatusCounts added in v0.1.41

type CopyStatusCounts struct {
	Pending    int64
	Successful int64
	Failed     int64
	Skipped    int64
	Excluded   int64 // excluded_explicit + excluded_inherited
}

CopyStatusCounts holds copy status counts for SRC (from src_status_events).

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is the DuckDB-backed database handle. Single physical connection for all DB operations (schema, bulk append at seal, pulls, checkpoint).

func Open

func Open(opts Options) (*DB, error)

Open opens a DuckDB database at the given path and creates schema if missing.

func (*DB) AddNodeDeletion

func (db *DB) AddNodeDeletion(table, nodeID string) error

AddNodeDeletion deletes a node immediately (retry DST cleanup).

func (*DB) AddNodeDeletions

func (db *DB) AddNodeDeletions(deletions []NodeDeletion) error

AddNodeDeletions deletes multiple nodes.

func (*DB) AddSealNodes added in v0.1.41

func (db *DB) AddSealNodes(table string, depth int, nodes []*NodeState) error

AddSealNodes writes a subset of nodes and their status events to the seal buffer (no level stats). Use when removing nodes from cache after DST task completion or SRC early completion; only remove from cache if this returns nil.

func (*DB) AppendDiscoveredNodes added in v0.1.41

func (db *DB) AppendDiscoveredNodes(ops []InsertOperation)

AppendDiscoveredNodes adds discovered nodes (and their initial status events) to the seal buffer discovery queue. Call from traversal completion; flush is async until FlushAppenderBuffer.

func (*DB) AppendFailedSubtree added in v0.1.41

func (db *DB) AppendFailedSubtree(parentPath string)

AppendFailedSubtree enqueues an SRC folder path for subtree failure propagation. At the next flush, all pending descendants will be marked as copy_status='failed'.

func (*DB) AppendStatusEvent added in v0.1.41

func (db *DB) AppendStatusEvent(table string, e StatusEvent, fromRetry bool)

AppendStatusEvent adds a status event (e.g. completed/failed) to the seal buffer discovery queue. Call from CompleteTraversalTask / FailTraversalTask. fromRetry should be true when the completion is from retry mode so we decrement PendingRetry (not Pending) when the path zeros.

func (*DB) AppendTaskError added in v0.1.41

func (db *DB) AppendTaskError(queueType, phase, nodeID, message string, attempts int, path string)

AppendTaskError adds a task error to the seal buffer for async flush. Call from workers on failure (replaces immediate RecordTaskError).

func (*DB) BeginCopyPhase added in v0.1.41

func (db *DB) BeginCopyPhase(ctx context.Context) error

BeginCopyPhase starts the copy phase (same as traversal: drop indexes, persistent appenders).

func (*DB) BeginTraversalPhase added in v0.1.41

func (db *DB) BeginTraversalPhase(ctx context.Context) error

BeginTraversalPhase starts the traversal phase: drops secondary indexes, acquires a persistent connection and creates 4 appenders for the seal buffer. Flush will use one tx per flush until EndTraversalPhase.

func (*DB) Checkpoint

func (db *DB) Checkpoint() error

Checkpoint runs CHECKPOINT on the main conn, guarded by checkpointMu. Call at root seeding and round advancement only. Uses a single connection; running CHECKPOINT on multiple connections causes "Could not remove file X.wal: No such file or directory".

func (*DB) Close

func (db *DB) Close() error

Close closes the database connection. Stops the seal buffer first (flushing any pending seal and discovery jobs).

func (*DB) EndCopyPhase added in v0.1.41

func (db *DB) EndCopyPhase() error

EndCopyPhase ends the copy phase (flush, close appenders, rebuild indexes, checkpoint).

func (*DB) EndTraversalPhase added in v0.1.41

func (db *DB) EndTraversalPhase() error

EndTraversalPhase flushes remaining seal jobs, closes phase appenders, checkpoints to flush WAL and free memory, then rebuilds indexes.

func (*DB) FlushAppenderBuffer added in v0.1.41

func (db *DB) FlushAppenderBuffer() error

FlushAppenderBuffer flushes the seal buffer (including discovery queue) and returns when all pending nodes and status events are persisted. Call before round advance.

func (*DB) FlushSealBuffer

func (db *DB) FlushSealBuffer() error

FlushSealBuffer drains pending seal jobs to the DB. Call before backpressure wait so we don't block on the flush interval.

func (*DB) GetAllQueueStats

func (db *DB) GetAllQueueStats() (map[string][]byte, error)

GetAllQueueStats returns all queue stats from queue_stats table.

func (*DB) GetCopyCountAtDepth

func (db *DB) GetCopyCountAtDepth(depth int, nodeType string, copyStatus string, breakAtFirst bool) (int64, error)

GetCopyCountAtDepth returns the count of nodes in src_nodes at the given depth with current copy_status (event-derived). Optional nodeType filter. If breakAtFirst is true, returns 1 if any matching node exists or 0 otherwise.

func (*DB) GetCopyStatusCountsFromEvents added in v0.1.41

func (db *DB) GetCopyStatusCountsFromEvents() (CopyStatusCounts, error)

GetCopyStatusCountsFromEvents returns counts of SRC nodes by current copy_status from src_status_events (arg_max per id). Use for review/API counts.

func (*DB) GetDB

func (db *DB) GetDB() (*sql.DB, error)

GetDB returns the underlying *sql.DB for read-only queries (main conn).

func (*DB) GetDBForPulls

func (db *DB) GetDBForPulls(queueType string) (*sql.DB, error)

GetDBForPulls returns the main connection for pull queries. queueType is "SRC" or "DST" (both use same conn).

func (*DB) GetMaxDepth

func (db *DB) GetMaxDepth(table string) (int, error)

GetMaxDepth returns the maximum depth present in the nodes table for the given table ("SRC" or "DST"). Used as stop condition for retry sweep.

func (*DB) GetPathReviewStatsFromDB added in v0.1.41

func (db *DB) GetPathReviewStatsFromDB() (ReviewStatsSnapshot, error)

GetPathReviewStatsFromDB computes path review stats from nodes and status events only (no stats table). Use for GetPathReviewStats() and verification.

func (*DB) GetPendingTraversalCountAtDepthFromLive

func (db *DB) GetPendingTraversalCountAtDepthFromLive(table string, depth int) (int64, error)

GetPendingTraversalCountAtDepthFromLive returns the count of nodes at the given depth with current traversal_status = 'pending' (event-derived).

func (*DB) GetQueueStats

func (db *DB) GetQueueStats(queueKey string) ([]byte, error)

GetQueueStats returns the metrics JSON for the queue key from queue_stats table.

func (*DB) GetReviewStatsSnapshot added in v0.1.41

func (db *DB) GetReviewStatsSnapshot() (ReviewStatsSnapshot, error)

GetReviewStatsSnapshot reads the full canonical review stats from the universal stats table.

func (*DB) GetStatsBreakdown

func (db *DB) GetStatsBreakdown(table string) ([]StatsRow, error)

GetStatsBreakdown returns (depth, key, count) from live nodes+events grouped by depth and traversal_status. Order: depth, key.

func (*DB) GetStatsCount

func (db *DB) GetStatsCount(table, key string) (int64, error)

GetStatsCount returns the total count for the given key from live nodes+events (table = "SRC" or "DST"). Supports traversal status keys only.

func (*DB) GetStatsCountAtDepth

func (db *DB) GetStatsCountAtDepth(table string, depth int, key string) (int64, error)

GetStatsCountAtDepth returns the count for (depth, key) from live nodes+events. Supports traversal status keys only (e.g. traversal/pending, traversal/failed).

func (*DB) GetTraversalCountAtDepthFromLive added in v0.1.41

func (db *DB) GetTraversalCountAtDepthFromLive(table string, depth int, status string) (int64, error)

GetTraversalCountAtDepthFromLive returns the count of nodes at the given depth with the given traversal_status (event-derived).

func (*DB) GetTraversalStatusCountsFromEvents added in v0.1.41

func (db *DB) GetTraversalStatusCountsFromEvents(table string) (TraversalStatusCounts, error)

GetTraversalStatusCountsFromEvents returns counts of nodes by current traversal_status, derived from the status_events table (arg_max per id) joined to the node table. Use for verification instead of stats-table counters.

func (*DB) Path

func (db *DB) Path() string

Path returns the database file path (or ":memory:").

func (*DB) ResyncReviewStats added in v0.1.41

func (db *DB) ResyncReviewStats() error

ResyncReviewStats recomputes the canonical review stats from live nodes and latest status events, then writes the full snapshot to the universal stats table. Use for recovery when stats are missing or stale.

func (*DB) RunWrite added in v0.1.41

func (db *DB) RunWrite(ctx context.Context, fn func(s *WriteSession) error) error

RunWrite holds writeMu and runs fn with a WriteSession (single connection). Use Conn() for raw conn (e.g. appender) or WithTx for transactional writes. Caller must not retain the session or conn after fn returns.

func (*DB) SealLevel

func (db *DB) SealLevel(table string, depth int, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error

SealLevel persists a sealed level from memory cache to the DB (bulk append + stats snapshot). copyP/copyS/copyF are used for SRC copy stats when >= 0. Payload is enqueued to the seal buffer, which writes and checkpoints asynchronously.

func (*DB) SealLevelDepth0

func (db *DB) SealLevelDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error

SealLevelDepth0 updates existing root row(s) at depth 0 and writes stats. Root rows are seeded up-front, so depth 0 uses update semantics instead of appender insert.

func (*DB) WaitUntilSealFlushedThrough

func (db *DB) WaitUntilSealFlushedThrough(depth int)

WaitUntilSealFlushedThrough blocks until the seal buffer has written at least the given depth (for backpressure: don't run more than one round ahead of flushed state).

type DstDescendantsReviewStats added in v0.1.41

type DstDescendantsReviewStats struct {
	Folders          int64
	Files            int64
	Excluded         int64
	SizeDst          int64
	TraversalPending int64
	TraversalFailed  int64
}

DstDescendantsReviewStats holds aggregate counts for DST nodes under a path (descendants only, not the node at the path). Used to apply review-stats deltas when deleting those nodes.

type FetchResult

type FetchResult struct {
	Key                string
	State              *NodeState
	DstParentServiceID string // DST parent's ServiceID from path_hash join (copy pull only)
}

FetchResult is one row from a keyset list (id + full state). DstParentServiceID is populated by ListNodesCopyKeyset when joining dst_nodes on parent path_hash.

func ListDstBatchWithSrcChildren

func ListDstBatchWithSrcChildren(d *DB, depth int, afterID string, limit int, traversalStatus string) ([]FetchResult, map[string][]*NodeState, error)

ListDstBatchWithSrcChildren returns the next batch of DST nodes at depth (keyset afterID, limit) and their SRC children (join by parent_path = d.path). Status from events.

func ListNodesByDepthKeyset

func ListNodesByDepthKeyset(d *DB, table string, depth int, afterID, statusFilter string, limit int) ([]FetchResult, error)

ListNodesByDepthKeyset returns nodes at the given depth, ordered by id, after afterID, limit rows. If statusFilter is non-empty, only rows with current traversal_status = statusFilter are returned (event-derived).

func ListNodesCopyKeyset

func ListNodesCopyKeyset(d *DB, depth int, nodeType, afterID string, limit int, statusFilter string) ([]FetchResult, error)

ListNodesCopyKeyset returns src_nodes at depth with current copy_status = statusFilter (event-derived), ordered by id. Pass CopyStatusPending for copy phase, CopyStatusFailed for copy-retry.

type InsertOperation

type InsertOperation struct {
	QueueType string // "SRC" or "DST"
	Level     int    // depth
	Status    string // initial traversal_status
	State     *NodeState
}

InsertOperation represents a single node insert in a batch.

type LogBuffer

type LogBuffer struct {
	// contains filtered or unexported fields
}

LogBuffer buffers log entries and flushes them to the DB logs table. Flush triggers: row threshold (batchSize), time-based (interval). Backpressure: when buffer reaches 2*batchSize, block all writers, drain the entire buffer, then unblock; writers then push freely until 2*batchSize again.

func NewLogBuffer

func NewLogBuffer(db *DB, batchSize int, interval time.Duration) *LogBuffer

NewLogBuffer creates a log buffer that flushes to the main DB's logs table. Flush threshold is batchSize; backpressure threshold is 2*batchSize (block all writers and drain, then resume).

func (*LogBuffer) Add

func (lb *LogBuffer) Add(e LogEntry)

Add adds an entry to the buffer. When buffer reaches 2*batchSize we block all writers, drain the entire buffer, then resume.

func (*LogBuffer) Flush

func (lb *LogBuffer) Flush()

Flush writes one batch if the buffer has entries. Does not block writers.

func (*LogBuffer) Stop

func (lb *LogBuffer) Stop()

Stop stops the flush loop and drains the entire buffer. Does not close the DB.

type LogEntry

type LogEntry struct {
	ID        string
	Timestamp string
	Level     string
	Entity    string
	EntityID  string
	Message   string
	Queue     string
}

LogEntry is a single log line for persistence.

type MergedReviewRow added in v0.1.41

type MergedReviewRow struct {
	Path               string
	Name               string
	Depth              int
	Type               string
	SrcNodeID          string
	DstNodeID          string
	SrcTraversalStatus string
	DstTraversalStatus string
	CopyStatus         string
	Excluded           bool
	Size               int64
}

MergedReviewRow is one row from the merged review view (SRC+DST joined by path_hash, status from events).

func ListMergedReviewDiffs added in v0.1.41

func ListMergedReviewDiffs(d *DB, f ReviewFilter, orderBy string, limit, offset int) ([]MergedReviewRow, int, error)

ListMergedReviewDiffs returns merged review rows and total count matching the filter, ordered and paginated.

type MergedReviewStats added in v0.1.41

type MergedReviewStats struct {
	Total           int
	Folders         int
	Files           int
	MissingOnSource int
	MissingOnDest   int
	Excluded        int
	SizeSrc         int64 // sum of file sizes on SRC (unique by path)
	SizeDst         int64 // sum of file sizes on DST (unique by path)
}

MergedReviewStats holds aggregate counts over merged review rows (one row per path; same filter semantics as list/count).

func GetMergedReviewStats added in v0.1.41

func GetMergedReviewStats(d *DB, f ReviewFilter) (MergedReviewStats, error)

GetMergedReviewStats returns aggregate counts for rows matching the filter (single query with FILTER). Counts are unique by path (merged view = one row per path).

type NodeDeletion

type NodeDeletion struct {
	Table  string
	NodeID string
}

NodeDeletion represents a node delete (retry DST cleanup).

type NodeMeta

type NodeMeta struct {
	ID              string
	Depth           int
	Type            string
	TraversalStatus string
	CopyStatus      string
}

NodeMeta is a subset of NodeState for batch lookups.

type NodeState

type NodeState struct {
	ID              string // Internal ULID-like id (deterministic from queueType, nodeType, path)
	ServiceID       string // FS id
	ParentID        string // Parent's internal id
	ParentServiceID string
	Path            string // Join key with other table
	ParentPath      string // Join key for children
	Name            string // Display name (for task/UI)
	Type            string // "folder" or "file"
	Size            int64
	MTime           string
	Depth           int
	TraversalStatus string // pending, successful, failed, not_on_src (dst)
	CopyStatus      string // pending, in_progress, successful, failed (src)
	Excluded        bool
	Errors          string // JSON placeholder for log refs
	Status          string // Alias for TraversalStatus (used by queue taskToNodeState)
	SrcID           string // Optional: corresponding SRC node id (join is by path; used during seeding for DST root)
}

NodeState is the in-memory representation of a row in src_nodes or dst_nodes. Path and parent_path are the join keys between SRC and DST.

func GetChildrenByParentID

func GetChildrenByParentID(d *DB, table, parentID string, limit int) ([]*NodeState, error)

GetChildrenByParentID returns up to limit children with the given parent_id. Status from latest events.

func GetChildrenByParentPath

func GetChildrenByParentPath(d *DB, table, parentPath string, limit int) ([]*NodeState, error)

GetChildrenByParentPath returns up to limit children with the given parent_path. Status from latest events.

func GetNodeByID

func GetNodeByID(d *DB, table, id string) (*NodeState, error)

GetNodeByID returns the node by id from the given table. Status is derived from latest status event.

func GetNodeByPath

func GetNodeByPath(d *DB, table, path string) (*NodeState, error)

GetNodeByPath returns the node by path from the given table. Status is derived from latest status event.

func GetRootNode

func GetRootNode(d *DB, table string) (id string, state *NodeState, ok bool)

GetRootNode returns the root node (path = '/') from the given table.

func QueryNodesForReview added in v0.1.41

func QueryNodesForReview(d *DB, table string, depth *int, status string, excluded *bool, pathLike string, orderByPath bool, limit, offset int) ([]NodeState, error)

QueryNodesForReview returns nodes from the given table (SRC or DST) with optional depth, status, excluded, pathLike filters. Status from events. Used by review API.

type Options

type Options struct {
	Path       string             // Path to DuckDB file (e.g. "migration.duckdb")
	SealBuffer *SealBufferOptions // Optional overrides for seal buffer; nil uses defaults. Seal buffer is always created.
}

Options configures DB open behavior.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns default options.

type ReviewFilter added in v0.1.41

type ReviewFilter struct {
	ParentPath  string
	Query       string
	QueryField  string
	Status      string
	FoldersOnly bool
	ExcludeRoot bool

	StatusSearchType string
	TraversalStatus  string
	CopyStatus       string

	TypeFilter string

	DepthOperator string
	DepthValue    *int
	SizeOperator  string
	SizeValue     *int64
}

ReviewFilter narrows merged review rows for listing, search, and counts. ParentPath: if non-empty, only direct children of this path (uses parent_path_hash). Query + QueryField: substring match; QueryField "" or unknown = path OR name; "path" = path only; "name" = name only. Status (legacy): if non-empty and structured status fields are not used, match any of src_traversal, dst_traversal, or copy_status. StatusSearchType + TraversalStatus + CopyStatus: phase-aware filters ("traversal", "copy", "both"). TypeFilter: "folder" or "file" (case-insensitive); also FoldersOnly implies folder. ExcludeRoot: if true, exclude path = '/' from results (for global search).

type ReviewStatsDelta added in v0.1.41

type ReviewStatsDelta struct {
	Key   string
	Delta int64
}

ReviewStatsDelta is one (key, delta) for the universal stats table.

type ReviewStatsSnapshot added in v0.1.41

type ReviewStatsSnapshot struct {
	TraversalPending      int64
	TraversalPendingRetry int64
	TraversalSuccessful   int64
	TraversalFailed       int64
	CopyPending           int64
	CopySuccessful        int64
	CopyFailed            int64
	Excluded              int64
	Folders               int64
	Files                 int64
	SizeSrc               int64
	SizeDst               int64
}

ReviewStatsSnapshot is the canonical persisted review stats in the universal stats table.

type SealBuffer

type SealBuffer struct {
	// contains filtered or unexported fields
}

SealBuffer buffers seal jobs and discovery jobs (nodes + events only), flushes them to the DB asynchronously. Flush triggers: interval timer, row count threshold, and Stop/ForceFlush. When a phase is active (StartPhase called), Flush uses persistent appenders and one tx per flush. Discovery jobs use Pending == discoveryJobStatsSentinel so stats are not written.

func NewSealBuffer

func NewSealBuffer(db *DB, opts SealBufferOptions) *SealBuffer

NewSealBuffer creates a seal buffer and starts its flush loop.

func (*SealBuffer) Add

func (sb *SealBuffer) Add(table string, depth int, nodes []*NodeState, pending, successful, failed, completed, copyP, copyS, copyF int64) error

Add enqueues a seal job. Status events are derived from nodes (one event per node with current traversal/copy status). Returns the error from Flush so callers can avoid removing from cache until data is confirmed in the buffer.

func (*SealBuffer) AddDiscoveryNodes added in v0.1.41

func (sb *SealBuffer) AddDiscoveryNodes(ops []InsertOperation)

AddDiscoveryNodes enqueues discovered nodes (and their initial status events) for async flush. No stats written (discovery job). Call from traversal completion; flush is async until Flush (e.g. before round advance).

func (*SealBuffer) AddDiscoveryStatusEvent added in v0.1.41

func (sb *SealBuffer) AddDiscoveryStatusEvent(table string, e StatusEvent, fromRetry bool)

AddDiscoveryStatusEvent enqueues one status event (e.g. completed/failed) for async flush. No stats written. fromRetry should be true when the completion is from retry mode so we subtract from PendingRetry when the path zeros.

func (*SealBuffer) AddFailedSubtreePath added in v0.1.41

func (sb *SealBuffer) AddFailedSubtreePath(parentPath string)

AddFailedSubtreePath enqueues an SRC folder path for subtree failure propagation. All pending descendants of this path will be marked as copy_status='failed' at the next flush (row threshold, flush ticker, or explicit FlushAppenderBuffer e.g. before round advance). Batched with other seal work; do not flush synchronously here.

func (*SealBuffer) AddTaskError added in v0.1.41

func (sb *SealBuffer) AddTaskError(rec TaskErrorRecord)

AddTaskError enqueues one task error for async flush via the seal buffer.

func (*SealBuffer) Flush

func (sb *SealBuffer) Flush() error

Flush drains queued jobs and task errors and writes them to the DB. When a phase is active, uses persistent appenders and one tx per flush (append + stats). Otherwise uses legacy per-flush appenders. On write failure, jobs and task errors are re-queued so waiters in WaitUntilFlushedThrough do not block forever.

func (*SealBuffer) LastFlushedDepth

func (sb *SealBuffer) LastFlushedDepth() int

LastFlushedDepth returns the maximum depth that has been written to the DB by a completed Flush. -1 until first flush.

func (*SealBuffer) StartPhase added in v0.1.41

func (sb *SealBuffer) StartPhase(conn *sql.Conn) error

StartPhase starts a phase (traversal or copy): conn is held for the phase; 4 appenders are created and reused until StopPhase. Call from DB.BeginTraversalPhase / BeginCopyPhase. Must not be called when a phase is already active.

func (*SealBuffer) Stop

func (sb *SealBuffer) Stop()

Stop stops the flush loop and flushes any remaining jobs.

func (*SealBuffer) StopPhase added in v0.1.41

func (sb *SealBuffer) StopPhase() error

StopPhase flushes any remaining jobs (one tx), closes appenders and conn, and clears phase. Call from DB.EndTraversalPhase / EndCopyPhase.

func (*SealBuffer) WaitUntilFlushedThrough

func (sb *SealBuffer) WaitUntilFlushedThrough(depth int)

WaitUntilFlushedThrough blocks until at least the given depth has been written by a completed Flush.

type SealBufferOptions

type SealBufferOptions struct {
	FlushInterval time.Duration
	RowThreshold  int
	HardCap       int
	FlushTimeout  time.Duration // max time for a single flush operation (default 60s)
}

SealBufferOptions configures the seal buffer. Zero value uses defaults.

type SealJob

type SealJob struct {
	Table      string
	Depth      int
	Nodes      []*NodeState
	Events     []StatusEvent
	Pending    int64
	Successful int64
	Failed     int64
	Completed  int64
	CopyP      int64
	CopyS      int64
	CopyF      int64
	FromRetry  bool // event-only: this completion is from retry mode; subtract from PendingRetry when path zeros
}

SealJob is one sealed level's payload: table, depth, node metadata, status events, and stats.

type StatsDelta added in v0.1.41

type StatsDelta struct {
	Table string
	Depth int
	Key   string
	Delta int64
}

StatsDelta is one (table, depth, key) delta for batch application.

type StatsRow

type StatsRow struct {
	Depth int
	Key   string
	Count int64
}

StatsRow is one row (depth, key, count) for breakdown by level. Key is e.g. traversal/pending.

type StatusEvent added in v0.1.41

type StatusEvent struct {
	ID              string
	TraversalStatus string // nullable in DB
	CopyStatus      string // src only; empty for dst
	EventTime       int64
	Depth           int
	// PrevTraversalStatus and PrevCopyStatus carry the status that was current before this event.
	// Set at enqueue time (task already has the loaded state); used by the seal buffer to compute
	// per-depth level-stat deltas without re-querying the events table.
	PrevTraversalStatus string
	PrevCopyStatus      string
}

StatusEvent is one append-only row for src_status_events or dst_status_events.

type StatusUpdateOperation

type StatusUpdateOperation struct {
	QueueType string
	Level     int
	OldStatus string
	NewStatus string
	NodeID    string
}

StatusUpdateOperation represents a traversal status transition (e.g. pending → successful).

type SubtreeStats

type SubtreeStats struct {
	TotalNodes   int
	TotalFolders int
	TotalFiles   int
	MaxDepth     int
}

SubtreeStats holds aggregate counts for a subtree (path = rootPath OR path LIKE rootPath || '/%').

func CountSubtree

func CountSubtree(d *DB, table, rootPath string) (SubtreeStats, error)

CountSubtree returns aggregate counts for the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'). Single SQL query, no DFS.

type TaskErrorRecord added in v0.1.41

type TaskErrorRecord struct {
	QueueType string
	Phase     string
	NodeID    string
	Message   string
	Attempts  int
	Path      string
}

TaskErrorRecord is one buffered row for task_errors (queue_type, phase, node_id, message, attempts, path).

type TraversalStatusCounts added in v0.1.41

type TraversalStatusCounts struct {
	Pending    int64
	Successful int64
	Failed     int64
	NotOnSrc   int64 // DST only; 0 for SRC
	Excluded   int64 // excluded or exclusion_inherited
}

TraversalStatusCounts holds traversal status counts derived from the status_events table (one row per node, latest event).

type WriteOperation

type WriteOperation interface {
	// contains filtered or unexported methods
}

WriteOperation is an operation that can be buffered and flushed via the writer.

type WriteSession added in v0.1.41

type WriteSession struct {
	// contains filtered or unexported fields
}

WriteSession is the handle passed to RunWrite. Caller must not retain conn after the callback returns.

func (*WriteSession) Conn added in v0.1.41

func (s *WriteSession) Conn() *sql.Conn

Conn returns the single DB connection for raw use (e.g. DuckDB appender). Valid only during the RunWrite callback.

func (*WriteSession) WithTx added in v0.1.41

func (s *WriteSession) WithTx(fn func(w *Writer) error) error

WithTx runs fn inside a transaction on this session's connection.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is the write handle for DuckDB. Used inside RunWrite via WriteSession.WithTx.

func (*Writer) AppenderInsert

func (w *Writer) AppenderInsert(table string, nodes []*NodeState) error

AppenderInsert inserts node metadata into src_nodes or dst_nodes (batch INSERT). No status columns.

func (*Writer) ApplyReviewStatsDeltas added in v0.1.41

func (w *Writer) ApplyReviewStatsDeltas(deltas []ReviewStatsDelta) error

ApplyReviewStatsDeltas applies deltas to the universal stats table (count += delta per key). Used for incremental review stats updates.

func (*Writer) ApplyStatsDeltas added in v0.1.41

func (w *Writer) ApplyStatsDeltas(deltas []StatsDelta) error

ApplyStatsDeltas applies multiple stats deltas in one transaction. Used by seal buffer after discovery/copy flushes.

func (*Writer) BatchInsertDstStatusEvents added in v0.1.41

func (w *Writer) BatchInsertDstStatusEvents(events []StatusEvent) error

BatchInsertDstStatusEvents inserts status events into dst_status_events inside the current transaction. Used by seal flush so events are atomic with nodes/stats.

func (*Writer) BatchInsertSrcStatusEvents added in v0.1.41

func (w *Writer) BatchInsertSrcStatusEvents(events []StatusEvent) error

BatchInsertSrcStatusEvents inserts status events into src_status_events inside the current transaction. Used by seal flush so events are atomic with nodes/stats.

func (*Writer) CountCopyStatusBucketsSubtreeNotExcluded added in v0.1.41

func (w *Writer) CountCopyStatusBucketsSubtreeNotExcluded(rootPath string) (CopyStatusBucketsSubtreeNotExcluded, error)

CountCopyStatusBucketsSubtreeNotExcluded counts SRC nodes under rootPath whose latest copy_status is not excluded, by bucket. Matches Writer.SetNodeExcluded universal deltas (in_progress and skipped have no separate review-table copy bucket). Call inside a transaction.

func (*Writer) CountDstDescendantsReviewStats added in v0.1.41

func (w *Writer) CountDstDescendantsReviewStats(rootPath string) (DstDescendantsReviewStats, error)

CountDstDescendantsReviewStats returns aggregate counts for DST nodes that are strict descendants of rootPath (path LIKE rootPath||'/%'; for rootPath "/" uses path != '/' AND path LIKE '/%'). Call inside a transaction.

func (*Writer) CountDstNodesUnderPath added in v0.1.41

func (w *Writer) CountDstNodesUnderPath(parentPath string) (int64, error)

CountDstNodesUnderPath returns the number of DST nodes whose parent_path equals parentPath (direct children only). Call inside a transaction.

func (*Writer) CountDstNodesUnderPathWithTraversalStatus added in v0.1.41

func (w *Writer) CountDstNodesUnderPathWithTraversalStatus(parentPath, status string) (int64, error)

CountDstNodesUnderPathWithTraversalStatus returns the number of DST nodes under parentPath (parent_path = parentPath) whose current traversal_status equals status. Call inside a transaction.

func (*Writer) CountExcludedInSubtree added in v0.1.41

func (w *Writer) CountExcludedInSubtree(table, rootPath string) (excluded, notExcluded int64, err error)

CountExcludedInSubtree returns (excluded, notExcluded) counts for nodes in the SRC subtree. Excluded = copy_status in (excluded_explicit, excluded_inherited). Call inside a transaction.

func (*Writer) CountPendingTraversalAtPath added in v0.1.41

func (w *Writer) CountPendingTraversalAtPath(path string) (int, error)

CountPendingTraversalAtPath returns how many nodes (SRC + DST) at the given path have current traversal_status = 'pending'. Call inside the same transaction after status events have been written so the count reflects the new state.

func (*Writer) DeleteDescendantsUnderPath added in v0.1.41

func (w *Writer) DeleteDescendantsUnderPath(table, rootPath string) error

DeleteDescendantsUnderPath deletes only nodes that are strict descendants of rootPath (path LIKE rootPath||'/%'; for rootPath "/" deletes path != '/' AND path LIKE '/%'). The node at rootPath itself is not deleted. For DST, also deletes matching rows from dst_status_events. Recomputes stats for affected depths. Table is "SRC" or "DST".

func (*Writer) DeleteNode

func (w *Writer) DeleteNode(table, nodeID string) error

DeleteNode deletes the node from the given table (for retry DST cleanup).

func (*Writer) DeleteSubtree

func (w *Writer) DeleteSubtree(table, rootPath string) error

DeleteSubtree deletes all nodes in the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'), then recomputes stats for each affected depth. Table is "SRC" or "DST".

func (*Writer) InsertDstChildrenTraversalStatusEvents added in v0.1.41

func (w *Writer) InsertDstChildrenTraversalStatusEvents(parentPath, status string) error

InsertDstChildrenTraversalStatusEvents appends one traversal_status event for each DST node whose parent_path equals parentPath. Used when marking/unmarking SRC node for retry (DST-only children get pending or not_on_src). Call inside RunWrite.

func (*Writer) InsertExclusionEventsForSubtree added in v0.1.41

func (w *Writer) InsertExclusionEventsForSubtree(table, rootPath string) error

InsertExclusionEventsForSubtree appends one status event per node in the SRC subtree: copy_status = excluded_explicit (root) or excluded_inherited (descendants); traversal_status preserved. Call inside RunWrite.

func (*Writer) InsertLog

func (w *Writer) InsertLog(id string, level, message, component, entity, entityID, queue string) error

InsertLog inserts a row into logs. id must be unique (e.g. from GenerateLogID).

func (*Writer) InsertStatusEvent added in v0.1.41

func (w *Writer) InsertStatusEvent(table string, e *StatusEvent) error

InsertStatusEvent appends one row to src_status_events or dst_status_events. Table is "SRC" or "DST".

func (*Writer) InsertUnexcludeEventsForSubtree added in v0.1.41

func (w *Writer) InsertUnexcludeEventsForSubtree(table, rootPath string) error

InsertUnexcludeEventsForSubtree appends one status event per node in the SRC subtree: copy_status = 'pending', traversal_status preserved. Call inside RunWrite.

func (*Writer) NodePath added in v0.1.41

func (w *Writer) NodePath(nodeID string) (path string, tbl string, err error)

NodePath returns the path and table ("SRC" or "DST") for a node by ID, or error if not found.

func (*Writer) NodeTraversalStatus added in v0.1.41

func (w *Writer) NodeTraversalStatus(table, nodeID string) (string, error)

NodeTraversalStatus returns the current traversal_status for the node from the events table. Call inside a transaction.

func (*Writer) PropagateSubtreeFailure added in v0.1.41

func (w *Writer) PropagateSubtreeFailure(parentPath string) (int64, error)

PropagateSubtreeFailure inserts copy_status='failed' events for all SRC descendants of parentPath whose current copy_status is 'pending'. Returns the number of affected nodes. Call inside a transaction.

func (*Writer) RecomputeStatsForDepth

func (w *Writer) RecomputeStatsForDepth(table string, depth int) error

RecomputeStatsForDepth deletes stats for the given table and depth, then writes fresh counts from nodes joined with current status (event-derived). For bulk operations only (DeleteSubtree, exclusion propagation, diffs/search). Single-node updates use UpdateStatsCountByDelta instead.

func (*Writer) RecordTaskError

func (w *Writer) RecordTaskError(queueType, phase, nodeID, message string, attempts int, path string) error

RecordTaskError inserts a row into task_errors.

func (*Writer) SealDepth0

func (w *Writer) SealDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error

SealDepth0 writes the depth-0 stats snapshot and emits status events for each depth-0 node so the events table reflects current state (e.g. root marked successful after round 0 completes).

func (*Writer) SetNodeCopyStatus

func (w *Writer) SetNodeCopyStatus(table, nodeID, status string) error

SetNodeCopyStatus emits a copy_status event and applies stat deltas for SRC. Single-node path: no full recompute.

func (*Writer) SetNodeExcluded

func (w *Writer) SetNodeExcluded(table, nodeID string, excluded bool) error

SetNodeExcluded emits a copy_status-only event for SRC: excluding sets copy_status to excluded_explicit (traversal_status unchanged); unexcluding sets copy_status to pending. Applies copy-status stat deltas only.

func (*Writer) SetNodeTraversalStatus

func (w *Writer) SetNodeTraversalStatus(table, nodeID, status string) error

SetNodeTraversalStatus emits a traversal_status event and applies stat deltas (decrement old, increment new). Single-node path: no full recompute.

func (*Writer) SetStatsCountForDepth

func (w *Writer) SetStatsCountForDepth(table string, depth int, key string, count int64) error

SetStatsCountForDepth is a no-op; per-depth stats are no longer persisted (callers use live nodes+events).

func (*Writer) UpdateStatsCountByDelta added in v0.1.41

func (w *Writer) UpdateStatsCountByDelta(table string, depth int, key string, delta int64) error

UpdateStatsCountByDelta is a no-op; per-depth stats are no longer persisted.

func (*Writer) UpsertNodes added in v0.1.41

func (w *Writer) UpsertNodes(table string, nodes []*NodeState) error

UpsertNodes inserts node metadata into src_nodes or dst_nodes. On conflict (id) does nothing so duplicate inserts (e.g. retry re-discovery before DST children are deleted) are safe.

func (*Writer) UpsertStatsCounts added in v0.1.41

func (w *Writer) UpsertStatsCounts(table string, tuples []struct {
	depth int
	key   string
	count int64
}) error

func (*Writer) WriteLevelStatsSnapshot

func (w *Writer) WriteLevelStatsSnapshot(table string, depth int, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error

WriteLevelStatsSnapshot is a no-op; per-depth stats are no longer persisted (callers use live nodes+events).

func (*Writer) WriteQueueStats

func (w *Writer) WriteQueueStats(queueKey, metricsJSON string) error

WriteQueueStats upserts queue metrics JSON into queue_stats.

func (*Writer) WriteReviewStatsSnapshot added in v0.1.41

func (w *Writer) WriteReviewStatsSnapshot(s ReviewStatsSnapshot) error

WriteReviewStatsSnapshot writes the full canonical review stats snapshot to the universal stats table (replaces counts for review keys).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL