natslock

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2025 License: MIT Imports: 9 Imported by: 0

README

nlock - Distributed Locks using NATS KV

Go Report Card Go Reference

A simple Go package providing distributed locking capabilities built on top of NATS JetStream Key-Value (KV) Store. It leverages atomic KV operations and TTLs for reliable lock management across multiple application instances.

Features

  • Standard Locking: Simple Acquire/Release API with retries.
  • Execute-Once (Try-Lock): High-level Do API for running tasks only if the lock is free (perfect for clustered cron jobs).
  • Native Persistence: Uses NATS KV Store for lock state.
  • Auto Renewal: Automatic lock renewal (keep-alive) via background goroutine.
  • Context-Aware: Operations support timeouts and cancellation.
  • Configurable: Customizable TTL, retry intervals, and keep-alive frequencies.
  • Observability: Structured logging integration via log/slog.

Installation

go get github.com/akhenakh/nlock 

Status

This was generated 99% using Gemini, it is useful to me use it at your own risk ;)

Usage

Basic Locking (Acquire & Release)

Use this pattern when you need to wait for a lock to become available before proceeding.

package main

import (
	"context"
	"errors"
	"log/slog"
	"os"
	"time"

	"github.com/akhenakh/nlock" 
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	// 1. Connect to NATS
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	js, _ := jetstream.New(nc)
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	// 2. Create Lock Manager
	manager, _ := nlock.NewLockManager(context.Background(), js,
		nlock.WithLogger(logger),
		nlock.WithTTL(30*time.Second),
	)

	// 3. Acquire Lock (blocks until acquired or context timeout)
	lockKey := "database-migration"
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	lock, err := manager.Acquire(ctx, lockKey)
	if err != nil {
		if errors.Is(err, nlock.ErrLockAcquireTimeout) {
			logger.Error("Could not acquire lock in time")
		}
		return
	}
	
	// 4. Ensure Release
	defer func() {
		if err := lock.Release(context.Background()); err != nil {
			logger.Error("Failed to release", "error", err)
		}
	}()

	logger.Info("Critical section: doing work...")
	time.Sleep(2 * time.Second)
}
Distributed Task Execution (Try-Lock)

Use the Do method when you have multiple replicas of a service (e.g., a clustered cron job), and you want only one instance to execute a task, while the others skip it immediately if the lock is busy.

func RunCronJob(manager *nlock.LockManager) {
    ctx := context.Background()
    lockKey := "hourly-cleanup-job"

    // Attempt to run the function. 
    // If lock is free: acquires lock -> runs func -> releases lock -> returns executed=true
    // If lock is busy: returns executed=false immediately (no waiting)
    executed, err := manager.Do(ctx, lockKey, func(ctx context.Context) error {
        
        slog.Info("I am the leader, performing cleanup...")
        
        // Simulate heavy lifting
        time.Sleep(500 * time.Millisecond)
        
        return nil
    })

    if err != nil {
        // Error acquiring lock (network issue) or error returned by the closure
        slog.Error("Job failed", "error", err)
        return
    }

    if !executed {
        slog.Info("Job skipped: Lock currently held by another instance")
    } else {
        slog.Info("Job completed successfully")
    }
}

Configuration Options

The NewLockManager function accepts functional options:

  • WithTTL(time.Duration): Sets the time-to-live for lock keys. Defaults to 30s. Locks are auto-deleted by NATS after this time if not renewed.
  • WithRetryInterval(time.Duration): Sets the interval between acquisition attempts in Acquire. Defaults to 250ms.
  • WithKeepAlive(bool): Enables (default true) automatic background renewal. If disabled, you must finish work before TTL expires.
  • WithKeepAliveIntervalFactor(int): Refresh frequency. Interval = TTL / factor. Defaults to 3.
  • WithLogger(*slog.Logger): Sets a custom slog logger.
  • WithBucketName(string): Sets the NATS KV bucket name. Defaults to "distributed_locks".
  • WithBucketReplicas(int): Sets the number of replicas for the bucket (for clustered NATS). Defaults to 1.

Error Handling

  • nlock.ErrLockAcquireTimeout: Returned by Acquire if context expires before lock is obtained.
  • nlock.ErrLockHeld: Returned internally by TryAcquire (and handled by Do) to indicate the lock is busy.
  • nlock.ErrLockNotHeld: Returned by Release if the lock expired or was lost before release was called.

Design Details

  1. Persistence: Locks are stored as keys in a NATS KV Bucket.
  2. Atomic Creation: Acquisition uses kv.Create(), which only succeeds if the key does not exist.
  3. Atomic Release: Release uses kv.Delete(..., LastRevision(rev)), ensuring you only delete the lock if you are still the owner.
  4. Failsafe: If a process crashes, the NATS KV TTL ensures the lock key is automatically removed after the configured duration, allowing other processes to eventually acquire it.

Documentation

Index

Constants

View Source
const (
	// DefaultLockTTL is the default time-to-live for a lock key.
	DefaultLockTTL = 30 * time.Second
	// DefaultRetryInterval is the default interval to wait before retrying acquisition.
	DefaultRetryInterval = 250 * time.Millisecond
	// DefaultKeepAliveIntervalFactor is the factor of TTL used for the keep-alive interval (TTL / factor).
	// A factor of 3 means the keep-alive will run at 1/3 of the TTL duration.
	DefaultKeepAliveIntervalFactor = 3
)

Variables

View Source
var (
	// ErrLockAcquireTimeout is returned when acquiring a lock times out.
	ErrLockAcquireTimeout = errors.New("lock acquisition timed out")
	// ErrLockNotHeld is returned when trying to release or refresh a lock that is not held or lost.
	ErrLockNotHeld = errors.New("lock not held or already released/expired")
	// ErrLockAlreadyLocked is a specific error type when creation fails because the key exists.
	ErrLockAlreadyLocked = errors.New("lock key already exists")
	// ErrLockHeld is returned by TryAcquire when the lock is currently held by another owner.
	ErrLockHeld = errors.New("lock already held")
)

Functions

This section is empty.

Types

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

Lock represents an acquired distributed lock.

func (*Lock) Key

func (l *Lock) Key() string

Key returns the key associated with this lock.

func (*Lock) OwnerID

func (l *Lock) OwnerID() string

OwnerID returns the unique ID of the owner who acquired this lock instance.

func (*Lock) Release

func (l *Lock) Release(ctx context.Context) error

Release attempts to release the acquired lock. It stops the keep-alive goroutine (if running) and deletes the lock key from NATS KV. It only deletes the key if the revision matches the one held by this Lock instance.

func (*Lock) Revision

func (l *Lock) Revision() uint64

Revision returns the current NATS KV revision of the lock. This can be used as a fencing token for external systems.

type LockManager

type LockManager struct {
	// contains filtered or unexported fields
}

LockManager manages distributed locks using a NATS KV bucket.

func NewLockManager

func NewLockManager(ctx context.Context, js jetstream.JetStream, opts ...Option) (*LockManager, error)

NewLockManager creates a new LockManager. It ensures the necessary NATS KV bucket exists with the configured TTL.

func (*LockManager) Acquire

func (m *LockManager) Acquire(ctx context.Context, key string) (*Lock, error)

Acquire attempts to acquire a lock for the given key. It blocks until the lock is acquired or the context is cancelled/times out. Returns the acquired Lock or an error.

func (*LockManager) Do

func (m *LockManager) Do(ctx context.Context, key string, fn func(ctx context.Context) error) (executed bool, err error)

Do executes the provided function 'fn' only if the lock for 'key' can be acquired immediately.

Use this in clustered environments where multiple replicas run the same code, but you only want one of them to execute the logic.

Returns: - executed (bool): true if the lock was acquired and fn was called, false if lock was busy. - err (error): contains errors from NATS (during acquisition) or errors returned by 'fn'.

func (*LockManager) TryAcquire

func (m *LockManager) TryAcquire(ctx context.Context, key string) (*Lock, error)

TryAcquire attempts to acquire the lock exactly once without retrying. If the lock is available, it returns the Lock object. If the lock is already held, it returns ErrLockHeld. If a network/infra error occurs, it returns that error.

type Option

type Option func(*Options)

Option is a function type for setting LockManager options.

func WithBucketDescription

func WithBucketDescription(desc string) Option

WithBucketDescription sets the description for the KV bucket.

func WithBucketName

func WithBucketName(name string) Option

WithBucketName sets the name of the NATS KV bucket used for locks. Defaults to "distributed_locks".

func WithBucketReplicas

func WithBucketReplicas(replicas int) Option

WithBucketReplicas sets the number of replicas for the lock bucket. Defaults to 1. Only relevant in clustered NATS setups.

func WithKeepAlive

func WithKeepAlive(enable bool) Option

WithKeepAlive enables or disables automatic background refresh (keep-alive) for acquired locks. Disable this if you manually manage lock refresh/release. Defaults to true.

func WithKeepAliveIntervalFactor

func WithKeepAliveIntervalFactor(factor int) Option

WithKeepAliveIntervalFactor sets the factor of the lock TTL to use for the keep-alive interval. The interval is calculated as TTL / factor. A smaller factor means more frequent refreshes. Defaults to 3. Must be 2 or greater.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets a custom slog logger. Defaults to slog.Default().

func WithRetryInterval

func WithRetryInterval(interval time.Duration) Option

WithRetryInterval sets the base interval between lock acquisition attempts. A small amount of random jitter will be applied to this interval to prevent thundering herds.

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL sets the time-to-live for lock keys. Lock keys will be automatically deleted by NATS KV after this duration if not refreshed.

type Options

type Options struct {
	TTL                     time.Duration
	RetryInterval           time.Duration
	KeepAlive               bool // Enable automatic keep-alive for acquired locks
	KeepAliveIntervalFactor int  // Factor of TTL for keep-alive interval (TTL / factor)
	Logger                  *slog.Logger
	BucketName              string
	BucketDescription       string
	BucketReplicas          int
}

Options configure the LockManager and lock acquisition.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL