Documentation ¶
Overview ¶
Package kafka provides Kafka runtime implementations for the Humus queue framework.
This package integrates Apache Kafka with Humus's queue processing abstractions, offering at-most-once and at-least-once delivery semantics with concurrent per-partition processing using the franz-go client library.
Architecture ¶
The package provides two runtime implementations:
- [AtMostOnceRuntime] - Acknowledges messages before processing (fast, may lose messages)
- [AtLeastOnceRuntime] - Acknowledges messages after processing (reliable, may duplicate)
Both runtimes use a goroutine-per-partition pattern for concurrent processing, leveraging franz-go's partition assignment callbacks. Each partition runs an independent processing loop using the core queue package's queue.ProcessAtMostOnce or queue.ProcessAtLeastOnce processors.
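The difference between the two runtimes comes down to the order of the commit and process steps. The following is a minimal sketch of that ordering using only the standard library. The helper names (processAtMostOnce, processAtLeastOnce, alwaysFail, alwaysSucceed) are hypothetical stand-ins and are not part of this package or the core queue package.

```go
package main

import (
	"errors"
	"fmt"
)

// step names one action taken for a message, recorded in order.
type step string

// alwaysFail and alwaysSucceed are hypothetical processors for the demo.
func alwaysFail() error    { return errors.New("processing failed") }
func alwaysSucceed() error { return nil }

// processAtMostOnce commits first, then processes. A processing failure
// leaves the message committed, so it is never redelivered.
func processAtMostOnce(process func() error) (steps []step, redeliver bool) {
	steps = append(steps, "commit")
	if err := process(); err != nil {
		steps = append(steps, "process failed")
		return steps, false // already committed: the message is lost
	}
	steps = append(steps, "process ok")
	return steps, false
}

// processAtLeastOnce processes first and commits only on success. A
// processing failure leaves the offset uncommitted, so the message is
// delivered again.
func processAtLeastOnce(process func() error) (steps []step, redeliver bool) {
	if err := process(); err != nil {
		steps = append(steps, "process failed")
		return steps, true // not committed: the message is redelivered
	}
	steps = append(steps, "process ok", "commit")
	return steps, false
}

func main() {
	steps, redeliver := processAtMostOnce(alwaysFail)
	fmt.Println("at-most-once: ", steps, "redeliver:", redeliver)

	steps, redeliver = processAtLeastOnce(alwaysFail)
	fmt.Println("at-least-once:", steps, "redeliver:", redeliver)
}
```

In the failing case, only the at-least-once path reports that the message will be seen again, which is why its processor has to tolerate duplicates.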
At-Most-Once Processing ¶
At-most-once processing provides fast throughput by acknowledging (committing offsets) immediately after consuming messages, before processing. If processing fails, the message is already committed and will be lost.
Use cases:
- Metrics collection and monitoring
- Log aggregation
- Cache updates
- Any scenario where occasional data loss is acceptable
Example:
type MetricsMessage struct {
	Name  string
	Value float64
}

// Decode Kafka message bytes into your message type.
// Return a nil message on failure so callers never see a half-decoded value.
func decodeMetrics(data []byte) (*MetricsMessage, error) {
	var msg MetricsMessage
	if err := json.Unmarshal(data, &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

// Implement business logic
type MetricsProcessor struct{}

func (p *MetricsProcessor) Process(ctx context.Context, msg *MetricsMessage) error {
	// Send to monitoring system
	return nil
}

// Create runtime and run
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
	processor := &MetricsProcessor{}
	runtime, err := kafka.NewAtMostOnceRuntime(
		cfg.Kafka.Brokers,
		cfg.Kafka.Topic,
		cfg.Kafka.GroupID,
		processor,
		decodeMetrics,
	)
	if err != nil {
		return nil, err
	}
	return queue.NewApp(runtime), nil
}
At-Least-Once Processing ¶
At-least-once processing provides reliable delivery by acknowledging (committing offsets) only after successful processing. If processing fails, the message is not committed and will be redelivered for retry. Your processor MUST be idempotent to handle duplicates.
Use cases:
- Financial transactions
- Database updates
- Event sourcing
- Any scenario where data loss is unacceptable
Example:
type OrderMessage struct {
	OrderID string
	Amount  float64
}

// Implement idempotent business logic
type OrderProcessor struct {
	db *sql.DB
}

func (p *OrderProcessor) Process(ctx context.Context, msg *OrderMessage) error {
	// Check if already processed (idempotency)
	var exists bool
	err := p.db.QueryRowContext(ctx,
		"SELECT EXISTS(SELECT 1 FROM orders WHERE order_id = $1)",
		msg.OrderID,
	).Scan(&exists)
	if err != nil {
		return err
	}
	if exists {
		return nil // Already processed
	}

	// Process order
	_, err = p.db.ExecContext(ctx,
		"INSERT INTO orders (order_id, amount) VALUES ($1, $2)",
		msg.OrderID, msg.Amount,
	)
	return err
}

// Create runtime and run
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
	processor := &OrderProcessor{db: cfg.DB}
	runtime, err := kafka.NewAtLeastOnceRuntime(
		cfg.Kafka.Brokers,
		cfg.Kafka.Topic,
		cfg.Kafka.GroupID,
		processor,
		decodeOrder,
	)
	if err != nil {
		return nil, err
	}
	return queue.NewApp(runtime), nil
}
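The example above passes decodeOrder to the runtime without defining it. One possible shape for that decoder is sketched below, assuming the producer writes JSON. The JSON field names (order_id, amount) and the empty-ID check are assumptions made for illustration and are not taken from this package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// OrderMessage mirrors the message type used in the example. The JSON
// tags are an assumption about the producer's wire format.
type OrderMessage struct {
	OrderID string  `json:"order_id"`
	Amount  float64 `json:"amount"`
}

// decodeOrder converts raw Kafka message bytes into an OrderMessage.
// It returns a nil message on any failure.
func decodeOrder(data []byte) (*OrderMessage, error) {
	var msg OrderMessage
	if err := json.Unmarshal(data, &msg); err != nil {
		return nil, fmt.Errorf("decode order: %w", err)
	}
	// Reject messages without an ID, since the idempotency check keys on it.
	if msg.OrderID == "" {
		return nil, errors.New("decode order: missing order_id")
	}
	return &msg, nil
}

func main() {
	msg, err := decodeOrder([]byte(`{"order_id":"o-1","amount":12.5}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg.OrderID, msg.Amount)
}
```

Rejecting a message with no order ID at decode time keeps the processor's duplicate check meaningful, though whether a decode error should block the partition or skip the message depends on the application.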
Message Decoding ¶
Both runtimes accept a decoder function that converts Kafka message bytes into your application's message type. This keeps the runtime generic and allows you to use any serialization format (JSON, Protobuf, Avro, etc.).
func decodeMyMessage(data []byte) (*MyMessage, error) {
	// JSON example
	var msg MyMessage
	if err := json.Unmarshal(data, &msg); err != nil {
		return nil, err
	}
	return &msg, nil

	// Protobuf alternative
	// var msg MyMessage
	// if err := proto.Unmarshal(data, &msg); err != nil {
	//     return nil, err
	// }
	// return &msg, nil
}
Concurrency Model ¶
Each Kafka partition is processed concurrently in its own goroutine. When a consumer group rebalance occurs:
- Assigned partitions spawn new processing goroutines
- Revoked partitions gracefully shut down their goroutines
- All goroutines coordinate through context cancellation
This provides natural parallelism and isolation, with processing throughput scaling with the number of partitions.
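The rebalance behavior described above can be sketched with the standard library alone: one cancellable goroutine per assigned partition, cancelled individually on revoke and collectively when the parent context ends. The partitionWorkers type and its methods are hypothetical and only illustrate the pattern. They do not reflect how this package or franz-go is actually implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// partitionWorkers tracks one processing goroutine per assigned partition.
type partitionWorkers struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	cancels map[int32]context.CancelFunc
}

func newPartitionWorkers() *partitionWorkers {
	return &partitionWorkers{cancels: make(map[int32]context.CancelFunc)}
}

// assign starts a goroutine for each newly assigned partition.
func (w *partitionWorkers) assign(parent context.Context, partitions []int32, run func(context.Context, int32)) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, p := range partitions {
		if _, running := w.cancels[p]; running {
			continue
		}
		ctx, cancel := context.WithCancel(parent)
		w.cancels[p] = cancel
		w.wg.Add(1)
		go func(p int32) {
			defer w.wg.Done()
			run(ctx, p)
		}(p)
	}
}

// revoke cancels the goroutines of the revoked partitions.
func (w *partitionWorkers) revoke(partitions []int32) {
	w.mu.Lock()
	defer w.mu.Unlock()
	for _, p := range partitions {
		if cancel, running := w.cancels[p]; running {
			cancel()
			delete(w.cancels, p)
		}
	}
}

// active reports how many partitions currently have a registered worker.
func (w *partitionWorkers) active() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return len(w.cancels)
}

// rebalanceDemo assigns three partitions, revokes one, then shuts down.
func rebalanceDemo() (afterAssign, afterRevoke int) {
	w := newPartitionWorkers()
	ctx, cancel := context.WithCancel(context.Background())
	// A real loop would consume and process messages until cancelled.
	run := func(ctx context.Context, _ int32) { <-ctx.Done() }

	w.assign(ctx, []int32{0, 1, 2}, run)
	afterAssign = w.active()
	w.revoke([]int32{1})
	afterRevoke = w.active()

	cancel()    // application shutdown stops the remaining workers
	w.wg.Wait() // wait for every goroutine to exit
	return afterAssign, afterRevoke
}

func main() {
	a, r := rebalanceDemo()
	fmt.Println("active after assign:", a, "after revoke:", r)
}
```

Deriving each partition's context from the parent means a single cancellation at shutdown reaches every worker, while a revoke touches only the partitions that moved.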
Graceful Shutdown ¶
When the application context is cancelled (e.g., on SIGTERM), the runtime:
- Stops fetching new messages
- Returns queue.ErrEndOfQueue from consumers
- Allows in-flight messages to complete processing
- Commits final offsets for at-least-once semantics
- Closes the Kafka client
The Humus framework handles OS signals automatically, so your application does not need to install its own signal handlers.
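The shutdown sequence can be illustrated with a small consume loop. This is a sketch under stated assumptions: errEndOfQueue is a local stand-in for queue.ErrEndOfQueue, the channel stands in for the Kafka fetch path, and the commit counter stands in for offset commits. None of it is the runtime's actual code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errEndOfQueue is a local stand-in for queue.ErrEndOfQueue.
var errEndOfQueue = errors.New("end of queue")

// consume returns the next message, or errEndOfQueue once ctx is cancelled.
// Checking ctx first guarantees nothing new is fetched after cancellation.
func consume(ctx context.Context, msgs <-chan string) (string, error) {
	if ctx.Err() != nil {
		return "", errEndOfQueue
	}
	select {
	case <-ctx.Done():
		return "", errEndOfQueue
	case m := <-msgs:
		return m, nil
	}
}

// shutdownDemo cancels the context while message "b" is in flight and
// reports which messages were processed and how many were committed.
func shutdownDemo() (processed []string, committed int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := make(chan string, 3)
	msgs <- "a"
	msgs <- "b"
	msgs <- "c"

	for {
		m, err := consume(ctx, msgs)
		if errors.Is(err, errEndOfQueue) {
			break // stop fetching; the loop exits cleanly
		}
		if m == "b" {
			cancel() // shutdown arrives while "b" is being processed
		}
		processed = append(processed, m) // the in-flight message still completes
		committed++                      // at-least-once: commit after processing
	}
	return processed, committed
}

func main() {
	processed, committed := shutdownDemo()
	fmt.Println("processed:", processed, "committed:", committed)
}
```

Message "c" is never fetched: it stays uncommitted and would be delivered to whichever consumer owns the partition next.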
OpenTelemetry Instrumentation ¶
All message processing is automatically instrumented with OpenTelemetry tracing, logging, and metrics through the core queue package processors.
Tracing ¶
Traces include:
- Span per message with processing order visible
- Context propagation through consume/process/acknowledge phases
- Error recording in spans
No additional tracing configuration is needed in your processor implementation.
Metrics ¶
The following metrics are automatically collected:
- messaging.client.messages.processed - Total number of Kafka messages processed
  Labels: messaging.destination.name (topic), messaging.destination.partition.id
  Unit: {message}
- messaging.client.messages.committed - Total number of Kafka messages successfully committed
  Labels: messaging.destination.name (topic), messaging.destination.partition.id
  Unit: {message}
- messaging.client.processing.failures - Total number of Kafka message processing failures
  Labels: messaging.destination.name (topic), messaging.destination.partition.id, error.type
  Unit: {failure}
  Note: error.type is a generic classification ("processing_error") to avoid exposing sensitive information
These metrics help monitor:
- Message throughput (messages processed per second)
- Uncommitted work (the gap between processed and committed counts; measuring true consumer lag also requires the partition's end offset)
- Error rates (failures per message)
- Partition-level performance
All metrics use the OpenTelemetry meter provider configured in your application via otel.GetMeterProvider().
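As a rough illustration of how the three counters combine, the sketch below derives throughput, the processed-versus-committed gap, and an error rate from counter samples. The counters type and helper functions are hypothetical. In practice these calculations are usually written as queries in the metrics backend, and the sketch assumes the processed counter includes failed messages, which this documentation does not state.

```go
package main

import "fmt"

// counters is one sample of the three counters for a single partition.
type counters struct {
	processed int64 // messaging.client.messages.processed
	committed int64 // messaging.client.messages.committed
	failures  int64 // messaging.client.processing.failures
}

// throughput returns messages processed per second between two samples.
func throughput(prev, cur counters, seconds float64) float64 {
	if seconds <= 0 {
		return 0
	}
	return float64(cur.processed-prev.processed) / seconds
}

// uncommitted returns how many processed messages are not yet committed.
func uncommitted(c counters) int64 {
	return c.processed - c.committed
}

// errorRate returns failures per processed message.
// Assumption: processed counts failed messages as well.
func errorRate(c counters) float64 {
	if c.processed == 0 {
		return 0
	}
	return float64(c.failures) / float64(c.processed)
}

func main() {
	prev := counters{processed: 100, committed: 100, failures: 1}
	cur := counters{processed: 400, committed: 390, failures: 4}

	fmt.Printf("throughput: %.1f msg/s\n", throughput(prev, cur, 60))
	fmt.Printf("uncommitted: %d\n", uncommitted(cur))
	fmt.Printf("error rate: %.2f%%\n", errorRate(cur)*100)
}
```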
Configuration ¶
The runtimes accept franz-go client options for advanced configuration:
runtime, err := kafka.NewAtLeastOnceRuntime(
	brokers,
	topic,
	groupID,
	processor,
	decoder,
	kafka.WithKafkaOptions(
		kgo.FetchMaxWait(500*time.Millisecond),
		kgo.SessionTimeout(10*time.Second),
	),
)
See franz-go documentation for available options: https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo
Index ¶
- func GroupIDAttr(groupID string) slog.Attr
- func OffsetAttr(offset int64) slog.Attr
- func PartitionAttr(partition int32) slog.Attr
- func TopicAttr(topic string) slog.Attr
- type Header
- type Message
- type Option
- func AtLeastOnce(topic string, processor queue.Processor[Message]) Option
- func AtMostOnce(topic string, processor queue.Processor[Message]) Option
- func FetchMaxBytes(bytes int32) Option
- func MaxConcurrentFetches(fetches int) Option
- func RebalanceTimeout(d time.Duration) Option
- func SessionTimeout(d time.Duration) Option
- func WithTLS(cfg *tls.Config) Option
- type Options
- type Runtime
Functions ¶
func GroupIDAttr ¶
GroupIDAttr returns a slog attribute for the Kafka consumer group ID.
func OffsetAttr ¶
OffsetAttr returns a slog attribute for the Kafka offset.
func PartitionAttr ¶
PartitionAttr returns a slog attribute for the Kafka partition.
Types ¶
type Message ¶
type Message struct {
	Key       []byte
	Value     []byte
	Headers   []Header
	Timestamp time.Time
	Topic     string
	Partition int32
	Offset    int64
	Attrs     uint8
}
Message represents a Kafka message.
type Option ¶
type Option func(*Options)
Option defines a function type for configuring Kafka runtime options.
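Option follows the functional options pattern: each option is a function that mutates an options struct, and the constructor applies them in order over a set of defaults. The sketch below reproduces the pattern with local lowercase stand-ins. The struct fields are hypothetical because the real Options fields are unexported, and the 45-second session timeout default is purely illustrative. Only the 50 MB fetch default comes from this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// options is a local stand-in for Options. The field names are assumptions.
type options struct {
	sessionTimeout time.Duration
	fetchMaxBytes  int32
}

// option mirrors the shape of Option: a function that mutates options.
type option func(*options)

// sessionTimeout returns an option that overrides the session timeout.
func sessionTimeout(d time.Duration) option {
	return func(o *options) { o.sessionTimeout = d }
}

// fetchMaxBytes returns an option that overrides the fetch buffer limit.
func fetchMaxBytes(n int32) option {
	return func(o *options) { o.fetchMaxBytes = n }
}

// newOptions starts from defaults and applies each option in order, so a
// later option overrides an earlier one that sets the same field.
func newOptions(opts ...option) options {
	o := options{
		sessionTimeout: 45 * time.Second, // illustrative default only
		fetchMaxBytes:  50 << 20,         // 50 MB, per FetchMaxBytes docs
	}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(
		sessionTimeout(10*time.Second),
		fetchMaxBytes(1<<20),
	)
	fmt.Println(o.sessionTimeout, o.fetchMaxBytes)
}
```

The pattern lets new settings be added later without changing the constructor's signature, which is presumably why NewRuntime takes a variadic list of options.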
func AtLeastOnce ¶
AtLeastOnce configures the Kafka runtime to process messages from the specified topic with at-least-once delivery semantics (process before acknowledge).
func AtMostOnce ¶
AtMostOnce configures the Kafka runtime to process messages from the specified topic using at-most-once delivery semantics.
With at-most-once semantics, messages are committed to Kafka before processing begins. This means that if processing fails, the message is lost since it has already been committed and will not be redelivered.
This approach provides:
- Lower latency (commits happen immediately)
- Higher throughput (no waiting for processing to complete)
- Risk of message loss on processing failures
Use at-most-once when:
- Message loss is acceptable
- Performance is critical
- Data is non-critical (metrics, logs, cache updates)
The processor still receives messages if a commit fails, but the consumer group offset does not advance, so those messages may be processed again after a restart.
func FetchMaxBytes ¶
FetchMaxBytes sets the maximum total bytes to buffer from fetch responses across all partitions. Default is 50 MB if not set.
func MaxConcurrentFetches ¶
MaxConcurrentFetches sets the maximum number of concurrent fetch requests. Default is unlimited if not set.
func RebalanceTimeout ¶
RebalanceTimeout sets the rebalance timeout for the Kafka consumer group.
func SessionTimeout ¶
SessionTimeout sets the session timeout for the Kafka consumer group.
func WithTLS ¶ added in v0.14.0
WithTLS configures TLS/mTLS for secure connections to Kafka brokers. Pass a fully configured *tls.Config with certificates, CA pool, and other TLS settings.
Example:
// Load the client certificate and key
cert, err := tls.LoadX509KeyPair("client-cert.pem", "client-key.pem")
if err != nil {
	return err
}

// Load the CA certificate used to verify the brokers
caCert, err := os.ReadFile("ca-cert.pem")
if err != nil {
	return err
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
	return errors.New("no valid CA certificates found in ca-cert.pem")
}

tlsConfig := &tls.Config{
	Certificates: []tls.Certificate{cert},
	RootCAs:      caCertPool,
	MinVersion:   tls.VersionTLS12,
}

runtime := kafka.NewRuntime(brokers, groupID,
	kafka.WithTLS(tlsConfig),
	kafka.AtLeastOnce(topic, processor),
)
type Options ¶
type Options struct {
	// contains filtered or unexported fields
}
Options represents configuration options for the Kafka runtime.
type Runtime ¶
type Runtime struct {
	// contains filtered or unexported fields
}
Runtime represents the Kafka runtime for processing messages.
func NewRuntime ¶
NewRuntime creates a new Kafka runtime with the provided brokers, group ID, and options.