client

package
v1.0.0
Published: Jul 26, 2025 License: MIT Imports: 17 Imported by: 0

README

client package

The client package lets Go applications communicate with a RaftLock cluster. It provides a configurable client for managing distributed locks, with automatic leader discovery, request retries, and metrics collection.

Core Features

  • Multiple Client Interfaces: The package offers three distinct client interfaces to cater to different use cases:
    • RaftLockClient: A high-level client for standard lock operations like acquiring, releasing, and renewing locks.
    • AdminClient: A specialized client for administrative and monitoring tasks, such as checking cluster status and health.
    • AdvancedClient: A low-level client for more granular control over lock queuing and other advanced features.
  • Fluent Client Configuration: A RaftLockClientBuilder provides a fluent API for constructing and configuring clients, allowing for easy setup of endpoints, timeouts, and other parameters.
  • Automatic Leader Redirection: The client can automatically detect and redirect requests to the current Raft leader, simplifying client-side logic.
  • Configurable Retry Policies: The client supports configurable retry policies with exponential backoff and jitter for handling transient errors and network issues. You can customize the number of retries, backoff durations, and which errors are considered retryable.
  • Stateful Lock Handling: The LockHandle interface offers a stateful wrapper for managing the lifecycle of a single lock, simplifying acquire, renew, and release operations.
  • Automated Lock Renewal: The AutoRenewer automatically renews a held lock in the background, ensuring it doesn't expire while in use.
  • Comprehensive Metrics: The client includes a Metrics interface for collecting detailed operational metrics, which can be integrated with monitoring systems. A no-op implementation is available if metrics are not needed.

Client Types

The package exposes three client interfaces to meet different needs.

RaftLockClient

The RaftLockClient is the primary client for application use, providing core distributed locking functionality:

  • Acquire: Attempts to acquire a lock, with an option to wait in a queue if the lock is held.
  • Release: Releases a previously acquired lock.
  • Renew: Extends the TTL of a held lock.
  • GetLockInfo: Retrieves metadata and state for a specific lock.
  • GetLocks: Returns a paginated list of locks matching a filter.

AdminClient

The AdminClient is designed for administrative and monitoring purposes:

  • GetStatus: Retrieves the current status of a RaftLock cluster node.
  • Health: Checks the health of the RaftLock service.
  • GetBackoffAdvice: Provides adaptive backoff parameters to guide client retry behavior.

AdvancedClient

The AdvancedClient offers low-level control for specialized use cases:

  • EnqueueWaiter: Explicitly adds the client to a lock's wait queue.
  • CancelWait: Removes the client from a lock's wait queue.
  • GetLeaderAddress: Returns the address of the current Raft leader.
  • IsConnected: Reports whether the client has an active connection.
  • SetRetryPolicy: Sets the client's retry behavior.

Getting Started

To get started with the RaftLock client, you first need to construct a client instance using the RaftLockClientBuilder.

Building a Client

The RaftLockClientBuilder provides a fluent API for creating and configuring clients. You can set server endpoints, timeouts, keep-alive settings, and more.

package main

import (
 "fmt"
 "log"
 "time"

 "github.com/jathurchan/raftlock/client"
)

func main() {
 // A slice of server addresses for the client to connect to.
 endpoints := []string{"localhost:8080", "localhost:8081"}

 // Use the builder to construct a new RaftLockClient.
 raftClient, err := client.NewRaftLockClientBuilder(endpoints).
  WithTimeouts(5*time.Second, 10*time.Second). // Set dial and request timeouts.
  WithMetrics(true).                           // Enable metrics collection.
  Build()
 if err != nil {
  log.Fatalf("Failed to build RaftLock client: %v", err)
 }
 defer raftClient.Close()

 fmt.Println("RaftLock client created successfully!")
}

Acquiring and Releasing a Lock

You can use a LockHandle for a more convenient way to manage a lock's lifecycle.

package main

import (
 "context"
 "fmt"
 "log"
 "time"

 "github.com/jathurchan/raftlock/client"
)

func main() {
 endpoints := []string{"localhost:8080"}
 raftClient, err := client.NewRaftLockClientBuilder(endpoints).Build()
 if err != nil {
  log.Fatalf("Failed to create client: %v", err)
 }
 defer raftClient.Close()

 lockID := "my-resource-lock"
 clientID := "my-application-instance-1"

 // Create a new handle for the lock.
 lockHandle, err := client.NewLockHandle(raftClient, lockID, clientID)
 if err != nil {
  log.Fatalf("Failed to create lock handle: %v", err)
 }
 defer lockHandle.Close(context.Background())

 // Acquire the lock with a 30-second TTL, waiting in the queue (wait=true) for at most 15 seconds.
 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 defer cancel()

 if err := lockHandle.Acquire(ctx, 30*time.Second, true); err != nil {
  log.Fatalf("Failed to acquire lock: %v", err)
 }

 fmt.Printf("Lock '%s' acquired successfully!\n", lockID)

 // ... perform operations while holding the lock ...
 time.Sleep(5 * time.Second)

 // Release the lock.
 if err := lockHandle.Release(context.Background()); err != nil {
  log.Fatalf("Failed to release lock: %v", err)
 }

 fmt.Printf("Lock '%s' released successfully!\n", lockID)
}

Configuration

The client's behavior can be customized through the Config struct, which is typically configured using the RaftLockClientBuilder.

Core Configuration Options

  • Endpoints: A list of RaftLock server addresses.
  • DialTimeout: The maximum time to wait for a connection to be established.
  • RequestTimeout: The default timeout for individual gRPC requests.
  • KeepAlive: gRPC keep-alive settings to detect dead connections.
  • EnableMetrics: A boolean to enable or disable client-side metrics collection.
  • MaxMessageSize: The maximum size of a gRPC message the client can send or receive.

Retry Policy

The RetryPolicy struct allows you to define how the client should handle failed operations.

  • MaxRetries: The maximum number of retry attempts.
  • InitialBackoff: The initial delay before the first retry.
  • MaxBackoff: The maximum delay between retries.
  • BackoffMultiplier: The multiplier for increasing the backoff duration after each attempt.
  • JitterFactor: A factor to randomize backoff durations to avoid thundering herd scenarios.
  • RetryableErrors: A list of error codes that should trigger a retry.
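
The fields above can be combined into a delay calculation. The following is a minimal sketch of exponential backoff with symmetric jitter, not the package's actual implementation: retryPolicy, backoffFor, and randomBackoff are hypothetical local names, and the jitter formula is one common choice that the package may not share.

```go
package main

import (
	"math"
	"math/rand"
	"time"
)

// retryPolicy is a local stand-in mirroring four of the documented RetryPolicy fields.
type retryPolicy struct {
	InitialBackoff    time.Duration
	MaxBackoff        time.Duration
	BackoffMultiplier float64
	JitterFactor      float64
}

// backoffFor returns the delay before retry number attempt (0-based).
// rnd must return a value in [0, 1).
func backoffFor(p retryPolicy, attempt int, rnd func() float64) time.Duration {
	// Exponential growth, capped at MaxBackoff.
	base := float64(p.InitialBackoff) * math.Pow(p.BackoffMultiplier, float64(attempt))
	if base > float64(p.MaxBackoff) {
		base = float64(p.MaxBackoff)
	}
	// Spread the delay uniformly over [base*(1-j), base*(1+j)).
	jitter := (rnd()*2 - 1) * p.JitterFactor * base
	if d := time.Duration(base + jitter); d > 0 {
		return d
	}
	return 0
}

// randomBackoff is the variant a caller would normally use.
func randomBackoff(p retryPolicy, attempt int) time.Duration {
	return backoffFor(p, attempt, rand.Float64)
}
```

Passing the random source as a parameter keeps backoffFor deterministic in tests, while randomBackoff plugs in rand.Float64 for real use.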

Error Handling

The client package defines a set of standard errors that can be returned by its functions. This allows you to handle specific error conditions in your application.

Some of the common errors include:

  • ErrLockHeld: The lock is already held by another client.
  • ErrNotLockOwner: The client is not the owner of the lock.
  • ErrVersionMismatch: The provided lock version is incorrect.
  • ErrLockNotFound: The specified lock does not exist.
  • ErrTimeout: The operation timed out.
  • ErrLeaderUnavailable: No leader is available in the cluster.
  • ErrClientClosed: The client has been closed and cannot be used.

You can use standard Go error handling techniques, such as errors.Is, to check for these specific errors.
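
As an illustration of the errors.Is approach, the sketch below maps errors to a coarse action. It declares local stand-ins for two of the sentinel errors so that it compiles without the raftlock module; classify, wrapped, and the action strings are hypothetical names chosen for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors.
var (
	errLockHeld     = errors.New("lock is already held by another client")
	errClientClosed = errors.New("client is closed")
)

// classify maps an error to a coarse action for the caller.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errLockHeld):
		return "retry-later"
	case errors.Is(err, errClientClosed):
		return "rebuild-client"
	default:
		return "fail"
	}
}

// wrapped returns a sentinel wrapped with %w; errors.Is still matches it.
func wrapped() error {
	return fmt.Errorf("acquire %q: %w", "my-resource-lock", errLockHeld)
}
```

With the real package, the same switch would presumably compare against client.ErrLockHeld and client.ErrClientClosed.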

Documentation

Constants

This section is empty.

Variables

var (
	// NewLockHandleFunc is a function variable that allows for easy mocking in tests.
	NewLockHandleFunc = func(client RaftLockClient, lockID, clientID string) (LockHandle, error) {
		return NewLockHandle(client, lockID, clientID)
	}

	// NewAutoRenewerFunc is a function variable for mocking the auto-renewer.
	NewAutoRenewerFunc = func(handle LockHandle, interval, ttl time.Duration, opts ...AutoRenewerOption) (AutoRenewer, error) {
		return NewAutoRenewer(handle, interval, ttl, opts...)
	}
)

var (
	// ErrLockHeld is returned when a lock is already held by another client
	ErrLockHeld = errors.New("lock is already held by another client")

	// ErrNotLockOwner is returned when attempting to operate on a lock not owned by the client
	ErrNotLockOwner = errors.New("client does not own the lock")

	// ErrVersionMismatch is returned when the provided version doesn't match the current lock version
	ErrVersionMismatch = errors.New("lock version mismatch")

	// ErrLockNotFound is returned when the specified lock does not exist
	ErrLockNotFound = errors.New("lock not found")

	// ErrInvalidTTL is returned when an invalid TTL value is provided
	ErrInvalidTTL = errors.New("invalid TTL value")

	// ErrWaitQueueFull is returned when the wait queue has reached capacity
	ErrWaitQueueFull = errors.New("wait queue is full")

	// ErrNotWaiting is returned when trying to cancel a wait for a client not in the queue
	ErrNotWaiting = errors.New("client is not waiting in the queue")

	// ErrTimeout is returned when an operation times out
	ErrTimeout = errors.New("operation timed out")

	// ErrLeaderUnavailable is returned when no leader is available
	ErrLeaderUnavailable = errors.New("no leader available")

	// ErrInvalidArgument is returned when request parameters are invalid
	ErrInvalidArgument = errors.New("invalid argument")

	// ErrUnavailable is returned when the service is unavailable
	ErrUnavailable = errors.New("service unavailable")

	// ErrRateLimit is returned when the request is rate limited
	ErrRateLimit = errors.New("request rate limited")

	// ErrClientClosed is returned when attempting to use a closed client
	ErrClientClosed = errors.New("client is closed")
)

Common client errors

Functions

func DoWithLock

func DoWithLock(
	ctx context.Context,
	client RaftLockClient,
	lockID, clientID string,
	ttl time.Duration,
	fn func(ctx context.Context) error,
) (err error)

DoWithLock acquires a lock, executes a function, and then releases the lock.
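
The acquire, run, release sequence can be sketched as follows. This is an assumption about the general pattern, not the package's source: locker, doWithLock, fakeLock, and demoDoWithLock are hypothetical names, and the 5-second release timeout is arbitrary.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// locker is a hypothetical, minimal view of a lock for this sketch.
type locker interface {
	Acquire(ctx context.Context, ttl time.Duration) error
	Release(ctx context.Context) error
}

// doWithLock acquires l, runs fn, and always attempts a release afterwards.
func doWithLock(ctx context.Context, l locker, ttl time.Duration, fn func(context.Context) error) (err error) {
	if err = l.Acquire(ctx, ttl); err != nil {
		return err
	}
	defer func() {
		// Use a fresh context so a cancelled ctx does not prevent the release.
		relCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if relErr := l.Release(relCtx); err == nil {
			err = relErr // surface the release error only if fn succeeded
		}
	}()
	return fn(ctx)
}

// fakeLock is an in-memory locker that records the order of calls.
type fakeLock struct {
	held  bool
	calls []string
}

func (f *fakeLock) Acquire(context.Context, time.Duration) error {
	if f.held {
		return errors.New("lock is already held")
	}
	f.held = true
	f.calls = append(f.calls, "acquire")
	return nil
}

func (f *fakeLock) Release(context.Context) error {
	f.held = false
	f.calls = append(f.calls, "release")
	return nil
}

// demoDoWithLock runs one guarded function and returns the recorded call order.
func demoDoWithLock() ([]string, error) {
	l := &fakeLock{}
	err := doWithLock(context.Background(), l, 30*time.Second, func(context.Context) error {
		l.calls = append(l.calls, "work")
		return nil
	})
	return l.calls, err
}
```

The named return value lets the deferred function report a release failure without hiding an error returned by fn.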

func ErrorFromCode

func ErrorFromCode(code pb.ErrorCode) error

ErrorFromCode converts a protobuf error code to a Go error.

func RunWithLock

func RunWithLock(
	ctx context.Context,
	client RaftLockClient,
	lockID, clientID string,
	ttl, interval time.Duration,
	fn func(context.Context) error,
) (err error)

RunWithLock is similar to DoWithLock but also sets up an auto-renewer.

Types

type AcquireRequest

type AcquireRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// ClientID is the unique identifier for the requesting client
	ClientID string

	// TTL is the time-to-live for the lock. If zero, server default is used.
	TTL time.Duration

	// Wait indicates whether to wait in queue if lock is currently held
	Wait bool

	// WaitTimeout is the maximum time to wait in queue. If zero, server default is used.
	WaitTimeout time.Duration

	// Priority in wait queue (higher numbers = higher priority, 0 = default)
	Priority int32

	// Metadata is optional key-value data to associate with the lock
	Metadata map[string]string

	// RequestID is an optional client-generated ID for idempotency
	RequestID string
}

AcquireRequest contains parameters for acquiring a distributed lock.

type AcquireResult

type AcquireResult struct {
	// Acquired indicates whether the lock was successfully acquired
	Acquired bool

	// Lock contains lock information if acquired=true
	Lock *Lock

	// BackoffAdvice provides retry guidance if acquired=false and not waiting
	BackoffAdvice *BackoffAdvice

	// QueuePosition indicates position in wait queue if waiting
	QueuePosition int32

	// EstimatedWaitDuration provides estimated wait time if enqueued
	EstimatedWaitDuration time.Duration

	// Error contains detailed error information if the operation failed
	Error *ErrorDetail
}

AcquireResult contains the outcome of a lock acquisition attempt.

type AdminClient

type AdminClient interface {
	// GetStatus returns the current status of the RaftLock cluster node.
	//
	// Useful for monitoring and diagnostics.
	//
	// Returns:
	//   - ClusterStatus with Raft and lock manager state
	//   - Error if the request fails
	GetStatus(ctx context.Context, req *GetStatusRequest) (*ClusterStatus, error)

	// Health checks the health of the RaftLock service.
	//
	// Returns:
	//   - HealthStatus indicating current health
	//   - Error if the service is unhealthy or unreachable
	Health(ctx context.Context, req *HealthRequest) (*HealthStatus, error)

	// GetBackoffAdvice returns adaptive backoff parameters to guide retry behavior
	// during lock contention.
	//
	// Returns:
	//   - BackoffAdvice with recommended delay settings
	//   - Error if the request fails
	GetBackoffAdvice(ctx context.Context, req *BackoffAdviceRequest) (*BackoffAdvice, error)

	// Close shuts down the client and releases associated resources.
	Close() error
}

AdminClient provides administrative and monitoring capabilities for RaftLock clusters. Intended for use in tools, dashboards, and scripts—not for client applications performing locking.

func NewAdminClient

func NewAdminClient(config Config) (AdminClient, error)

NewAdminClient creates a new AdminClient for administrative and monitoring tasks. It validates the provided config and initializes the underlying base client.

type AdvancedClient

type AdvancedClient interface {
	// EnqueueWaiter explicitly adds the client to a lock's wait queue.
	//
	// Most applications should use Acquire with wait=true instead.
	//
	// Returns:
	//   - EnqueueResult with queue position and estimated wait time
	//   - Error if the operation fails
	//
	// Possible errors:
	//   - ErrWaitQueueFull: the wait queue is at capacity
	EnqueueWaiter(ctx context.Context, req *EnqueueWaiterRequest) (*EnqueueResult, error)

	// CancelWait removes the client from a lock's wait queue.
	//
	// Most applications should rely on context cancellation instead.
	//
	// Returns:
	//   - CancelWaitResult indicating cancellation success
	//   - Error if the operation fails
	//
	// Possible errors:
	//   - ErrNotWaiting: client is not in the queue
	CancelWait(ctx context.Context, req *CancelWaitRequest) (*CancelWaitResult, error)

	// GetLeaderAddress returns the current leader's address, or an empty string if unknown.
	GetLeaderAddress() string

	// IsConnected reports whether the client has an active connection to the cluster.
	IsConnected() bool

	// SetRetryPolicy sets the client's retry behavior for failed operations.
	//
	// If not set, a default policy is used.
	SetRetryPolicy(policy RetryPolicy)

	// GetMetrics returns client-side metrics for observability.
	//
	// Returns nil if metrics collection is disabled.
	GetMetrics() Metrics

	// Close shuts down the client and releases resources.
	Close() error
}

AdvancedClient provides low-level operations for advanced RaftLock use cases. These methods expose finer control over lock queuing and client behavior.

func NewAdvancedClient

func NewAdvancedClient(config Config) (AdvancedClient, error)

NewAdvancedClient creates a new AdvancedClient for low-level lock operations. It sets up connection handling, retry logic, and metrics using the provided config.

type AutoRenewer

type AutoRenewer interface {
	// Start begins the auto-renewal process in a background goroutine.
	// The provided context is used to control the lifecycle of the renewal process.
	Start(ctx context.Context)

	// Stop gracefully stops the auto-renewal loop and waits for it to exit.
	// Returns any terminal error from the renewal process or shutdown.
	Stop(ctx context.Context) error

	// Done returns a channel that's closed when the auto-renewer has stopped.
	Done() <-chan struct{}

	// Err returns the error that caused the renewer to stop, if any.
	// Returns nil if stopped gracefully.
	Err() error
}

AutoRenewer defines a background mechanism for automatically renewing a lock. It handles lifecycle management, graceful shutdown, and error reporting.

func NewAutoRenewer

func NewAutoRenewer(
	handle LockHandle,
	interval, ttl time.Duration,
	opts ...AutoRenewerOption,
) (AutoRenewer, error)

NewAutoRenewer creates a new AutoRenewer for the given lock handle. It attempts to renew the lock at the specified interval using the given TTL. Optional configurations (e.g., custom clock for testing) can be provided via opts.
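
A background renewal loop of this kind can be sketched with a ticker and a context. The code below is illustrative only and does not reproduce the package's AutoRenewer: renewFunc, startRenewing, and demoRenew are hypothetical names, and the real implementation may handle errors and shutdown differently.

```go
package main

import (
	"context"
	"time"
)

// renewFunc is a hypothetical stand-in for a handle's Renew method.
type renewFunc func(ctx context.Context, ttl time.Duration) error

// startRenewing calls renew once per interval until ctx is cancelled or renew
// fails. The returned channel yields the terminal error (nil after a
// cancellation) and is then closed.
func startRenewing(ctx context.Context, renew renewFunc, interval, ttl time.Duration) <-chan error {
	done := make(chan error, 1)
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				done <- nil
				return
			case <-ticker.C:
				err := renew(ctx, ttl)
				if ctx.Err() != nil {
					err = nil // shutdown was requested; not a renewal failure
				}
				if err != nil {
					done <- err
					return
				}
			}
		}
	}()
	return done
}

// demoRenew waits for three renewals, cancels, and returns the count and the terminal error.
func demoRenew() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	renewed := make(chan struct{}, 64)
	renew := func(context.Context, time.Duration) error {
		renewed <- struct{}{}
		return nil
	}
	done := startRenewing(ctx, renew, time.Millisecond, 30*time.Second)
	count := 0
	for count < 3 {
		<-renewed
		count++
	}
	cancel()
	return count, <-done
}
```

In practice the interval should be comfortably shorter than the TTL so that a single slow or failed renewal does not let the lock expire.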

type AutoRenewerOption

type AutoRenewerOption func(*AutoRenewerOptions)

AutoRenewerOption is a function that applies a configuration option to an AutoRenewer.

func WithClock

func WithClock(clock raft.Clock) AutoRenewerOption

WithClock provides a custom clock implementation to the AutoRenewer, which is primarily useful for testing time-dependent behavior.
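
AutoRenewerOption follows Go's functional options pattern. The sketch below shows how such an option could inject a fixed clock for tests. The clock interface here is a hypothetical stand-in for raft.Clock, whose real method set is not shown in this document, and all lowercase names are local to the example.

```go
package main

import "time"

// clock is a hypothetical stand-in for raft.Clock; the real interface may differ.
type clock interface {
	Now() time.Time
}

// systemClock reads the wall clock.
type systemClock struct{}

func (systemClock) Now() time.Time { return time.Now() }

// fixedClock always reports the same instant, which makes tests deterministic.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

// renewerOptions and renewerOption mirror AutoRenewerOptions and AutoRenewerOption.
type renewerOptions struct {
	Clock clock
}

type renewerOption func(*renewerOptions)

// withClock returns an option that replaces the default clock.
func withClock(c clock) renewerOption {
	return func(o *renewerOptions) { o.Clock = c }
}

// buildOptions starts from defaults and applies each option in order.
func buildOptions(opts ...renewerOption) renewerOptions {
	o := renewerOptions{Clock: systemClock{}}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// demoOptions builds options with a fixed clock and reports what was installed.
func demoOptions() (usesFixed bool, year int) {
	fixed := fixedClock{t: time.Date(2025, time.July, 26, 0, 0, 0, 0, time.UTC)}
	o := buildOptions(withClock(fixed))
	_, usesFixed = o.Clock.(fixedClock)
	return usesFixed, o.Clock.Now().Year()
}
```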

type AutoRenewerOptions

type AutoRenewerOptions struct {
	Clock raft.Clock
}

AutoRenewerOptions holds optional configuration for an AutoRenewer.

type BackoffAdvice

type BackoffAdvice struct {
	// InitialBackoff is the recommended starting backoff duration
	InitialBackoff time.Duration

	// MaxBackoff is the maximum backoff duration
	MaxBackoff time.Duration

	// Multiplier determines how backoff increases after each failed attempt
	Multiplier float64

	// JitterFactor introduces randomness to reduce contention
	JitterFactor float64
}

BackoffAdvice provides guidance for handling lock contention.

type BackoffAdviceRequest

type BackoffAdviceRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string
}

BackoffAdviceRequest contains parameters for requesting backoff guidance.

type CancelWaitRequest

type CancelWaitRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// ClientID is the unique identifier for the client
	ClientID string

	// Version is the fencing token for the wait request to cancel
	Version int64
}

CancelWaitRequest contains parameters for leaving a lock's wait queue.

type CancelWaitResult

type CancelWaitResult struct {
	// Cancelled indicates whether the wait was successfully cancelled
	Cancelled bool

	// Error contains detailed error information if the operation failed
	Error *ErrorDetail
}

CancelWaitResult contains the outcome of leaving a wait queue.

type ClientError

type ClientError struct {
	Op      string            // Operation that failed
	Err     error             // Underlying error
	Code    pb.ErrorCode      // Error code from server
	Details map[string]string // Additional error details
}

ClientError wraps an error with additional client context.

func NewClientError

func NewClientError(
	op string,
	err error,
	code pb.ErrorCode,
	details map[string]string,
) *ClientError

NewClientError creates a new ClientError.

func (*ClientError) Error

func (e *ClientError) Error() string

Error implements the error interface.

func (*ClientError) Is

func (e *ClientError) Is(target error) bool

Is checks if the error matches the target error.

func (*ClientError) Unwrap

func (e *ClientError) Unwrap() error

Unwrap returns the underlying error.
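
Because ClientError exposes Unwrap, callers can combine errors.As (to reach the structured fields) with errors.Is (to test the cause). The following sketch uses opError, a simplified local stand-in that omits the protobuf code; the "lock_id" detail key is a hypothetical example and is not documented by the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errLockNotFound is a local stand-in for the package's ErrLockNotFound.
var errLockNotFound = errors.New("lock not found")

// opError is a simplified stand-in for ClientError; it omits the protobuf code.
type opError struct {
	Op      string
	Err     error
	Details map[string]string
}

func (e *opError) Error() string { return fmt.Sprintf("%s: %v", e.Op, e.Err) }
func (e *opError) Unwrap() error { return e.Err }

// describe extracts the failed operation with errors.As and the cause with errors.Is.
func describe(err error) string {
	var oe *opError
	if !errors.As(err, &oe) {
		return "unstructured error"
	}
	if errors.Is(err, errLockNotFound) {
		return oe.Op + ": lock " + oe.Details["lock_id"] + " does not exist"
	}
	return oe.Op + ": " + oe.Err.Error()
}

// demoDescribe wraps an opError once more to show that both checks see through the chain.
func demoDescribe() string {
	err := &opError{
		Op:      "GetLockInfo",
		Err:     errLockNotFound,
		Details: map[string]string{"lock_id": "my-resource-lock"},
	}
	return describe(fmt.Errorf("refresh cache: %w", err))
}
```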

type ClusterStatus

type ClusterStatus struct {
	// RaftStatus contains current Raft cluster status
	RaftStatus *RaftStatus

	// LockStats contains lock manager statistics
	LockStats *LockManagerStats

	// Health contains server health information
	Health *HealthStatus
}

ClusterStatus contains information about the RaftLock cluster state.

type Config

type Config struct {
	// Endpoints is a list of RaftLock server addresses that the client will
	// attempt to connect to. At least one endpoint is required.
	Endpoints []string

	// DialTimeout is the maximum time the client will wait to establish a
	// connection to a server endpoint. Defaults to 5 seconds.
	DialTimeout time.Duration

	// RequestTimeout is the default timeout for individual gRPC requests.
	// This can be overridden by a context with a shorter deadline. Defaults to 30 seconds.
	RequestTimeout time.Duration

	// KeepAlive settings control gRPC's keepalive mechanism, which helps
	// detect dead connections and keep active ones alive through proxies.
	KeepAlive KeepAliveConfig

	// RetryPolicy defines the behavior for retrying failed operations,
	// including backoff strategy and which errors are considered retryable.
	RetryPolicy RetryPolicy

	// EnableMetrics toggles the collection of client-side performance metrics.
	// Defaults to true.
	EnableMetrics bool

	// MaxMessageSize specifies the maximum size of a gRPC message (in bytes)
	// that the client can send or receive. Defaults to 16MB.
	MaxMessageSize int
}

Config holds configuration options for RaftLock clients.

func DefaultClientConfig

func DefaultClientConfig() Config

DefaultClientConfig returns a Config with sensible default values.

type EnqueueResult

type EnqueueResult struct {
	// Enqueued indicates whether the client was successfully enqueued
	Enqueued bool

	// Position is the position in the wait queue (0 = front of queue)
	Position int32

	// EstimatedWaitDuration is the estimated wait time
	EstimatedWaitDuration time.Duration

	// Error contains detailed error information if the operation failed
	Error *ErrorDetail
}

EnqueueResult contains the outcome of joining a wait queue.

type EnqueueWaiterRequest

type EnqueueWaiterRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// ClientID is the unique identifier for the client
	ClientID string

	// Timeout is the maximum time to wait in queue
	Timeout time.Duration

	// Priority in wait queue (higher numbers = higher priority)
	Priority int32

	// Version is the fencing token for this wait request
	Version int64
}

EnqueueWaiterRequest contains parameters for joining a lock's wait queue.

type ErrorDetail

type ErrorDetail struct {
	// Code is the machine-readable error code
	Code pb.ErrorCode

	// Message is a human-readable description of the error
	Message string

	// Details contains additional context for debugging
	Details map[string]string
}

ErrorDetail contains structured error information.

type GetLockInfoRequest

type GetLockInfoRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// IncludeWaiters indicates whether to include detailed waiter information
	IncludeWaiters bool
}

GetLockInfoRequest contains parameters for retrieving lock information.

type GetLocksRequest

type GetLocksRequest struct {
	// Filter specifies criteria for locks to return
	Filter *LockFilter

	// Limit is the maximum number of locks to return (0 = no limit)
	Limit int32

	// Offset is the number of locks to skip (for pagination)
	Offset int32

	// IncludeWaiters indicates whether to include detailed waiter information
	IncludeWaiters bool
}

GetLocksRequest contains parameters for retrieving multiple locks.

type GetLocksResult

type GetLocksResult struct {
	// Locks is the list of locks matching the filter
	Locks []*LockInfo

	// TotalMatching is the total number of locks matching the filter
	TotalMatching int32

	// HasMore indicates if there are more results available
	HasMore bool
}

GetLocksResult contains the outcome of a multi-lock query.
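
The Limit, Offset, and HasMore fields suggest an offset-based paging loop. Below is a minimal sketch under that assumption: locksRequest and locksResult are trimmed local copies of the documented types (lock IDs only), fetchFunc stands in for RaftLockClient.GetLocks, and fakeStore is an in-memory substitute for a server.

```go
package main

// Local stand-ins mirroring the documented fields used for paging.
type locksRequest struct {
	Limit  int32
	Offset int32
}

type locksResult struct {
	Locks   []string // lock IDs only, to keep the sketch small
	HasMore bool
}

// fetchFunc is a hypothetical stand-in for RaftLockClient.GetLocks.
type fetchFunc func(req locksRequest) (locksResult, error)

// listAll pages through every lock using a positive pageSize.
func listAll(fetch fetchFunc, pageSize int32) ([]string, error) {
	var all []string
	offset := int32(0)
	for {
		res, err := fetch(locksRequest{Limit: pageSize, Offset: offset})
		if err != nil {
			return nil, err
		}
		all = append(all, res.Locks...)
		// Stop on the last page, or on an empty page to avoid looping forever.
		if !res.HasMore || len(res.Locks) == 0 {
			return all, nil
		}
		offset += int32(len(res.Locks))
	}
}

// fakeStore serves pages from an in-memory slice.
func fakeStore(ids []string) fetchFunc {
	return func(req locksRequest) (locksResult, error) {
		start := int(req.Offset)
		if start > len(ids) {
			start = len(ids)
		}
		end := start + int(req.Limit)
		if end > len(ids) {
			end = len(ids)
		}
		return locksResult{Locks: ids[start:end], HasMore: end < len(ids)}, nil
	}
}
```

Offset paging can skip or repeat entries if locks are created or released between calls, so results are best treated as an approximate snapshot.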

type GetStatusRequest

type GetStatusRequest struct {
	// IncludeReplicationDetails indicates whether to include detailed replication info
	IncludeReplicationDetails bool
}

GetStatusRequest contains parameters for requesting cluster status.

type HealthRequest

type HealthRequest struct {
	// ServiceName is an optional service name for specific health checks
	ServiceName string
}

HealthRequest contains parameters for health check requests.

type HealthServingStatus

type HealthServingStatus int

HealthServingStatus represents the health serving status.

const (
	// HealthStatusUnknown indicates status cannot be determined
	HealthStatusUnknown HealthServingStatus = iota
	// HealthStatusServing indicates server is healthy and handling requests
	HealthStatusServing
	// HealthStatusNotServing indicates server is not handling requests or is unhealthy
	HealthStatusNotServing
)
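
Integer enums like this are easier to log with a String method. This document does not show whether the package defines one, so the sketch below declares a local copy of the type; the returned label strings are assumptions.

```go
package main

// healthServingStatus is a local copy of the documented enum.
type healthServingStatus int

const (
	healthStatusUnknown healthServingStatus = iota
	healthStatusServing
	healthStatusNotServing
)

// String makes the status readable in logs and with fmt's %v and %s verbs.
func (s healthServingStatus) String() string {
	switch s {
	case healthStatusServing:
		return "SERVING"
	case healthStatusNotServing:
		return "NOT_SERVING"
	default:
		// Covers healthStatusUnknown and any out-of-range value.
		return "UNKNOWN"
	}
}
```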

type HealthStatus

type HealthStatus struct {
	// Status is the overall serving status
	Status HealthServingStatus

	// Message is a human-readable status message
	Message string

	// IsRaftLeader indicates whether this node believes it is the Raft leader
	IsRaftLeader bool

	// RaftLeaderAddress is the address of the current Raft leader (if known)
	RaftLeaderAddress string

	// RaftTerm is the current Raft term as seen by this node
	RaftTerm uint64

	// RaftLastApplied is the last log index applied by this node's state machine
	RaftLastApplied uint64

	// CurrentActiveLocks is the number of currently held locks on this node
	CurrentActiveLocks int32

	// CurrentTotalWaiters is the total number of clients in wait queues on this node
	CurrentTotalWaiters int32

	// Uptime is the server uptime duration
	Uptime time.Duration

	// LastHealthCheckAt is the timestamp of the last health check
	LastHealthCheckAt time.Time
}

HealthStatus contains health information about the service.

type KeepAliveConfig

type KeepAliveConfig struct {
	// Time is the interval at which the client sends keepalive pings to the server
	// when no other messages are being sent.
	Time time.Duration

	// Timeout is the duration the client waits for a keepalive ack from the server
	// before considering the connection to be dead.
	Timeout time.Duration

	// PermitWithoutStream allows keepalive pings to be sent even when there are
	// no active streams. This is useful for maintaining connections.
	PermitWithoutStream bool
}

KeepAliveConfig defines gRPC keepalive settings for the client.

type Lock

type Lock struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// OwnerID is the unique identifier for the lock owner
	OwnerID string

	// Version is the fencing token for this lock acquisition
	Version int64

	// AcquiredAt is when the lock was acquired
	AcquiredAt time.Time

	// ExpiresAt is when the lock will expire if not renewed
	ExpiresAt time.Time

	// Metadata contains optional key-value data associated with the lock
	Metadata map[string]string
}

Lock represents an acquired distributed lock.
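
The Version field is described as a fencing token. A typical use, sketched below, is for the protected resource to reject writes that carry an older token than one it has already seen. This assumes that Version increases with each new acquisition, which this document does not state explicitly; fencedStore and errStaleToken are hypothetical names.

```go
package main

import (
	"errors"
	"sync"
)

var errStaleToken = errors.New("stale fencing token")

// fencedStore is a hypothetical protected resource that remembers the
// highest fencing token it has accepted.
type fencedStore struct {
	mu      sync.Mutex
	highest int64
	value   string
}

// Write applies v only if token is at least as new as the newest token seen.
func (s *fencedStore) Write(token int64, v string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if token < s.highest {
		return errStaleToken // a newer lock holder has already written
	}
	s.highest = token
	s.value = v
	return nil
}

// Value returns the last accepted value.
func (s *fencedStore) Value() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}
```

A client would pass lock.Version as the token on every write, so a holder whose lock expired during a long pause cannot overwrite data written by its successor.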

type LockFilter

type LockFilter struct {
	// LockIDPattern is a pattern to match lock IDs (supports wildcards)
	LockIDPattern string

	// OwnerIDPattern is a pattern to match owner IDs (supports wildcards)
	OwnerIDPattern string

	// OnlyHeld indicates to return only locks that are currently held
	OnlyHeld bool

	// OnlyContested indicates to return only locks with waiters
	OnlyContested bool

	// ExpiresBefore filters locks that expire before this time
	ExpiresBefore *time.Time

	// ExpiresAfter filters locks that expire after this time
	ExpiresAfter *time.Time

	// MetadataFilter contains key-value pairs that must be present
	MetadataFilter map[string]string
}

LockFilter specifies criteria for filtering locks in queries.

type LockHandle

type LockHandle interface {
	// Acquire attempts to acquire the lock with the specified TTL.
	// If `wait` is true, the client may be enqueued if the lock is held.
	// Returns an error if the lock cannot be acquired (e.g., ErrLockHeld, ErrClientClosed).
	Acquire(ctx context.Context, ttl time.Duration, wait bool) error

	// Release releases the lock if it is currently held by this handle.
	// It uses the fencing token from the last successful acquire or renew operation.
	// Returns an error if the release fails (e.g., ErrNotLockOwner, ErrClientClosed).
	Release(ctx context.Context) error

	// Renew extends the TTL of the currently held lock.
	// Returns an error if the lock is not held or the renewal fails.
	Renew(ctx context.Context, newTTL time.Duration) error

	// IsHeld returns true if the lock is currently considered held by this handle.
	IsHeld() bool

	// Lock returns a copy of the current lock information if held, or nil otherwise.
	// The returned Lock struct is a snapshot and should not be modified.
	Lock() *Lock

	// Close releases the lock if held and marks the handle as closed, preventing further operations.
	// It is safe to call Close multiple times.
	Close(ctx context.Context) error
}

LockHandle provides a convenient, stateful wrapper for managing the lifecycle of a single distributed lock. It simplifies the process of acquiring, renewing, and releasing a specific lock by maintaining the lock's state (like its version) internally. All methods are thread-safe.

func NewLockHandle

func NewLockHandle(client RaftLockClient, lockID, clientID string) (LockHandle, error)

NewLockHandle creates a new LockHandle that can be used to manage a single lock. It requires a RaftLockClient instance and identifiers for the lock and the client.

type LockInfo

type LockInfo struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// OwnerID is the unique identifier for the current owner (empty if not held)
	OwnerID string

	// Version is the fencing token for the current lock acquisition
	Version int64

	// AcquiredAt is when the lock was acquired
	AcquiredAt time.Time

	// ExpiresAt is when the lock will expire if not renewed
	ExpiresAt time.Time

	// WaiterCount is the number of clients waiting for this lock
	WaiterCount int32

	// WaitersInfo contains detailed information about waiters (if requested)
	WaitersInfo []*WaiterInfo

	// Metadata contains optional key-value data associated with the lock
	Metadata map[string]string

	// LastModifiedAt is when the lock state was last modified
	LastModifiedAt time.Time
}

LockInfo contains detailed information about a lock.

type LockManagerStats

type LockManagerStats struct {
	// ActiveLocksCount is the total number of locks currently held
	ActiveLocksCount int32

	// TotalWaitersCount is the total number of clients in all wait queues
	TotalWaitersCount int32

	// ContestedLocksCount is the number of locks that currently have waiters
	ContestedLocksCount int32

	// AverageHoldDuration is the average lock hold time
	AverageHoldDuration time.Duration

	// AcquisitionRatePerSecond is the rate of lock acquisitions
	AcquisitionRatePerSecond float64

	// ReleaseRatePerSecond is the rate of lock releases
	ReleaseRatePerSecond float64

	// ExpiredLocksLastPeriod is the number of locks that expired recently
	ExpiredLocksLastPeriod int32
}

LockManagerStats contains statistics about the lock manager.

type Metrics

type Metrics interface {
	// IncrSuccess increments the success count for the given operation.
	IncrSuccess(operation string)

	// IncrFailure increments the failure count for the given operation.
	IncrFailure(operation string)

	// IncrRetry increments the retry count for the given operation.
	IncrRetry(operation string)

	// IncrLeaderRedirect increments the count of leader redirection events.
	IncrLeaderRedirect()

	// ObserveLatency records the latency for the given operation.
	ObserveLatency(operation string, d time.Duration)

	// SetConnectionCount sets the current number of active connections.
	SetConnectionCount(count int)

	// GetRequestCount returns the total number of requests for the given operation.
	GetRequestCount(operation string) uint64

	// GetSuccessCount returns the total number of successful requests for the given operation.
	GetSuccessCount(operation string) uint64

	// GetFailureCount returns the total number of failed requests for the given operation.
	GetFailureCount(operation string) uint64

	// GetRetryCount returns the total number of retries for the given operation.
	GetRetryCount(operation string) uint64

	// GetSuccessRate returns the success rate for the given operation as a fraction.
	GetSuccessRate(operation string) float64

	// GetAverageLatency returns the average latency for the given operation.
	GetAverageLatency(operation string) time.Duration

	// GetMaxLatency returns the maximum observed latency for the given operation.
	GetMaxLatency(operation string) time.Duration

	// GetLeaderRedirectCount returns the total number of leader redirections.
	GetLeaderRedirectCount() uint64

	// GetConnectionCount returns the current number of active connections.
	GetConnectionCount() int

	// Reset clears all collected metrics.
	Reset()

	// Snapshot returns a snapshot of the current metric values.
	Snapshot() MetricsSnapshot
}

Metrics defines an interface for collecting client-side operational metrics.

type MetricsSnapshot

type MetricsSnapshot struct {
	// Operations holds metrics for each named operation (e.g., "Acquire", "Release").
	Operations map[string]OperationMetrics

	// LeaderRedirects is the total number of leader redirection events.
	LeaderRedirects uint64

	// Connections is the current number of active client connections.
	Connections int
}

MetricsSnapshot represents a point-in-time view of client metrics.

type OperationMetrics

type OperationMetrics struct {
	// Requests is the total number of times the operation was attempted.
	Requests uint64

	// Successes is the number of successful attempts.
	Successes uint64

	// Failures is the number of failed attempts.
	Failures uint64

	// Retries is the number of retry attempts.
	Retries uint64

	// Latency is the cumulative latency across all attempts.
	Latency time.Duration

	// MaxLatency is the longest single observed latency.
	MaxLatency time.Duration
}

OperationMetrics holds counters and latency data for a specific client operation.

type PeerState

type PeerState struct {
	// NextIndex is the next index to send to this peer
	NextIndex uint64

	// MatchIndex is the highest log entry known to be replicated on this peer
	MatchIndex uint64

	// IsActive indicates whether the peer is currently responding
	IsActive bool

	// LastActiveAt is when the peer was last known to be active
	LastActiveAt time.Time

	// SnapshotInProgress indicates whether snapshot transfer is in progress
	SnapshotInProgress bool

	// ReplicationLag is how far behind this peer is
	ReplicationLag uint64
}

PeerState contains replication state for a single peer.

type RaftLockClient

type RaftLockClient interface {
	// Acquire attempts to acquire a distributed lock using the given parameters.
	// If the lock is already held and wait=true, the client is enqueued.
	//
	// Returns:
	//   - AcquireResult with lock details and status
	//   - Error if the operation fails or times out
	//
	// Possible errors:
	//   - ErrLockHeld: lock is held by another client (wait=false)
	//   - ErrTimeout: operation exceeded timeout
	//   - ErrLeaderUnavailable: no available leader
	//   - ErrInvalidArgument: invalid input parameters
	Acquire(ctx context.Context, req *AcquireRequest) (*AcquireResult, error)

	// Release releases a lock previously acquired by the client, using the given version token.
	//
	// Returns:
	//   - ReleaseResult indicating success and whether a waiter was promoted
	//   - Error if the operation fails
	//
	// Possible errors:
	//   - ErrNotLockOwner: caller is not the lock owner
	//   - ErrVersionMismatch: version does not match current lock version
	//   - ErrLockNotFound: lock does not exist
	Release(ctx context.Context, req *ReleaseRequest) (*ReleaseResult, error)

	// Renew extends the TTL of a lock held by the client.
	//
	// Returns:
	//   - RenewResult with updated lock information
	//   - Error if the operation fails
	//
	// Possible errors:
	//   - ErrNotLockOwner: caller is not the lock owner
	//   - ErrVersionMismatch: version does not match current lock version
	//   - ErrInvalidTTL: TTL value is invalid
	Renew(ctx context.Context, req *RenewRequest) (*RenewResult, error)

	// GetLockInfo returns metadata and state for a given lock without acquiring it.
	//
	// Returns:
	//   - LockInfo with lock state and metadata
	//   - Error if the operation fails
	//
	// Uses linearizable reads for consistency.
	GetLockInfo(ctx context.Context, req *GetLockInfoRequest) (*LockInfo, error)

	// GetLocks returns a paginated list of locks matching the specified filter.
	//
	// Returns:
	//   - GetLocksResult with matching locks and pagination data
	//   - Error if the operation fails
	GetLocks(ctx context.Context, req *GetLocksRequest) (*GetLocksResult, error)

	// Close shuts down the client, releasing all resources and closing connections.
	// The client must not be used after Close is called.
	Close() error
}

RaftLockClient defines a high-level client for interacting with a RaftLock cluster. It abstracts gRPC communication and provides methods for distributed lock operations with support for retries, leader redirection, and backoff.

All operations are context-aware and honor cancellation and timeouts. The client handles topology changes and reconnects to new leaders automatically.

func NewRaftLockClient

func NewRaftLockClient(config Config) (RaftLockClient, error)

NewRaftLockClient returns a new RaftLock client configured to interact with a cluster. It establishes internal connection handling, metrics, and retry logic.

type RaftLockClientBuilder

type RaftLockClientBuilder struct {
	// contains filtered or unexported fields
}

RaftLockClientBuilder provides a fluent API for constructing RaftLock clients. It supports configuring and creating RaftLockClient, AdminClient, and AdvancedClient instances.

Example:

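// Named lockClient so the variable does not shadow the client package.
lockClient, err := client.NewRaftLockClientBuilder([]string{"localhost:8080"}).
    WithTimeouts(2*time.Second, 5*time.Second). // dial timeout, request timeout
    Build()
if err != nil { /* handle the error */ }
defer lockClient.Close()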

func NewRaftLockClientBuilder

func NewRaftLockClientBuilder(endpoints []string) *RaftLockClientBuilder

NewRaftLockClientBuilder returns a new Builder initialized with the given endpoints. At least one endpoint is required to build a client.

func (*RaftLockClientBuilder) Build

func (b *RaftLockClientBuilder) Build() (RaftLockClient, error)

Build returns a configured RaftLockClient.

func (*RaftLockClientBuilder) BuildAdmin

func (b *RaftLockClientBuilder) BuildAdmin() (AdminClient, error)

BuildAdmin returns a configured AdminClient.

func (*RaftLockClientBuilder) BuildAdvanced

func (b *RaftLockClientBuilder) BuildAdvanced() (AdvancedClient, error)

BuildAdvanced returns a configured AdvancedClient.

func (*RaftLockClientBuilder) WithEndpoints

func (b *RaftLockClientBuilder) WithEndpoints(endpoints []string) *RaftLockClientBuilder

WithEndpoints sets the server endpoints. This is required and overrides any previously set endpoints.

func (*RaftLockClientBuilder) WithKeepAlive

func (b *RaftLockClientBuilder) WithKeepAlive(
	time, timeout time.Duration,
	permitWithoutStream bool,
) *RaftLockClientBuilder

WithKeepAlive sets gRPC keepalive parameters.

func (*RaftLockClientBuilder) WithMaxMessageSize

func (b *RaftLockClientBuilder) WithMaxMessageSize(size int) *RaftLockClientBuilder

WithMaxMessageSize sets the max gRPC message size (bytes).

func (*RaftLockClientBuilder) WithMetrics

func (b *RaftLockClientBuilder) WithMetrics(enabled bool) *RaftLockClientBuilder

WithMetrics enables or disables metrics collection.

func (*RaftLockClientBuilder) WithRetryOptions

func (b *RaftLockClientBuilder) WithRetryOptions(
	maxRetries int,
	initialBackoff, maxBackoff time.Duration,
	multiplier float64,
) *RaftLockClientBuilder

WithRetryOptions updates the default retry policy parameters.

func (*RaftLockClientBuilder) WithRetryPolicy

func (b *RaftLockClientBuilder) WithRetryPolicy(policy RetryPolicy) *RaftLockClientBuilder

WithRetryPolicy sets a custom retry policy.

func (*RaftLockClientBuilder) WithRetryableErrors

func (b *RaftLockClientBuilder) WithRetryableErrors(codes ...pb.ErrorCode) *RaftLockClientBuilder

WithRetryableErrors sets the error codes that should trigger retries. Passing no codes means that no error code triggers a retry.

func (*RaftLockClientBuilder) WithTimeouts

func (b *RaftLockClientBuilder) WithTimeouts(
	dialTimeout, requestTimeout time.Duration,
) *RaftLockClientBuilder

WithTimeouts sets the dial and request timeouts.

type RaftStatus

type RaftStatus struct {
	// NodeID is this node's identifier
	NodeID string

	// Role is the current node role (Leader, Follower, Candidate)
	Role string

	// Term is the current Raft term
	Term uint64

	// LeaderID is the current leader's identifier (empty if unknown)
	LeaderID string

	// LastLogIndex is the highest log entry index
	LastLogIndex uint64

	// LastLogTerm is the term of the highest log entry
	LastLogTerm uint64

	// CommitIndex is the highest index known to be committed
	CommitIndex uint64

	// LastApplied is the highest index applied to state machine
	LastApplied uint64

	// SnapshotIndex is the index of the last snapshot
	SnapshotIndex uint64

	// SnapshotTerm is the term of the last snapshot
	SnapshotTerm uint64

	// Replication contains replication state for each peer (leaders only)
	Replication map[string]*PeerState
}

RaftStatus contains information about the Raft cluster state.
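
A monitoring check might reduce this status to a couple of health signals. The sketch below uses trimmed local copies of RaftStatus and PeerState. `applyBacklog`, `unhealthyPeers`, and the lag threshold are hypothetical choices for illustration, not part of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// Trimmed local copies of the documented structs; only the fields used
// here are included.
type PeerState struct {
	IsActive       bool
	ReplicationLag uint64
}

type RaftStatus struct {
	CommitIndex uint64
	LastApplied uint64
	Replication map[string]*PeerState
}

// applyBacklog returns how many committed entries have not yet been
// applied to the state machine.
func applyBacklog(s RaftStatus) uint64 {
	if s.LastApplied >= s.CommitIndex {
		return 0
	}
	return s.CommitIndex - s.LastApplied
}

// unhealthyPeers returns the sorted IDs of peers that are inactive or
// whose replication lag exceeds maxLag.
func unhealthyPeers(s RaftStatus, maxLag uint64) []string {
	var ids []string
	for id, p := range s.Replication {
		if p == nil || !p.IsActive || p.ReplicationLag > maxLag {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

func main() {
	s := RaftStatus{
		CommitIndex: 120,
		LastApplied: 118,
		Replication: map[string]*PeerState{
			"node-2": {IsActive: true, ReplicationLag: 1},
			"node-3": {IsActive: true, ReplicationLag: 50},
			"node-4": {IsActive: false},
		},
	}
	fmt.Println(applyBacklog(s), unhealthyPeers(s, 10)) // prints "2 [node-3 node-4]"
}
```

Since Replication is populated on leaders only, a check like this is meaningful only against the status reported by the current leader.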

type ReleaseRequest

type ReleaseRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// ClientID is the unique identifier for the client releasing the lock
	ClientID string

	// Version is the fencing token from when the lock was acquired
	Version int64
}

ReleaseRequest contains parameters for releasing a distributed lock.

type ReleaseResult

type ReleaseResult struct {
	// Released indicates whether the lock was successfully released
	Released bool

	// WaiterPromoted indicates whether a waiter was promoted to lock holder
	WaiterPromoted bool

	// Error contains detailed error information if the operation failed
	Error *ErrorDetail
}

ReleaseResult contains the outcome of a lock release operation.

type RenewRequest

type RenewRequest struct {
	// LockID is the unique identifier for the lock resource
	LockID string

	// ClientID is the unique identifier for the client renewing the lock
	ClientID string

	// Version is the fencing token from when the lock was acquired
	Version int64

	// NewTTL is the new time-to-live for the lock
	NewTTL time.Duration
}

RenewRequest contains parameters for renewing a distributed lock.

type RenewResult

type RenewResult struct {
	// Renewed indicates whether the lock was successfully renewed
	Renewed bool

	// Lock contains the updated lock information when Renewed is true
	Lock *Lock

	// Error contains detailed error information if the operation failed
	Error *ErrorDetail
}

RenewResult contains the outcome of a lock renewal operation.

type RetryPolicy

type RetryPolicy struct {
	// MaxRetries is the maximum number of retry attempts (0 = no retries)
	MaxRetries int

	// InitialBackoff is the initial delay before the first retry
	InitialBackoff time.Duration

	// MaxBackoff is the maximum delay between retries
	MaxBackoff time.Duration

	// BackoffMultiplier determines how backoff increases between retries
	BackoffMultiplier float64

	// JitterFactor adds randomness to backoff timing (0.0 to 1.0)
	JitterFactor float64

	// RetryableErrors specifies which error codes should trigger retries
	RetryableErrors []pb.ErrorCode
}

RetryPolicy defines how the client should retry failed operations.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a default retry policy that handles common transient and leader-related errors.

type ValidationError

type ValidationError struct {
	Field   string
	Value   any
	Message string
}

ValidationError describes a configuration or input validation error.

func NewValidationError

func NewValidationError(field string, value any, message string) *ValidationError

NewValidationError returns a structured validation error.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Error returns the error message for ValidationError.

type WaiterInfo

type WaiterInfo struct {
	// ClientID is the unique identifier for the waiting client
	ClientID string

	// EnqueuedAt is when the client was added to the queue
	EnqueuedAt time.Time

	// TimeoutAt is when the client's wait request will expire
	TimeoutAt time.Time

	// Priority is the client's priority in the wait queue
	Priority int32

	// Position is the current position in the queue (0 = front)
	Position int32
}

WaiterInfo contains information about a client waiting for a lock.
