streaming

command
v0.0.0-...-7871f83 Latest Latest
Published: Dec 23, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

README

Example: Streaming

Overview

Demonstrates real-time streaming execution in AgentMesh. Shows how to receive live updates during graph execution and stream LLM responses token by token, so users see output as it is generated.

Key Concepts

  • Stream: Real-time event channel for graph execution
  • StreamChunk: Token-level updates streamed from the LLM
  • Event Types: NodeStart, NodeComplete, NodeError, GraphComplete
  • Proper Cleanup: Always call stream.Close() or stream.Cancel()
  • Backpressure: Event streams use a fast-path channel with a bounded 100 ms timeout, so listeners get low-latency updates without spawning thousands of timers when paused.

Prerequisites

export OPENAI_API_KEY="sk-..."

Running

cd examples/streaming
go run main.go

Expected Output

=== Graph Streaming Example ===

[Event] NodeStart: analyze
  Superstep: 1
  Timestamp: 10:30:15.123

[Streaming] analyze → "Based"
[Streaming] analyze → " on"
[Streaming] analyze → " the"
[Streaming] analyze → " data"
[Streaming] analyze → ","
[Streaming] analyze → " the"
[Streaming] analyze → " trend"
[Streaming] analyze → " is"
[Streaming] analyze → " positive"
[Streaming] analyze → "."

[Event] NodeComplete: analyze
  Duration: 1.234s
  Output: "Based on the data, the trend is positive."

[Event] NodeStart: summarize
  Superstep: 2

[Streaming] summarize → "Summary"
[Streaming] summarize → ":"
[Streaming] summarize → " Positive"
[Streaming] summarize → " trend"

[Event] NodeComplete: summarize
  Duration: 0.567s

[Event] GraphComplete
  Total Duration: 1.801s
  Supersteps: 2
  ✓ Execution successful

Code Walkthrough

1. Build Graph with Streaming Model
model := openai.NewModel(client, openai.WithStreaming(true))
2. Start Streaming Execution
seq := compiled.Run(ctx, messages)
3. Process Events
for event, err := range seq {
    if err != nil {
        // handle error
        break
    }
    // process event by its concrete type
    switch e := event.(type) {
    case *graph.NodeStartEvent:
        fmt.Printf("→ Starting: %s\n", e.NodeName)
    
    case *graph.NodeCompleteEvent:
        fmt.Printf("✓ Completed: %s (%.2fs)\n", 
            e.NodeName, e.Duration.Seconds())
    
    case *graph.NodeErrorEvent:
        fmt.Printf("❌ Error in %s: %v\n", e.NodeName, e.Error)
    
    case *graph.GraphCompleteEvent:
        fmt.Printf("Graph completed in %.2fs\n", e.Duration.Seconds())
    }
}
4. Handle Token Streaming
case *graph.NodeCompleteEvent:
    if e.StreamChunks != nil {
        for chunk := range e.StreamChunks {
            fmt.Printf("[Token] %s", chunk.Text)
        }
        fmt.Println()
    }
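
Printing tokens as they arrive is often paired with keeping the full text. The following sketch assembles chunks from a channel into one string; `StreamChunk` here is a local stand-in with only a `Text` field, and `collect` is a hypothetical helper, not part of AgentMesh.

```go
package main

import (
	"fmt"
	"strings"
)

// StreamChunk is a stand-in for model.StreamChunk with only the field
// used in this README.
type StreamChunk struct {
	Text string
}

// collect drains chunks until the channel is closed and returns the
// concatenated text. A nil channel yields an empty string.
func collect(chunks <-chan StreamChunk) string {
	if chunks == nil {
		return ""
	}
	var b strings.Builder
	for c := range chunks {
		b.WriteString(c.Text)
	}
	return b.String()
}

func main() {
	ch := make(chan StreamChunk)
	go func() {
		defer close(ch) // closing the channel ends the range loop in collect
		for _, t := range []string{"Based", " on", " the", " data", ",", " the", " trend", " is", " positive", "."} {
			ch <- StreamChunk{Text: t}
		}
	}()
	fmt.Println(collect(ch)) // prints "Based on the data, the trend is positive."
}
```

The nil check matters in Go: ranging over a nil channel blocks forever, which is why the walkthrough guards `e.StreamChunks` before reading from it.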

Event Types

NodeStartEvent
type NodeStartEvent struct {
    NodeName   string
    Superstep  int
    Timestamp  time.Time
}
NodeCompleteEvent
type NodeCompleteEvent struct {
    NodeName     string
    Duration     time.Duration
    Updates      map[string]any
    StreamChunks <-chan model.StreamChunk  // Token stream
}
NodeErrorEvent
type NodeErrorEvent struct {
    NodeName string
    Error    error
    Retrying bool
}
GraphCompleteEvent
type GraphCompleteEvent struct {
    Duration   time.Duration
    Supersteps int
    FinalState map[string]any
}

Proper Cleanup

Context Cancellation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

seq := compiled.Run(ctx, messages)

What This Example Teaches

  • ✅ Real-time execution monitoring
  • ✅ Token-by-token LLM streaming
  • ✅ Event-driven progress tracking
  • ✅ Proper resource cleanup
  • ✅ User experience optimization

Use Cases

Live UI Updates
for event := range stream.Events() {
    updateProgressBar(event)
    showTokensInRealTime(event)
}
Execution Monitoring
for event := range stream.Events() {
    logToMetrics(event)
    checkTimeout(event)
}
Early Termination
for event := range stream.Events() {
    if shouldStop(event) {
        stream.Cancel()
        break
    }
}

Next Steps

  • Implement real-time UI updates
  • Add streaming to chat applications
  • Combine with checkpointing for long workflows
  • See examples/observability for metrics integration

Documentation

Overview

Package main demonstrates real-time streaming execution in AgentMesh.

This example shows how to:

  • Stream LLM responses in real-time using the iterator pattern
  • Receive partial AI messages as they're generated
  • Handle both partial and complete messages cleanly

The key insight: partial model responses are streamed via the Run() iterator, so you don't need an event bus for basic streaming use cases.

Prerequisites:

export OPENAI_API_KEY="sk-..."

Run: go run main.go
