kafka

package
v0.15.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Dec 19, 2025 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package kafka provides Kafka runtime implementations for the Humus queue framework.

This package integrates Apache Kafka with Humus's queue processing abstractions, offering at-most-once and at-least-once delivery semantics with concurrent per-partition processing using the franz-go client library.

Architecture

The package provides a single [Runtime], created with [NewRuntime] and configured per topic with one of two delivery-semantics options:

  • [AtMostOnce] - Acknowledges messages before processing (fast, may lose messages)
  • [AtLeastOnce] - Acknowledges messages after processing (reliable, may duplicate)

Both modes use a goroutine-per-partition pattern for concurrent processing, leveraging franz-go's partition assignment callbacks. Each partition runs an independent processing loop using the core queue package's queue.ProcessAtMostOnce or queue.ProcessAtLeastOnce processors.

At-Most-Once Processing

At-most-once processing favors throughput by acknowledging (committing offsets) immediately after consuming each message, before processing it. If processing then fails, the message has already been committed and is lost.

Use cases:

  • Metrics collection and monitoring
  • Log aggregation
  • Cache updates
  • Any scenario where occasional data loss is acceptable

Example:

type MetricsMessage struct {
    Name  string
    Value float64
}

// Decode the Kafka message value into your message type
func decodeMetrics(data []byte) (*MetricsMessage, error) {
    var msg MetricsMessage
    if err := json.Unmarshal(data, &msg); err != nil {
        return nil, err
    }
    return &msg, nil
}

// Implement business logic. The processor receives the raw kafka.Message.
type MetricsProcessor struct{}

func (p *MetricsProcessor) Process(ctx context.Context, msg kafka.Message) error {
    metric, err := decodeMetrics(msg.Value)
    if err != nil {
        // Already committed under at-most-once: returning an error drops the message
        return err
    }
    // Send metric.Name and metric.Value to your monitoring system
    _ = metric
    return nil
}

// Create runtime and run
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    processor := &MetricsProcessor{}
    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtMostOnce(cfg.Kafka.Topic, processor),
    )
    return queue.NewApp(runtime), nil
}

At-Least-Once Processing

At-least-once processing provides reliable delivery by acknowledging (committing offsets) only after successful processing. If processing fails, the message is not committed and will be redelivered for retry. Your processor MUST be idempotent to handle duplicates.

Use cases:

  • Financial transactions
  • Database updates
  • Event sourcing
  • Any scenario where data loss is unacceptable

Example:

type OrderMessage struct {
    OrderID string
    Amount  float64
}

// Implement idempotent business logic
type OrderProcessor struct {
    db *sql.DB
}

func (p *OrderProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var order OrderMessage
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        return err // not committed: a malformed message will be redelivered
    }

    // Insert and ignore duplicates in one statement (idempotency). Assumes
    // PostgreSQL and a UNIQUE constraint on orders.order_id; a separate
    // SELECT-then-INSERT check can race if a redelivered message is processed
    // concurrently, for example around a rebalance.
    _, err := p.db.ExecContext(ctx,
        "INSERT INTO orders (order_id, amount) VALUES ($1, $2) ON CONFLICT (order_id) DO NOTHING",
        order.OrderID, order.Amount,
    )
    return err
}

// Create runtime and run
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    processor := &OrderProcessor{db: cfg.DB}
    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtLeastOnce(cfg.Kafka.Topic, processor),
    )
    return queue.NewApp(runtime), nil
}

Message Decoding

Processors receive the raw Message, so decoding its Value (and, if needed, its Headers) into your application's message type happens inside Process, usually through a small helper. This keeps the runtime generic and lets you use any serialization format (JSON, Protobuf, Avro, etc.).

func decodeMyMessage(data []byte) (*MyMessage, error) {
    var msg MyMessage

    // JSON example
    if err := json.Unmarshal(data, &msg); err != nil {
        return nil, err
    }

    // Protobuf alternative (MyMessage must be a generated proto.Message):
    // if err := proto.Unmarshal(data, &msg); err != nil {
    //     return nil, err
    // }

    return &msg, nil
}
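When several message types share one serialization format, a single generic helper can replace the per-type decoders. This is a sketch using only `encoding/json`; `decodeJSON` is a hypothetical name, and returning the zero value on error (rather than a partially filled one) is a design choice, not something this package requires.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeJSON unmarshals data into a new value of type T.
func decodeJSON[T any](data []byte) (T, error) {
	var v T
	if err := json.Unmarshal(data, &v); err != nil {
		var zero T // discard any partially decoded fields
		return zero, fmt.Errorf("decode %T: %w", zero, err)
	}
	return v, nil
}

type MetricsMessage struct {
	Name  string
	Value float64
}

func main() {
	m, err := decodeJSON[MetricsMessage]([]byte(`{"Name":"cpu","Value":0.5}`))
	fmt.Println(m.Name, m.Value, err)
	// prints: cpu 0.5 <nil>
}
```

Inside a processor this becomes `order, err := decodeJSON[OrderMessage](msg.Value)`.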

Concurrency Model

Each Kafka partition is processed concurrently in its own goroutine. When a consumer group rebalance occurs:

  • Assigned partitions spawn new processing goroutines
  • Revoked partitions gracefully shut down their goroutines
  • All goroutines coordinate through context cancellation

This provides natural parallelism and isolation between partitions, with processing throughput scaling with the number of partitions assigned to each consumer instance.

Graceful Shutdown

When the application context is cancelled (e.g., on SIGTERM), the runtime:

  1. Stops fetching new messages
  2. Returns queue.ErrEndOfQueue from consumers
  3. Allows in-flight messages to complete processing
  4. Commits final offsets for at-least-once semantics
  5. Closes the Kafka client

The Humus framework cancels the application context on shutdown signals automatically, so your processor needs no signal handling of its own.

OpenTelemetry Instrumentation

All message processing is automatically instrumented with OpenTelemetry tracing, logging, and metrics through the core queue package processors.

Tracing

Traces include:

  • Span per message with processing order visible
  • Context propagation through consume/process/acknowledge phases
  • Error recording in spans

No additional tracing configuration is needed in your processor implementation.

Metrics

The following metrics are automatically collected:

messaging.client.messages.processed - Total number of Kafka messages processed
  Labels: messaging.destination.name (topic), messaging.destination.partition.id
  Unit: {message}

messaging.client.messages.committed - Total number of Kafka messages successfully committed
  Labels: messaging.destination.name (topic), messaging.destination.partition.id
  Unit: {message}

messaging.client.processing.failures - Total number of Kafka message processing failures
  Labels: messaging.destination.name (topic), messaging.destination.partition.id, error.type
  Unit: {failure}
  Note: error.type is a generic classification ("processing_error") to avoid exposing sensitive information

These metrics help monitor:

  • Message throughput (messages processed per second)
  • Commit progress (by comparing processed vs. committed counts; consumer lag proper is measured against the partition's end offset)
  • Error rates (failures per message)
  • Partition-level performance

All metrics are recorded through the global OpenTelemetry meter provider returned by otel.GetMeterProvider(), so they are exported by whatever provider your application configures.

Configuration

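The runtime is configured through Option values passed to NewRuntime. Alongside the delivery-semantics options, the package provides options for consumer group timeouts, fetch buffering, and TLS:

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce(topic, processor),
    kafka.SessionTimeout(10*time.Second),
    kafka.RebalanceTimeout(30*time.Second),
    kafka.MaxConcurrentFetches(4),
)

The runtime is built on the franz-go client; see its documentation for the semantics of the underlying settings: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo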

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GroupIDAttr

func GroupIDAttr(groupID string) slog.Attr

GroupIDAttr returns a slog attribute for the Kafka consumer group ID.

func OffsetAttr

func OffsetAttr(offset int64) slog.Attr

OffsetAttr returns a slog attribute for the Kafka offset.

func PartitionAttr

func PartitionAttr(partition int32) slog.Attr

PartitionAttr returns a slog attribute for the Kafka partition.

func TopicAttr

func TopicAttr(topic string) slog.Attr

TopicAttr returns a slog attribute for the Kafka topic.

Types

type Header

type Header struct {
	Key   string
	Value []byte
}

Header represents a Kafka message header.

type Message

type Message struct {
	Key       []byte
	Value     []byte
	Headers   []Header
	Timestamp time.Time
	Topic     string
	Partition int32
	Offset    int64
	Attrs     uint8
}

Message represents a Kafka message.
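Because Message.Headers is a plain slice, reading a header (for example a content type that selects the decoder) is a linear scan. A minimal sketch, with local copies of Header and Message so it compiles on its own; `headerValue` is a hypothetical helper, not part of this package. Kafka allows repeated header keys, so this version returns the last match.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the package's Header and Message types.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Key       []byte
	Value     []byte
	Headers   []Header
	Timestamp time.Time
	Topic     string
	Partition int32
	Offset    int64
	Attrs     uint8
}

// headerValue returns the value of the last header with the given key.
func headerValue(msg Message, key string) ([]byte, bool) {
	for i := len(msg.Headers) - 1; i >= 0; i-- {
		if msg.Headers[i].Key == key {
			return msg.Headers[i].Value, true
		}
	}
	return nil, false
}

func main() {
	msg := Message{
		Topic: "orders",
		Headers: []Header{
			{Key: "content-type", Value: []byte("application/json")},
			{Key: "content-type", Value: []byte("application/protobuf")},
		},
	}
	v, ok := headerValue(msg, "content-type")
	fmt.Println(string(v), ok)
	// prints: application/protobuf true
}
```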

type Option

type Option func(*Options)

Option defines a function type for configuring Kafka runtime options.

func AtLeastOnce

func AtLeastOnce(topic string, processor queue.Processor[Message]) Option

AtLeastOnce configures the Kafka runtime to process messages from the specified topic with at-least-once delivery semantics (process before acknowledge).

func AtMostOnce

func AtMostOnce(topic string, processor queue.Processor[Message]) Option

AtMostOnce configures the Kafka runtime to process messages from the specified topic using at-most-once delivery semantics.

With at-most-once semantics, messages are committed to Kafka before processing begins. This means that if processing fails, the message is lost since it has already been committed and will not be redelivered.

This approach provides:

  • Lower latency (commits happen immediately)
  • Higher throughput (no waiting for processing to complete)
  • Risk of message loss on processing failures

Use at-most-once when:

  • Message loss is acceptable
  • Performance is critical
  • Data is non-critical (metrics, logs, cache updates)

If the commit fails, the processor still receives the message, but the consumer group offset does not advance, so the message may be processed again after a restart or rebalance.

func FetchMaxBytes

func FetchMaxBytes(bytes int32) Option

FetchMaxBytes sets the maximum total bytes to buffer from fetch responses across all partitions. Default is 50 MB if not set.

func MaxConcurrentFetches

func MaxConcurrentFetches(fetches int) Option

MaxConcurrentFetches sets the maximum number of concurrent fetch requests. Default is unlimited if not set.

func RebalanceTimeout

func RebalanceTimeout(d time.Duration) Option

RebalanceTimeout sets the rebalance timeout for the Kafka consumer group.

func SessionTimeout

func SessionTimeout(d time.Duration) Option

SessionTimeout sets the session timeout for the Kafka consumer group.

func WithTLS added in v0.14.0

func WithTLS(cfg *tls.Config) Option

WithTLS configures TLS/mTLS for secure connections to Kafka brokers. Pass a fully configured *tls.Config with certificates, CA pool, and other TLS settings.

Example:

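// newTLSRuntime is an illustrative helper, not part of this package.
func newTLSRuntime(brokers []string, groupID, topic string, processor queue.Processor[kafka.Message]) (kafka.Runtime, error) {
    // Load the client certificate and key (mTLS)
    cert, err := tls.LoadX509KeyPair("client-cert.pem", "client-key.pem")
    if err != nil {
        return kafka.Runtime{}, err
    }

    // Trust the CA that signed the broker certificates
    caCert, err := os.ReadFile("ca-cert.pem")
    if err != nil {
        return kafka.Runtime{}, err
    }
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        return kafka.Runtime{}, errors.New("no valid certificates found in ca-cert.pem")
    }

    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
        MinVersion:   tls.VersionTLS12,
    }

    return kafka.NewRuntime(brokers, groupID,
        kafka.WithTLS(tlsConfig),
        kafka.AtLeastOnce(topic, processor),
    ), nil
}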

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options represents configuration options for the Kafka runtime.

type Runtime

type Runtime struct {
	// contains filtered or unexported fields
}

Runtime represents the Kafka runtime for processing messages.

func NewRuntime

func NewRuntime(
	brokers []string,
	groupID string,
	opts ...Option,
) Runtime

NewRuntime creates a new Kafka runtime with the provided brokers, group ID, and options.

func (Runtime) ProcessQueue

func (r Runtime) ProcessQueue(ctx context.Context) error

ProcessQueue starts processing the Kafka queue.
