Documentation
¶
Index ¶
- Constants
- func BatchGetChildrenIDsByParentIDs(d *DB, table string, parentIDs []string) (map[string][]string, error)
- func BatchGetDstIDsFromSrcIDs(d *DB, srcIDs []string) (map[string]string, error)
- func BatchGetNodeMeta(d *DB, table string, ids []string) (map[string]NodeMeta, error)
- func BatchGetNodesByID(d *DB, table string, ids []string) (map[string]*NodeState, error)
- func BatchInsertNodes(d *DB, ops []InsertOperation) error
- func CopyStatusForDisplay(status string) string
- func CountExcluded(d *DB, table string) (int, error)
- func CountExcludedInSubtree(d *DB, table, rootPath string) (int, error)
- func CountMergedReviewRows(d *DB, f ReviewFilter) (int, error)
- func CountNodes(d *DB, table string) (int, error)
- func DeterministicNodeID(queueType, nodeType, path string) string
- func DropNodeTableIndexes(db *DB, table string) error
- func DropStatusEventTableIndexes(db *DB, table string) error
- func DstStatusEventAppendRowArgs(e *StatusEvent) []any
- func EnsureNodeTableIndexes(db *DB, table string) error
- func EnsureStatusEventTableIndexes(db *DB, table string) error
- func GenerateLogID() string
- func GetAllLevels(d *DB, table string) ([]int, error)
- func GetChildrenIDsByParentID(d *DB, table, parentID string, limit int) ([]string, error)
- func GetDstIDFromSrcID(d *DB, srcParentID string) (string, error)
- func GetDstIDToSrcPath(d *DB, dstIDs []string) (map[string]string, error)
- func GetSrcChildrenGroupedByParentPath(d *DB, parentPaths []string) (map[string][]*NodeState, error)
- func GetTotalFileSizes(d *DB) (srcTotal, dstTotal int64, err error)
- func InsertRootNode(d *DB, table string, state *NodeState) error
- func MergedReviewQueryBase() string
- func NodeStateAppendRowArgs(n *NodeState) []any
- func NormalizeRootRelativePath(path string) string
- func NormalizeSubtreeRootPathForPropagation(path string) string
- func PathHash(path string) string
- func SrcStatusEventAppendRowArgs(e *StatusEvent) []any
- func StatsKeyCopyStatus(status string) string
- func StatsKeyTraversalStatus(status string) string
- type BatchInsertOperation
- type CopyStatusBucketsSubtreeNotExcluded
- type CopyStatusCounts
- type DB
- func (db *DB) AddNodeDeletion(table, nodeID string) error
- func (db *DB) AddNodeDeletions(deletions []NodeDeletion) error
- func (db *DB) AddSealNodes(table string, depth int, nodes []*NodeState) error
- func (db *DB) AppendDiscoveredNodes(ops []InsertOperation)
- func (db *DB) AppendFailedSubtree(parentPath string)
- func (db *DB) AppendStatusEvent(table string, e StatusEvent, fromRetry bool)
- func (db *DB) AppendTaskError(queueType, phase, nodeID, message string, attempts int, path string)
- func (db *DB) BeginCopyPhase(ctx context.Context) error
- func (db *DB) BeginTraversalPhase(ctx context.Context) error
- func (db *DB) Checkpoint() error
- func (db *DB) Close() error
- func (db *DB) EndCopyPhase() error
- func (db *DB) EndTraversalPhase() error
- func (db *DB) FlushAppenderBuffer() error
- func (db *DB) FlushSealBuffer() error
- func (db *DB) GetAllQueueStats() (map[string][]byte, error)
- func (db *DB) GetCopyCountAtDepth(depth int, nodeType string, copyStatus string, breakAtFirst bool) (int64, error)
- func (db *DB) GetCopyStatusCountsFromEvents() (CopyStatusCounts, error)
- func (db *DB) GetDB() (*sql.DB, error)
- func (db *DB) GetDBForPulls(queueType string) (*sql.DB, error)
- func (db *DB) GetMaxDepth(table string) (int, error)
- func (db *DB) GetPathReviewStatsFromDB() (ReviewStatsSnapshot, error)
- func (db *DB) GetPendingTraversalCountAtDepthFromLive(table string, depth int) (int64, error)
- func (db *DB) GetQueueStats(queueKey string) ([]byte, error)
- func (db *DB) GetReviewStatsSnapshot() (ReviewStatsSnapshot, error)
- func (db *DB) GetStatsBreakdown(table string) ([]StatsRow, error)
- func (db *DB) GetStatsCount(table, key string) (int64, error)
- func (db *DB) GetStatsCountAtDepth(table string, depth int, key string) (int64, error)
- func (db *DB) GetTraversalCountAtDepthFromLive(table string, depth int, status string) (int64, error)
- func (db *DB) GetTraversalStatusCountsFromEvents(table string) (TraversalStatusCounts, error)
- func (db *DB) Path() string
- func (db *DB) ResyncReviewStats() error
- func (db *DB) RunWrite(ctx context.Context, fn func(s *WriteSession) error) error
- func (db *DB) SealLevel(table string, depth int, nodes []*NodeState, ...) error
- func (db *DB) SealLevelDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, ...) error
- func (db *DB) WaitUntilSealFlushedThrough(depth int)
- type DstDescendantsReviewStats
- type FetchResult
- func ListDstBatchWithSrcChildren(d *DB, depth int, afterID string, limit int, traversalStatus string) ([]FetchResult, map[string][]*NodeState, error)
- func ListNodesByDepthKeyset(d *DB, table string, depth int, afterID, statusFilter string, limit int) ([]FetchResult, error)
- func ListNodesCopyKeyset(d *DB, depth int, nodeType, afterID string, limit int, statusFilter string) ([]FetchResult, error)
- type InsertOperation
- type LogBuffer
- type LogEntry
- type MergedReviewRow
- type MergedReviewStats
- type NodeDeletion
- type NodeMeta
- type NodeState
- func GetChildrenByParentID(d *DB, table, parentID string, limit int) ([]*NodeState, error)
- func GetChildrenByParentPath(d *DB, table, parentPath string, limit int) ([]*NodeState, error)
- func GetNodeByID(d *DB, table, id string) (*NodeState, error)
- func GetNodeByPath(d *DB, table, path string) (*NodeState, error)
- func GetRootNode(d *DB, table string) (id string, state *NodeState, ok bool)
- func QueryNodesForReview(d *DB, table string, depth *int, status string, excluded *bool, ...) ([]NodeState, error)
- type Options
- type ReviewFilter
- type ReviewStatsDelta
- type ReviewStatsSnapshot
- type SealBuffer
- func (sb *SealBuffer) Add(table string, depth int, nodes []*NodeState, ...) error
- func (sb *SealBuffer) AddDiscoveryNodes(ops []InsertOperation)
- func (sb *SealBuffer) AddDiscoveryStatusEvent(table string, e StatusEvent, fromRetry bool)
- func (sb *SealBuffer) AddFailedSubtreePath(parentPath string)
- func (sb *SealBuffer) AddTaskError(rec TaskErrorRecord)
- func (sb *SealBuffer) Flush() error
- func (sb *SealBuffer) LastFlushedDepth() int
- func (sb *SealBuffer) StartPhase(conn *sql.Conn) error
- func (sb *SealBuffer) Stop()
- func (sb *SealBuffer) StopPhase() error
- func (sb *SealBuffer) WaitUntilFlushedThrough(depth int)
- type SealBufferOptions
- type SealJob
- type StatsDelta
- type StatsRow
- type StatusEvent
- type StatusUpdateOperation
- type SubtreeStats
- type TaskErrorRecord
- type TraversalStatusCounts
- type WriteOperation
- type WriteSession
- type Writer
- func (w *Writer) AppenderInsert(table string, nodes []*NodeState) error
- func (w *Writer) ApplyReviewStatsDeltas(deltas []ReviewStatsDelta) error
- func (w *Writer) ApplyStatsDeltas(deltas []StatsDelta) error
- func (w *Writer) BatchInsertDstStatusEvents(events []StatusEvent) error
- func (w *Writer) BatchInsertSrcStatusEvents(events []StatusEvent) error
- func (w *Writer) CountCopyStatusBucketsSubtreeNotExcluded(rootPath string) (CopyStatusBucketsSubtreeNotExcluded, error)
- func (w *Writer) CountDstDescendantsReviewStats(rootPath string) (DstDescendantsReviewStats, error)
- func (w *Writer) CountDstNodesUnderPath(parentPath string) (int64, error)
- func (w *Writer) CountDstNodesUnderPathWithTraversalStatus(parentPath, status string) (int64, error)
- func (w *Writer) CountExcludedInSubtree(table, rootPath string) (excluded, notExcluded int64, err error)
- func (w *Writer) CountPendingTraversalAtPath(path string) (int, error)
- func (w *Writer) DeleteDescendantsUnderPath(table, rootPath string) error
- func (w *Writer) DeleteNode(table, nodeID string) error
- func (w *Writer) DeleteSubtree(table, rootPath string) error
- func (w *Writer) InsertDstChildrenTraversalStatusEvents(parentPath, status string) error
- func (w *Writer) InsertExclusionEventsForSubtree(table, rootPath string) error
- func (w *Writer) InsertLog(id string, level, message, component, entity, entityID, queue string) error
- func (w *Writer) InsertStatusEvent(table string, e *StatusEvent) error
- func (w *Writer) InsertUnexcludeEventsForSubtree(table, rootPath string) error
- func (w *Writer) NodePath(nodeID string) (path string, tbl string, err error)
- func (w *Writer) NodeTraversalStatus(table, nodeID string) (string, error)
- func (w *Writer) PropagateSubtreeFailure(parentPath string) (int64, error)
- func (w *Writer) RecomputeStatsForDepth(table string, depth int) error
- func (w *Writer) RecordTaskError(queueType, phase, nodeID, message string, attempts int, path string) error
- func (w *Writer) SealDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, ...) error
- func (w *Writer) SetNodeCopyStatus(table, nodeID, status string) error
- func (w *Writer) SetNodeExcluded(table, nodeID string, excluded bool) error
- func (w *Writer) SetNodeTraversalStatus(table, nodeID, status string) error
- func (w *Writer) SetStatsCountForDepth(table string, depth int, key string, count int64) error
- func (w *Writer) UpdateStatsCountByDelta(table string, depth int, key string, delta int64) error
- func (w *Writer) UpsertNodes(table string, nodes []*NodeState) error
- func (w *Writer) UpsertStatsCounts(table string, tuples []struct{ ... }) error
- func (w *Writer) WriteLevelStatsSnapshot(table string, depth int, pending, successful, failed, completed int64, ...) error
- func (w *Writer) WriteQueueStats(queueKey, metricsJSON string) error
- func (w *Writer) WriteReviewStatsSnapshot(s ReviewStatsSnapshot) error
Constants ¶
const ( StatusPending = "pending" StatusSuccessful = "successful" StatusFailed = "failed" StatusNotOnSrc = "not_on_src" // DST only StatusExcluded = "excluded" StatusExclusionInherited = "exclusion_inherited" // bulk subtree exclusion )
Traversal status values (event-derived). Stats keys use: pending, successful, failed, not_on_src (DST only).
const ( CopyStatusPending = "pending" CopyStatusInProgress = "in_progress" // transient; not stored in stats CopyStatusSuccessful = "successful" CopyStatusFailed = "failed" CopyStatusSkipped = "skipped" CopyStatusExcludedExplicit = "excluded_explicit" // user excluded this node from copy CopyStatusExcludedInherited = "excluded_inherited" // excluded by parent folder CopyStatusExcluded = "excluded" // display value for UI (both explicit and inherited) )
Copy status values (src_nodes only). Stats keys use: pending, successful, failed (no in_progress in stats).
const ( NodeTypeFolder = "folder" NodeTypeFile = "file" )
Node type values.
const ( // TableMigrations is the migrations lifecycle table name (used by schema and migration store). TableMigrations = "migrations" // TableMigrationEnvelope holds one row per migration DB: 32-byte envelope master key for Sylos-FS credentials. TableMigrationEnvelope = "migration_envelope" // TableFSCredentialBinding holds per-side connection id, optional creds file path, and serialized root folder for API rehydration. TableFSCredentialBinding = "fs_credential_binding" )
const ( ReviewKeyTraversalPending = "traversal/pending" ReviewKeyTraversalPendingRetry = "traversal/pending_retry" ReviewKeyTraversalSuccessful = "traversal/successful" ReviewKeyTraversalFailed = "traversal/failed" ReviewKeyCopyPending = "copy/pending" ReviewKeyCopySuccessful = "copy/successful" ReviewKeyCopyFailed = "copy/failed" ReviewKeyExcluded = "excluded" ReviewKeyFolders = "folders" ReviewKeyFiles = "files" ReviewKeySizeSrc = "size_src" ReviewKeySizeDst = "size_dst" )
Universal stats table keys for canonical review stats (tableStats). Namespaced as traversal/*, copy/*, and flat aggregates.
const StatsKeyCompleted = "completed"
StatsKeyCompleted is the stats key for completed count at a depth (written at seal).
const StatsKeyExpected = "expected"
StatsKeyExpected is the stats key for expected count at a depth (set at round start).
Variables ¶
This section is empty.
Functions ¶
func BatchGetChildrenIDsByParentIDs ¶
func BatchGetChildrenIDsByParentIDs(d *DB, table string, parentIDs []string) (map[string][]string, error)
BatchGetChildrenIDsByParentIDs returns map[parentID][]childID for the given table and parent ids.
func BatchGetDstIDsFromSrcIDs ¶
BatchGetDstIDsFromSrcIDs returns map[srcID]dstID by resolving SRC id->path then DST path->id in two queries (no per-item reads).
func BatchGetNodeMeta ¶
BatchGetNodeMeta returns meta (id, depth, type, traversal_status, copy_status) for the given ids. Status from latest events.
func BatchGetNodesByID ¶
BatchGetNodesByID returns nodes by id for the given table in one query. Status from latest events.
func BatchInsertNodes ¶
func BatchInsertNodes(d *DB, ops []InsertOperation) error
BatchInsertNodes inserts nodes via the writer.
func CopyStatusForDisplay ¶ added in v0.1.41
CopyStatusForDisplay returns the copy_status to show in the UI. Internal excluded_explicit and excluded_inherited both become "excluded".
func CountExcluded ¶
CountExcluded returns the number of SRC nodes with copy_status in (excluded_explicit, excluded_inherited). Exclusion is SRC-only; returns 0 for DST.
func CountExcludedInSubtree ¶
CountExcludedInSubtree returns the number of SRC nodes in the subtree with copy_status in (excluded_explicit, excluded_inherited). Exclusion is SRC-only; returns 0 for DST.
func CountMergedReviewRows ¶ added in v0.1.41
func CountMergedReviewRows(d *DB, f ReviewFilter) (int, error)
CountMergedReviewRows returns the number of merged rows matching the filter.
func CountNodes ¶
CountNodes returns the total number of nodes in the given table (src_nodes or dst_nodes). Live table count, not from stats. Uses pull conn so we see appender-written data.
func DeterministicNodeID ¶
DeterministicNodeID returns a stable id from (queueType, nodeType, path) for race-safe deduplication.
func DropNodeTableIndexes ¶ added in v0.1.41
DropNodeTableIndexes drops the non-primary indexes on the given node table (e.g. "src_nodes", "dst_nodes"). Call before a bulk phase to avoid index maintenance cost during inserts.
func DropStatusEventTableIndexes ¶ added in v0.1.41
DropStatusEventTableIndexes drops indexes on the given status event table. Call before a bulk phase.
func DstStatusEventAppendRowArgs ¶ added in v0.1.41
func DstStatusEventAppendRowArgs(e *StatusEvent) []any
DstStatusEventAppendRowArgs returns the column values for one row in dst_status_events (id, traversal_status, event_time, depth), for use with the appender.
func EnsureNodeTableIndexes ¶
EnsureNodeTableIndexes creates stable lookup indexes on the given node table (e.g. "src_nodes", "dst_nodes") for path_hash, parent_path_hash, and depth.
Idempotent for create/drop operations.
func EnsureStatusEventTableIndexes ¶ added in v0.1.41
EnsureStatusEventTableIndexes creates indexes on the given status event table for id and (id, event_time) to support "latest event per node" queries. Idempotent.
func GetAllLevels ¶
GetAllLevels returns distinct depth values for the table, sorted.
func GetChildrenIDsByParentID ¶
GetChildrenIDsByParentID returns child ids for the given parent_id (up to limit).
func GetDstIDFromSrcID ¶
GetDstIDFromSrcID returns the DST node id that has the same path as the given SRC node id (join by path).
func GetDstIDToSrcPath ¶
GetDstIDToSrcPath returns, for each given DST id, that node's path. Because path is the SRC/DST join key, this is also the corresponding SRC path.
func GetSrcChildrenGroupedByParentPath ¶
func GetSrcChildrenGroupedByParentPath(d *DB, parentPaths []string) (map[string][]*NodeState, error)
GetSrcChildrenGroupedByParentPath returns SRC nodes grouped by parent_path. Status from latest events.
func GetTotalFileSizes ¶ added in v0.1.41
GetTotalFileSizes returns the summed size of src_nodes and of dst_nodes as two separate totals (srcTotal, dstTotal); used for the API's totalFileSize.
func InsertRootNode ¶
InsertRootNode inserts the root node (path "/", depth 0) as metadata only, then emits initial status event(s).
func MergedReviewQueryBase ¶ added in v0.1.41
func MergedReviewQueryBase() string
MergedReviewQueryBase returns the WITH clause for the merged src+dst review view (status from events). Append " SELECT ... FROM merged" plus a WHERE clause to build the full query. The merged relation includes parent_path_hash for indexed child lookups.
func NodeStateAppendRowArgs ¶
NodeStateAppendRowArgs returns the column values for one NodeState (metadata only) in table order for use with duckdb.Appender.AppendRow.
func NormalizeRootRelativePath ¶ added in v0.1.41
NormalizeRootRelativePath returns a root-relative path with no "//" so SRC/DST path_hash joins match. Root is "/"; children are "/name", "/name/child". Collapses any "//" to "/".
func NormalizeSubtreeRootPathForPropagation ¶ added in v0.1.41
NormalizeSubtreeRootPathForPropagation normalizes a failed folder path so subtree updates match src_nodes.path (root-relative slashes, trim trailing slash except "/").
func PathHash ¶ added in v0.1.41
PathHash returns a deterministic 32-char hex hash of path for use as an index key.
func SrcStatusEventAppendRowArgs ¶ added in v0.1.41
func SrcStatusEventAppendRowArgs(e *StatusEvent) []any
SrcStatusEventAppendRowArgs returns the column values for one row in src_status_events (id, traversal_status, copy_status, event_time, depth), for use with the appender.
func StatsKeyCopyStatus ¶
StatsKeyCopyStatus returns the src_stats key for copy status (src_nodes only). Valid statuses: pending, successful, failed.
func StatsKeyTraversalStatus ¶
StatsKeyTraversalStatus returns the src_stats/dst_stats key for traversal status. Valid statuses: pending, successful, failed, not_on_src (DST only).
Types ¶
type BatchInsertOperation ¶
type BatchInsertOperation struct {
Operations []InsertOperation
}
BatchInsertOperation is a batch of node inserts.
type CopyStatusBucketsSubtreeNotExcluded ¶ added in v0.1.41
type CopyStatusBucketsSubtreeNotExcluded struct {
Pending int64
Failed int64
Successful int64
Skipped int64
InProgress int64
}
CopyStatusBucketsSubtreeNotExcluded holds counts of current SRC copy_status among nodes in the subtree that are not yet excluded (exclude propagation runs on this set).
type CopyStatusCounts ¶ added in v0.1.41
type CopyStatusCounts struct {
Pending int64
Successful int64
Failed int64
Skipped int64
Excluded int64 // excluded_explicit + excluded_inherited
}
CopyStatusCounts holds copy status counts for SRC (from src_status_events).
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is the DuckDB-backed database handle. Single physical connection for all DB operations (schema, bulk append at seal, pulls, checkpoint).
func (*DB) AddNodeDeletion ¶
AddNodeDeletion deletes a node immediately (retry DST cleanup).
func (*DB) AddNodeDeletions ¶
func (db *DB) AddNodeDeletions(deletions []NodeDeletion) error
AddNodeDeletions deletes multiple nodes.
func (*DB) AddSealNodes ¶ added in v0.1.41
AddSealNodes writes a subset of nodes and their status events to the seal buffer (no level stats). Use when removing nodes from cache after DST task completion or SRC early completion; only remove from cache if this returns nil.
func (*DB) AppendDiscoveredNodes ¶ added in v0.1.41
func (db *DB) AppendDiscoveredNodes(ops []InsertOperation)
AppendDiscoveredNodes adds discovered nodes (and their initial status events) to the seal buffer discovery queue. Call from traversal completion; flush is async until FlushAppenderBuffer.
func (*DB) AppendFailedSubtree ¶ added in v0.1.41
AppendFailedSubtree enqueues an SRC folder path for subtree failure propagation. At the next flush, all pending descendants will be marked as copy_status='failed'.
func (*DB) AppendStatusEvent ¶ added in v0.1.41
func (db *DB) AppendStatusEvent(table string, e StatusEvent, fromRetry bool)
AppendStatusEvent adds a status event (e.g. completed/failed) to the seal buffer discovery queue. Call from CompleteTraversalTask / FailTraversalTask. fromRetry should be true when the completion is from retry mode so we decrement PendingRetry (not Pending) when the path zeros.
func (*DB) AppendTaskError ¶ added in v0.1.41
AppendTaskError adds a task error to the seal buffer for async flush. Call from workers on failure (replaces immediate RecordTaskError).
func (*DB) BeginCopyPhase ¶ added in v0.1.41
BeginCopyPhase starts the copy phase (same as traversal: drop indexes, persistent appenders).
func (*DB) BeginTraversalPhase ¶ added in v0.1.41
BeginTraversalPhase starts the traversal phase: drops secondary indexes, acquires a persistent connection and creates 4 appenders for the seal buffer. Flush will use one tx per flush until EndTraversalPhase.
func (*DB) Checkpoint ¶
Checkpoint runs CHECKPOINT on the main conn, guarded by checkpointMu. Call at root seeding and round advancement only. Uses a single connection; running CHECKPOINT on multiple connections causes "Could not remove file X.wal: No such file or directory".
func (*DB) Close ¶
Close closes the database connection. Stops the seal buffer first (flushing any pending seal and discovery jobs).
func (*DB) EndCopyPhase ¶ added in v0.1.41
EndCopyPhase ends the copy phase (flush, close appenders, rebuild indexes, checkpoint).
func (*DB) EndTraversalPhase ¶ added in v0.1.41
EndTraversalPhase flushes remaining seal jobs, closes phase appenders, checkpoints to flush WAL and free memory, then rebuilds indexes.
func (*DB) FlushAppenderBuffer ¶ added in v0.1.41
FlushAppenderBuffer flushes the seal buffer (including discovery queue) and returns when all pending nodes and status events are persisted. Call before round advance.
func (*DB) FlushSealBuffer ¶
FlushSealBuffer drains pending seal jobs to the DB. Call before backpressure wait so we don't block on the flush interval.
func (*DB) GetAllQueueStats ¶
GetAllQueueStats returns all queue stats from queue_stats table.
func (*DB) GetCopyCountAtDepth ¶
func (db *DB) GetCopyCountAtDepth(depth int, nodeType string, copyStatus string, breakAtFirst bool) (int64, error)
GetCopyCountAtDepth returns the number of src_nodes rows at the given depth whose current (event-derived) copy_status equals copyStatus. nodeType is an optional filter. If breakAtFirst is true, it returns 1 if any matching node exists and 0 otherwise.
func (*DB) GetCopyStatusCountsFromEvents ¶ added in v0.1.41
func (db *DB) GetCopyStatusCountsFromEvents() (CopyStatusCounts, error)
GetCopyStatusCountsFromEvents returns counts of SRC nodes by current copy_status from src_status_events (arg_max per id). Use for review/API counts.
func (*DB) GetDBForPulls ¶
GetDBForPulls returns the main connection for pull queries. queueType is "SRC" or "DST" (both use same conn).
func (*DB) GetMaxDepth ¶
GetMaxDepth returns the maximum depth present in the nodes table for the given table ("SRC" or "DST"). Used as stop condition for retry sweep.
func (*DB) GetPathReviewStatsFromDB ¶ added in v0.1.41
func (db *DB) GetPathReviewStatsFromDB() (ReviewStatsSnapshot, error)
GetPathReviewStatsFromDB computes path review stats from nodes and status events only (no stats table). Use for GetPathReviewStats() and verification.
func (*DB) GetPendingTraversalCountAtDepthFromLive ¶
GetPendingTraversalCountAtDepthFromLive returns the count of nodes at the given depth with current traversal_status = 'pending' (event-derived).
func (*DB) GetQueueStats ¶
GetQueueStats returns the metrics JSON for the queue key from queue_stats table.
func (*DB) GetReviewStatsSnapshot ¶ added in v0.1.41
func (db *DB) GetReviewStatsSnapshot() (ReviewStatsSnapshot, error)
GetReviewStatsSnapshot reads the full canonical review stats from the universal stats table.
func (*DB) GetStatsBreakdown ¶
GetStatsBreakdown returns (depth, key, count) from live nodes+events grouped by depth and traversal_status. Order: depth, key.
func (*DB) GetStatsCount ¶
GetStatsCount returns the total count for the given key from live nodes+events (table = "SRC" or "DST"). Supports traversal status keys only.
func (*DB) GetStatsCountAtDepth ¶
GetStatsCountAtDepth returns the count for (depth, key) from live nodes+events. Supports traversal status keys only (e.g. traversal/pending, traversal/failed).
func (*DB) GetTraversalCountAtDepthFromLive ¶ added in v0.1.41
func (db *DB) GetTraversalCountAtDepthFromLive(table string, depth int, status string) (int64, error)
GetTraversalCountAtDepthFromLive returns the count of nodes at the given depth with the given traversal_status (event-derived).
func (*DB) GetTraversalStatusCountsFromEvents ¶ added in v0.1.41
func (db *DB) GetTraversalStatusCountsFromEvents(table string) (TraversalStatusCounts, error)
GetTraversalStatusCountsFromEvents returns counts of nodes by current traversal_status, derived from the status_events table (arg_max per id) joined to the node table. Use for verification instead of stats-table counters.
func (*DB) ResyncReviewStats ¶ added in v0.1.41
ResyncReviewStats recomputes the canonical review stats from live nodes and latest status events, then writes the full snapshot to the universal stats table. Use for recovery when stats are missing or stale.
func (*DB) RunWrite ¶ added in v0.1.41
RunWrite holds writeMu and runs fn with a WriteSession (single connection). Use Conn() for raw conn (e.g. appender) or WithTx for transactional writes. Caller must not retain the session or conn after fn returns.
func (*DB) SealLevel ¶
func (db *DB) SealLevel(table string, depth int, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error
SealLevel persists a sealed level from memory cache to the DB (bulk append + stats snapshot). copyP/copyS/copyF are used for SRC copy stats when >= 0. Payload is enqueued to the seal buffer, which writes and checkpoints asynchronously.
func (*DB) SealLevelDepth0 ¶
func (db *DB) SealLevelDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyP, copyS, copyF int64) error
SealLevelDepth0 updates existing root row(s) at depth 0 and writes stats. Root rows are seeded up-front, so depth 0 uses update semantics instead of appender insert.
func (*DB) WaitUntilSealFlushedThrough ¶
WaitUntilSealFlushedThrough blocks until the seal buffer has written at least the given depth (for backpressure: don't run more than one round ahead of flushed state).
type DstDescendantsReviewStats ¶ added in v0.1.41
type DstDescendantsReviewStats struct {
Folders int64
Files int64
Excluded int64
SizeDst int64
TraversalPending int64
TraversalFailed int64
}
DstDescendantsReviewStats holds aggregate counts for DST nodes under a path (descendants only, not the node at the path). Used to apply review-stats deltas when deleting those nodes.
type FetchResult ¶
type FetchResult struct {
Key string
State *NodeState
DstParentServiceID string // DST parent's ServiceID from path_hash join (copy pull only)
}
FetchResult is one row from a keyset list (id + full state). DstParentServiceID is populated by ListNodesCopyKeyset when joining dst_nodes on parent path_hash.
func ListDstBatchWithSrcChildren ¶
func ListDstBatchWithSrcChildren(d *DB, depth int, afterID string, limit int, traversalStatus string) ([]FetchResult, map[string][]*NodeState, error)
ListDstBatchWithSrcChildren returns the next batch of DST nodes at the given depth (keyset pagination: rows after afterID, at most limit) together with their SRC children, matched where the SRC node's parent_path equals the DST node's path. Status from events.
func ListNodesByDepthKeyset ¶
func ListNodesByDepthKeyset(d *DB, table string, depth int, afterID, statusFilter string, limit int) ([]FetchResult, error)
ListNodesByDepthKeyset returns up to limit nodes at the given depth, ordered by id and starting after afterID (keyset pagination). If statusFilter is non-empty, only rows whose current (event-derived) traversal_status equals statusFilter are returned.
func ListNodesCopyKeyset ¶
func ListNodesCopyKeyset(d *DB, depth int, nodeType, afterID string, limit int, statusFilter string) ([]FetchResult, error)
ListNodesCopyKeyset returns src_nodes at depth with current copy_status = statusFilter (event-derived), ordered by id. Pass CopyStatusPending for copy phase, CopyStatusFailed for copy-retry.
type InsertOperation ¶
type InsertOperation struct {
QueueType string // "SRC" or "DST"
Level int // depth
Status string // initial traversal_status
State *NodeState
}
InsertOperation represents a single node insert in a batch.
type LogBuffer ¶
type LogBuffer struct {
// contains filtered or unexported fields
}
LogBuffer buffers log entries and flushes them to the DB logs table. Flush triggers: row threshold (batchSize), time-based (interval). Backpressure: when buffer reaches 2*batchSize, block all writers, drain the entire buffer, then unblock; writers then push freely until 2*batchSize again.
func NewLogBuffer ¶
NewLogBuffer creates a log buffer that flushes to the main DB's logs table. Flush threshold is batchSize; backpressure threshold is 2*batchSize (block all writers and drain, then resume).
func (*LogBuffer) Add ¶
Add adds an entry to the buffer. When buffer reaches 2*batchSize we block all writers, drain the entire buffer, then resume.
type LogEntry ¶
type LogEntry struct {
ID string
Timestamp string
Level string
Entity string
EntityID string
Message string
Queue string
}
LogEntry is a single log line for persistence.
type MergedReviewRow ¶ added in v0.1.41
type MergedReviewRow struct {
Path string
Name string
Depth int
Type string
SrcNodeID string
DstNodeID string
SrcTraversalStatus string
DstTraversalStatus string
CopyStatus string
Excluded bool
Size int64
}
MergedReviewRow is one row from the merged review view (SRC+DST joined by path_hash, status from events).
func ListMergedReviewDiffs ¶ added in v0.1.41
func ListMergedReviewDiffs(d *DB, f ReviewFilter, orderBy string, limit, offset int) ([]MergedReviewRow, int, error)
ListMergedReviewDiffs returns merged review rows and total count matching the filter, ordered and paginated.
type MergedReviewStats ¶ added in v0.1.41
type MergedReviewStats struct {
Total int
Folders int
Files int
MissingOnSource int
MissingOnDest int
Excluded int
SizeSrc int64 // sum of file sizes on SRC (unique by path)
SizeDst int64 // sum of file sizes on DST (unique by path)
}
MergedReviewStats holds aggregate counts over merged review rows (one row per path; same filter semantics as list/count).
func GetMergedReviewStats ¶ added in v0.1.41
func GetMergedReviewStats(d *DB, f ReviewFilter) (MergedReviewStats, error)
GetMergedReviewStats returns aggregate counts for rows matching the filter (single query with FILTER). Counts are unique by path (merged view = one row per path).
type NodeDeletion ¶
NodeDeletion represents a node delete (retry DST cleanup).
type NodeState ¶
type NodeState struct {
ID string // Internal ULID-like id (deterministic from queueType, nodeType, path)
ServiceID string // FS id
ParentID string // Parent's internal id
ParentServiceID string
Path string // Join key with other table
ParentPath string // Join key for children
Name string // Display name (for task/UI)
Type string // "folder" or "file"
Size int64
MTime string
Depth int
TraversalStatus string // pending, successful, failed, not_on_src (dst)
CopyStatus string // pending, in_progress, successful, failed (src)
Excluded bool
Errors string // JSON placeholder for log refs
Status string // Alias for TraversalStatus (used by queue taskToNodeState)
SrcID string // Optional: corresponding SRC node id (join is by path; used during seeding for DST root)
}
NodeState is the in-memory representation of a row in src_nodes or dst_nodes. Path and parent_path are the join keys between SRC and DST.
func GetChildrenByParentID ¶
GetChildrenByParentID returns up to limit children with the given parent_id. Status from latest events.
func GetChildrenByParentPath ¶
GetChildrenByParentPath returns up to limit children with the given parent_path. Status from latest events.
func GetNodeByID ¶
GetNodeByID returns the node by id from the given table. Status is derived from latest status event.
func GetNodeByPath ¶
GetNodeByPath returns the node by path from the given table. Status is derived from latest status event.
func GetRootNode ¶
GetRootNode returns the root node (path = '/') from the given table.
func QueryNodesForReview ¶ added in v0.1.41
func QueryNodesForReview(d *DB, table string, depth *int, status string, excluded *bool, pathLike string, orderByPath bool, limit, offset int) ([]NodeState, error)
QueryNodesForReview returns nodes from the given table (SRC or DST) with optional depth, status, excluded, pathLike filters. Status from events. Used by review API.
type Options ¶
type Options struct {
Path string // Path to DuckDB file (e.g. "migration.duckdb")
SealBuffer *SealBufferOptions // Optional overrides for seal buffer; nil uses defaults. Seal buffer is always created.
}
Options configures DB open behavior.
type ReviewFilter ¶ added in v0.1.41
type ReviewFilter struct {
ParentPath string
Query string
QueryField string
Status string
FoldersOnly bool
ExcludeRoot bool
StatusSearchType string
TraversalStatus string
CopyStatus string
TypeFilter string
DepthOperator string
DepthValue *int
SizeOperator string
SizeValue *int64
}
ReviewFilter narrows merged review rows for listing, search, and counts. ParentPath: if non-empty, only direct children of this path are returned (uses parent_path_hash). Query + QueryField: substring match; QueryField "" or unknown = path OR name; "path" = path only; "name" = name only. Status (legacy): if non-empty and the structured status fields are not used, matches any of src_traversal, dst_traversal, or copy_status. StatusSearchType + TraversalStatus + CopyStatus: phase-aware filters (StatusSearchType is "traversal", "copy", or "both"). TypeFilter: "folder" or "file" (case-insensitive); setting FoldersOnly also restricts results to folders. ExcludeRoot: if true, exclude path = '/' from results (for global search).
type ReviewStatsDelta ¶ added in v0.1.41
ReviewStatsDelta is one (key, delta) for the universal stats table.
type ReviewStatsSnapshot ¶ added in v0.1.41
type ReviewStatsSnapshot struct {
TraversalPending int64
TraversalPendingRetry int64
TraversalSuccessful int64
TraversalFailed int64
CopyPending int64
CopySuccessful int64
CopyFailed int64
Excluded int64
Folders int64
Files int64
SizeSrc int64
SizeDst int64
}
ReviewStatsSnapshot is the canonical persisted review stats in the universal stats table.
type SealBuffer ¶
type SealBuffer struct {
// contains filtered or unexported fields
}
SealBuffer buffers seal jobs and discovery jobs (nodes + events only) and flushes them to the DB asynchronously. Flush triggers: interval timer, row count threshold, and Stop/ForceFlush. When a phase is active (StartPhase called), Flush uses persistent appenders and one tx per flush. Discovery jobs use Pending == discoveryJobStatsSentinel so stats are not written.
func NewSealBuffer ¶
func NewSealBuffer(db *DB, opts SealBufferOptions) *SealBuffer
NewSealBuffer creates a seal buffer and starts its flush loop.
func (*SealBuffer) Add ¶
func (sb *SealBuffer) Add(table string, depth int, nodes []*NodeState, pending, successful, failed, completed, copyP, copyS, copyF int64) error
Add enqueues a seal job. Status events are derived from nodes (one event per node with current traversal/copy status). Returns the error from Flush so callers can avoid removing from cache until data is confirmed in the buffer.
func (*SealBuffer) AddDiscoveryNodes ¶ added in v0.1.41
func (sb *SealBuffer) AddDiscoveryNodes(ops []InsertOperation)
AddDiscoveryNodes enqueues discovered nodes (and their initial status events) for async flush. No stats are written (discovery job). Call from traversal completion; the enqueued rows are written asynchronously unless Flush is called explicitly (e.g. before round advance).
func (*SealBuffer) AddDiscoveryStatusEvent ¶ added in v0.1.41
func (sb *SealBuffer) AddDiscoveryStatusEvent(table string, e StatusEvent, fromRetry bool)
AddDiscoveryStatusEvent enqueues one status event (e.g. completed/failed) for async flush. No stats are written. fromRetry should be true when the completion comes from retry mode, so that PendingRetry is decremented once the path's pending count reaches zero.
func (*SealBuffer) AddFailedSubtreePath ¶ added in v0.1.41
func (sb *SealBuffer) AddFailedSubtreePath(parentPath string)
AddFailedSubtreePath enqueues an SRC folder path for subtree failure propagation. All pending descendants of this path will be marked as copy_status='failed' at the next flush (row threshold, flush ticker, or explicit FlushAppenderBuffer e.g. before round advance). Batched with other seal work; do not flush synchronously here.
func (*SealBuffer) AddTaskError ¶ added in v0.1.41
func (sb *SealBuffer) AddTaskError(rec TaskErrorRecord)
AddTaskError enqueues one task error for async flush via the seal buffer.
func (*SealBuffer) Flush ¶
func (sb *SealBuffer) Flush() error
Flush drains queued jobs and task errors and writes them to the DB. When a phase is active, uses persistent appenders and one tx per flush (append + stats). Otherwise uses legacy per-flush appenders. On write failure, jobs and task errors are re-queued so waiters in WaitUntilFlushedThrough do not block forever.
func (*SealBuffer) LastFlushedDepth ¶
func (sb *SealBuffer) LastFlushedDepth() int
LastFlushedDepth returns the maximum depth that has been written to the DB by a completed Flush. -1 until first flush.
func (*SealBuffer) StartPhase ¶ added in v0.1.41
func (sb *SealBuffer) StartPhase(conn *sql.Conn) error
StartPhase starts a phase (traversal or copy): conn is held for the phase; 4 appenders are created and reused until StopPhase. Call from DB.BeginTraversalPhase / BeginCopyPhase. Must not be called when a phase is already active.
func (*SealBuffer) Stop ¶
func (sb *SealBuffer) Stop()
Stop stops the flush loop and flushes any remaining jobs.
func (*SealBuffer) StopPhase ¶ added in v0.1.41
func (sb *SealBuffer) StopPhase() error
StopPhase flushes any remaining jobs (one tx), closes appenders and conn, and clears phase. Call from DB.EndTraversalPhase / EndCopyPhase.
func (*SealBuffer) WaitUntilFlushedThrough ¶
func (sb *SealBuffer) WaitUntilFlushedThrough(depth int)
WaitUntilFlushedThrough blocks until at least the given depth has been written by a completed Flush.
type SealBufferOptions ¶
type SealBufferOptions struct {
FlushInterval time.Duration
RowThreshold int
HardCap int
FlushTimeout time.Duration // max time for a single flush operation (default 60s)
}
SealBufferOptions configures the seal buffer. Zero value uses defaults.
type SealJob ¶
type SealJob struct {
Table string
Depth int
Nodes []*NodeState
Events []StatusEvent
Pending int64
Successful int64
Failed int64
Completed int64
CopyP int64
CopyS int64
CopyF int64
FromRetry bool // event-only: this completion is from retry mode; subtract from PendingRetry when path zeros
}
SealJob is one sealed level's payload: table, depth, node metadata, status events, and stats.
type StatsDelta ¶ added in v0.1.41
StatsDelta is one (table, depth, key) delta for batch application.
type StatsRow ¶
StatsRow is one row (depth, key, count) for breakdown by level. Key is e.g. traversal/pending.
type StatusEvent ¶ added in v0.1.41
type StatusEvent struct {
ID string
TraversalStatus string // nullable in DB
CopyStatus string // src only; empty for dst
EventTime int64
Depth int
// PrevTraversalStatus and PrevCopyStatus carry the status that was current before this event.
// Set at enqueue time (task already has the loaded state); used by the seal buffer to compute
// per-depth level-stat deltas without re-querying the events table.
PrevTraversalStatus string
PrevCopyStatus string
}
StatusEvent is one append-only row for src_status_events or dst_status_events.
type StatusUpdateOperation ¶
type StatusUpdateOperation struct {
QueueType string
Level int
OldStatus string
NewStatus string
NodeID string
}
StatusUpdateOperation represents a traversal status transition (e.g. pending → successful).
type SubtreeStats ¶
SubtreeStats holds aggregate counts for a subtree (path = rootPath OR path LIKE rootPath || '/%').
func CountSubtree ¶
func CountSubtree(d *DB, table, rootPath string) (SubtreeStats, error)
CountSubtree returns aggregate counts for the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'). Single SQL query, no DFS.
type TaskErrorRecord ¶ added in v0.1.41
type TaskErrorRecord struct {
QueueType string
Phase string
NodeID string
Message string
Attempts int
Path string
}
TaskErrorRecord is one buffered row for task_errors (queue_type, phase, node_id, message, attempts, path).
type TraversalStatusCounts ¶ added in v0.1.41
type TraversalStatusCounts struct {
Pending int64
Successful int64
Failed int64
NotOnSrc int64 // DST only; 0 for SRC
Excluded int64 // excluded or exclusion_inherited
}
TraversalStatusCounts holds traversal status counts derived from the status_events table (one row per node, latest event).
type WriteOperation ¶
type WriteOperation interface {
// contains filtered or unexported methods
}
WriteOperation is an operation that can be buffered and flushed via the writer.
type WriteSession ¶ added in v0.1.41
type WriteSession struct {
// contains filtered or unexported fields
}
WriteSession is the handle passed to RunWrite. Caller must not retain conn after the callback returns.
func (*WriteSession) Conn ¶ added in v0.1.41
func (s *WriteSession) Conn() *sql.Conn
Conn returns the single DB connection for raw use (e.g. DuckDB appender). Valid only during the RunWrite callback.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is the write handle for DuckDB. Used inside RunWrite via WriteSession.WithTx.
func (*Writer) AppenderInsert ¶
AppenderInsert inserts node metadata into src_nodes or dst_nodes (batch INSERT). No status columns.
func (*Writer) ApplyReviewStatsDeltas ¶ added in v0.1.41
func (w *Writer) ApplyReviewStatsDeltas(deltas []ReviewStatsDelta) error
ApplyReviewStatsDeltas applies deltas to the universal stats table (count += delta per key). Used for incremental review stats updates.
func (*Writer) ApplyStatsDeltas ¶ added in v0.1.41
func (w *Writer) ApplyStatsDeltas(deltas []StatsDelta) error
ApplyStatsDeltas applies multiple stats deltas in one transaction. Used by seal buffer after discovery/copy flushes.
func (*Writer) BatchInsertDstStatusEvents ¶ added in v0.1.41
func (w *Writer) BatchInsertDstStatusEvents(events []StatusEvent) error
BatchInsertDstStatusEvents inserts status events into dst_status_events inside the current transaction. Used by seal flush so events are atomic with nodes/stats.
func (*Writer) BatchInsertSrcStatusEvents ¶ added in v0.1.41
func (w *Writer) BatchInsertSrcStatusEvents(events []StatusEvent) error
BatchInsertSrcStatusEvents inserts status events into src_status_events inside the current transaction. Used by seal flush so events are atomic with nodes/stats.
func (*Writer) CountCopyStatusBucketsSubtreeNotExcluded ¶ added in v0.1.41
func (w *Writer) CountCopyStatusBucketsSubtreeNotExcluded(rootPath string) (CopyStatusBucketsSubtreeNotExcluded, error)
CountCopyStatusBucketsSubtreeNotExcluded counts SRC nodes under rootPath whose latest copy_status is not excluded, by bucket. Matches Writer.SetNodeExcluded universal deltas (in_progress and skipped have no separate review-table copy bucket). Call inside a transaction.
func (*Writer) CountDstDescendantsReviewStats ¶ added in v0.1.41
func (w *Writer) CountDstDescendantsReviewStats(rootPath string) (DstDescendantsReviewStats, error)
CountDstDescendantsReviewStats returns aggregate counts for DST nodes that are strict descendants of rootPath (path LIKE rootPath||'/%'; for rootPath "/" uses path != '/' AND path LIKE '/%'). Call inside a transaction.
func (*Writer) CountDstNodesUnderPath ¶ added in v0.1.41
CountDstNodesUnderPath returns the number of DST nodes whose parent_path equals parentPath (direct children only). Call inside a transaction.
func (*Writer) CountDstNodesUnderPathWithTraversalStatus ¶ added in v0.1.41
func (w *Writer) CountDstNodesUnderPathWithTraversalStatus(parentPath, status string) (int64, error)
CountDstNodesUnderPathWithTraversalStatus returns the number of DST nodes under parentPath (parent_path = parentPath) whose current traversal_status equals status. Call inside a transaction.
func (*Writer) CountExcludedInSubtree ¶ added in v0.1.41
func (w *Writer) CountExcludedInSubtree(table, rootPath string) (excluded, notExcluded int64, err error)
CountExcludedInSubtree returns (excluded, notExcluded) counts for nodes in the SRC subtree. Excluded = copy_status in (excluded_explicit, excluded_inherited). Call inside a transaction.
func (*Writer) CountPendingTraversalAtPath ¶ added in v0.1.41
CountPendingTraversalAtPath returns how many nodes (SRC + DST) at the given path have current traversal_status = 'pending'. Call inside the same transaction after status events have been written so the count reflects the new state.
func (*Writer) DeleteDescendantsUnderPath ¶ added in v0.1.41
DeleteDescendantsUnderPath deletes only nodes that are strict descendants of rootPath (path LIKE rootPath||'/%'; for rootPath "/" deletes path != '/' AND path LIKE '/%'). The node at rootPath itself is not deleted. For DST, also deletes matching rows from dst_status_events. Recomputes stats for affected depths. Table is "SRC" or "DST".
func (*Writer) DeleteNode ¶
DeleteNode deletes the node from the given table (for retry DST cleanup).
func (*Writer) DeleteSubtree ¶
DeleteSubtree deletes all nodes in the subtree at rootPath (path = rootPath OR path LIKE rootPath || '/%'; for rootPath "/" uses path LIKE '/%'), then recomputes stats for each affected depth. Table is "SRC" or "DST".
func (*Writer) InsertDstChildrenTraversalStatusEvents ¶ added in v0.1.41
InsertDstChildrenTraversalStatusEvents appends one traversal_status event for each DST node whose parent_path equals parentPath. Used when marking/unmarking SRC node for retry (DST-only children get pending or not_on_src). Call inside RunWrite.
func (*Writer) InsertExclusionEventsForSubtree ¶ added in v0.1.41
InsertExclusionEventsForSubtree appends one status event per node in the SRC subtree: copy_status = excluded_explicit (root) or excluded_inherited (descendants); traversal_status preserved. Call inside RunWrite.
func (*Writer) InsertLog ¶
func (w *Writer) InsertLog(id string, level, message, component, entity, entityID, queue string) error
InsertLog inserts a row into logs. id must be unique (e.g. from GenerateLogID).
func (*Writer) InsertStatusEvent ¶ added in v0.1.41
func (w *Writer) InsertStatusEvent(table string, e *StatusEvent) error
InsertStatusEvent appends one row to src_status_events or dst_status_events. Table is "SRC" or "DST".
func (*Writer) InsertUnexcludeEventsForSubtree ¶ added in v0.1.41
InsertUnexcludeEventsForSubtree appends one status event per node in the SRC subtree: copy_status = 'pending', traversal_status preserved. Call inside RunWrite.
func (*Writer) NodePath ¶ added in v0.1.41
NodePath returns the path and table ("SRC" or "DST") for a node by ID, or error if not found.
func (*Writer) NodeTraversalStatus ¶ added in v0.1.41
NodeTraversalStatus returns the current traversal_status for the node from the events table. Call inside a transaction.
func (*Writer) PropagateSubtreeFailure ¶ added in v0.1.41
PropagateSubtreeFailure inserts copy_status='failed' events for all SRC descendants of parentPath whose current copy_status is 'pending'. Returns the number of affected nodes. Call inside a transaction.
func (*Writer) RecomputeStatsForDepth ¶
RecomputeStatsForDepth deletes stats for the given table and depth, then writes fresh counts from nodes joined with current status (event-derived). For bulk operations only (DeleteSubtree, exclusion propagation, diffs/search). Single-node updates use UpdateStatsCountByDelta instead (note: UpdateStatsCountByDelta is currently documented as a no-op, so this guidance may be stale).
func (*Writer) RecordTaskError ¶
func (w *Writer) RecordTaskError(queueType, phase, nodeID, message string, attempts int, path string) error
RecordTaskError inserts a row into task_errors.
func (*Writer) SealDepth0 ¶
func (w *Writer) SealDepth0(table string, nodes []*NodeState, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error
SealDepth0 writes the depth-0 stats snapshot and emits status events for each depth-0 node so the events table reflects current state (e.g. root marked successful after round 0 completes).
func (*Writer) SetNodeCopyStatus ¶
SetNodeCopyStatus emits a copy_status event and applies stat deltas for SRC. Single-node path: no full recompute.
func (*Writer) SetNodeExcluded ¶
SetNodeExcluded emits a copy_status-only event for SRC: excluding sets copy_status to excluded_explicit (traversal_status unchanged); unexcluding sets copy_status to pending. Applies copy-status stat deltas only.
func (*Writer) SetNodeTraversalStatus ¶
SetNodeTraversalStatus emits a traversal_status event and applies stat deltas (decrement old, increment new). Single-node path: no full recompute.
func (*Writer) SetStatsCountForDepth ¶
SetStatsCountForDepth is a no-op; per-depth stats are no longer persisted (callers use live nodes+events).
func (*Writer) UpdateStatsCountByDelta ¶ added in v0.1.41
UpdateStatsCountByDelta is a no-op; per-depth stats are no longer persisted.
func (*Writer) UpsertNodes ¶ added in v0.1.41
UpsertNodes inserts node metadata into src_nodes or dst_nodes. On conflict (id) does nothing so duplicate inserts (e.g. retry re-discovery before DST children are deleted) are safe.
func (*Writer) UpsertStatsCounts ¶ added in v0.1.41
func (*Writer) WriteLevelStatsSnapshot ¶
func (w *Writer) WriteLevelStatsSnapshot(table string, depth int, pending, successful, failed, completed int64, copyPending, copySuccessful, copyFailed int64) error
WriteLevelStatsSnapshot is a no-op; per-depth stats are no longer persisted (callers use live nodes+events).
func (*Writer) WriteQueueStats ¶
WriteQueueStats upserts queue metrics JSON into queue_stats.
func (*Writer) WriteReviewStatsSnapshot ¶ added in v0.1.41
func (w *Writer) WriteReviewStatsSnapshot(s ReviewStatsSnapshot) error
WriteReviewStatsSnapshot writes the full canonical review stats snapshot to the universal stats table (replaces counts for review keys).