transport

package
v0.0.0-...-4bb3392 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 18, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrConnectionClosed = errors.New("connection closed")

ErrConnectionClosed is returned when sending a message to a connection that is closed or in the process of closing.

Functions

func AddMiddleware

func AddMiddleware[S Session](handler *MethodHandler[S], middleware []Middleware[S])

AddMiddleware wraps the handler in the middleware functions.

func Connect

func Connect[H Handler](ctx context.Context, t Transport, b Binder[H]) (h H, err error)

func DefaultReceivingMethodHandler

func DefaultReceivingMethodHandler[S Session](ctx context.Context, session S, method a2a.Method, params a2a.Params) (a2a.Result, error)

DefaultReceivingMethodHandler is the initial MethodHandler for servers and clients, before being wrapped by middleware.

func DefaultSendingMethodHandler

func DefaultSendingMethodHandler[S Session](ctx context.Context, session S, method a2a.Method, params a2a.Params) (a2a.Result, error)

DefaultSendingMethodHandler is the initial MethodHandler for servers and clients, before being wrapped by middleware.

func IsConnectionClosedError

func IsConnectionClosedError(err error) bool

IsConnectionClosedError reports whether err indicates a connection has been closed. It checks for various error types and patterns that indicate connection closure.

func NewInMemoryTransports

func NewInMemoryTransports() (Transport, Transport)

NewInMemoryTransports returns two InMemoryTransports that connect to each other.

Types

type Binder

type Binder[T Handler] interface {
	Bind(*jsonrpc2.Connection) T
	Disconnect(T)
}

type Client

type Client interface {
	ClientSendingMethodHandler() any
	ClientReceivingMethodHandler() any
}

type ClientSession

type ClientSession struct {
	Connection   *jsonrpc2.Connection
	Client       Client
	A2AConn      Connection
	HTTPClient   *http.Client
	Interceptors []Interceptor
	// contains filtered or unexported fields
}

ClientSession is a logical connection with an A2A server. Its methods can be used to send requests or notifications to the server. Create a session by calling [Client.Connect].

Call ClientSession.Close to close the connection, or await server termination with ClientSession.Wait.

func (*ClientSession) AuthenticatedExtendedCard

func (cs *ClientSession) AuthenticatedExtendedCard(ctx context.Context, req *a2a.EmptyParams) (*a2a.AgentCard, error)

AuthenticatedExtendedCard retrieves a potentially more detailed version of the a2a.AgentCard after the client has authenticated.

func (*ClientSession) CancelTask

func (cs *ClientSession) CancelTask(ctx context.Context, req *a2a.TaskIDParams) (*a2a.Task, error)

CancelTask requests the cancellation of an ongoing task.

func (*ClientSession) Close

func (cs *ClientSession) Close() error

Close performs a graceful close of the connection, preventing new requests from being handled, and waiting for ongoing requests to return. Close then terminates the connection.

Close implements Session.

func (*ClientSession) Conn

func (cs *ClientSession) Conn() *jsonrpc2.Connection

Conn implements Session.

func (*ClientSession) DeleteTasksPushNotificationConfig

func (cs *ClientSession) DeleteTasksPushNotificationConfig(ctx context.Context, req *a2a.DeleteTaskPushNotificationConfigParams) (*a2a.EmptyResult, error)

DeleteTasksPushNotificationConfig deletes an associated push notification configuration for a task.

func (*ClientSession) GetAgentCard

func (cs *ClientSession) GetAgentCard(ctx context.Context, baseURL string) (*a2a.AgentCard, error)

GetAgentCard retrieves the a2a.AgentCard for the agent at baseURL.

func (*ClientSession) GetTask

func (cs *ClientSession) GetTask(ctx context.Context, req *a2a.TaskQueryParams) (*a2a.Task, error)

GetTask retrieves the current state (including status, artifacts, and optionally history) of a previously initiated task.

func (*ClientSession) GetTasksPushNotificationConfig

func (cs *ClientSession) GetTasksPushNotificationConfig(ctx context.Context, req *a2a.GetTaskPushNotificationConfigParams) (*a2a.TaskPushNotificationConfig, error)

GetTasksPushNotificationConfig retrieves the current push notification configuration for a specified task.

func (*ClientSession) Handle

func (cs *ClientSession) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)

Handle implements Handler.

func (*ClientSession) ID

func (cs *ClientSession) ID() string

ID implements Session.

func (*ClientSession) ListTasksPushNotificationConfig

ListTasksPushNotificationConfig retrieves the associated push notification configurations for a specified task.

func (*ClientSession) ResubscribeTasks

ResubscribeTasks allows a client to reconnect to an SSE stream for an ongoing task after a previous connection (from message/stream or an earlier tasks/resubscribe) was interrupted.

func (*ClientSession) SendMessage

func (cs *ClientSession) SendMessage(ctx context.Context, req *a2a.MessageSendParams) (a2a.MessageOrTask, error)

SendMessage sends a message to an agent to initiate a new interaction or to continue an existing one.

func (*ClientSession) SendStreamMessage

SendStreamMessage sends a message to an agent to initiate/continue a task AND subscribes the client to real-time updates for that task via Server-Sent Events (SSE).

func (*ClientSession) SetConn

func (cs *ClientSession) SetConn(c Connection)

SetConn implements Handler.

func (*ClientSession) SetTasksPushNotificationConfig

func (cs *ClientSession) SetTasksPushNotificationConfig(ctx context.Context, req *a2a.TaskPushNotificationConfig) (*a2a.TaskPushNotificationConfig, error)

SetTasksPushNotificationConfig sets or updates the push notification configuration for a specified task.

func (*ClientSession) Wait

func (cs *ClientSession) Wait() error

Wait waits for the connection to be closed by the server. Generally, clients should be responsible for closing the connection.

Wait implements Session.

type Connection

type Connection interface {
	SessionID() string
	Read(context.Context) (jsonrpc2.Message, error)
	Write(context.Context, jsonrpc2.Message) error
	Close() error // may be called concurrently by both peers
}

Connection is a logical bidirectional JSON-RPC connection.

type Event

type Event struct {
	Name string // the "event" field
	ID   string // the "id" field
	Data []byte // the "data" field
}

Event is a server-sent event. See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields.

func (Event) Empty

func (e Event) Empty() bool

Empty reports whether the Event is empty.

type GetServer

type GetServer func(request *http.Request) Server

GetServer represents a function that returns a Server for a given *http.Request.

type Handler

type Handler interface {
	Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
	SetConn(Connection)
}

type InMemoryTransport

type InMemoryTransport struct {
	// contains filtered or unexported fields
}

InMemoryTransport is a Transport that communicates over an in-memory network connection, using newline-delimited JSON.
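The idea of newline-delimited JSON over an in-memory connection can be illustrated with the standard library alone. This sketch assumes net.Pipe as the in-memory connection, which this page does not confirm. The message type, its field values, and roundTrip are hypothetical.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net"
)

// message is a simplified stand-in for a JSON-RPC request.
type message struct {
	JSONRPC string `json:"jsonrpc"`
	ID      int    `json:"id"`
	Method  string `json:"method"`
}

// roundTrip writes msg to one end of an in-memory pipe as a single JSON line
// and decodes it again on the other end.
func roundTrip(msg message) (message, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	errc := make(chan error, 1)
	go func() {
		// json.Encoder terminates each value with '\n', which frames the message.
		errc <- json.NewEncoder(a).Encode(msg)
	}()

	var got message
	line, err := bufio.NewReader(b).ReadBytes('\n')
	if err != nil {
		return got, err
	}
	if err := json.Unmarshal(line, &got); err != nil {
		return got, err
	}
	return got, <-errc
}

func main() {
	got, err := roundTrip(message{JSONRPC: "2.0", ID: 1, Method: "message/stream"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", got)
}
```

In the package itself, NewInMemoryTransports returns the two connected ends, which makes it convenient for tests that need a client and a server in one process.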

func (*InMemoryTransport) Connect

func (t *InMemoryTransport) Connect(context.Context) (Connection, error)

Connect implements Transport.

type Interceptor

type Interceptor func(ctx context.Context, req *http.Request, invoker Invoker) (*http.Response, error)

Interceptor defines a middleware function that can intercept and modify requests/responses.

type Invoker

type Invoker func(ctx context.Context, req *http.Request) (*http.Response, error)

Invoker represents the next handler in the interceptor chain.

type LoggingTransport

type LoggingTransport struct {
	// contains filtered or unexported fields
}

LoggingTransport is a Transport that delegates to another transport, writing RPC logs to an io.Writer.

func (*LoggingTransport) Connect

func (t *LoggingTransport) Connect(ctx context.Context) (Connection, error)

Connect connects the underlying transport, returning a Connection that writes logs to the configured destination.

Connect implements Transport.

type MethodHandler

type MethodHandler[S Session] func(ctx context.Context, session S, method a2a.Method, params a2a.Params) (result a2a.Result, err error)

MethodHandler handles A2A messages. For methods, exactly one of the return values must be nil. For notifications, both must be nil.

type Middleware

type Middleware[S Session] func(MethodHandler[S]) MethodHandler[S]

Middleware is a function from MethodHandler to MethodHandler.

type SSEClientTransport

type SSEClientTransport struct {
	// contains filtered or unexported fields
}

SSEClientTransport is a Transport that can communicate with an MCP endpoint serving the SSE transport defined by the 2024-11-05 version of the spec.

https://modelcontextprotocol.io/specification/2024-11-05/basic/transports

func NewSSEClientTransport

func NewSSEClientTransport(baseURL string, opts *SSEClientTransportOptions) *SSEClientTransport

NewSSEClientTransport returns a new client transport that connects to the SSE server at the provided URL.

NewSSEClientTransport panics if the given URL is invalid.

func (*SSEClientTransport) Connect

func (c *SSEClientTransport) Connect(ctx context.Context) (Connection, error)

Connect connects through the client endpoint.

type SSEClientTransportOptions

type SSEClientTransportOptions struct {
	// HTTPClient is the client to use for making HTTP requests. If nil,
	// http.DefaultClient is used.
	HTTPClient *http.Client
}

SSEClientTransportOptions provides options for the NewSSEClientTransport constructor.

type SSEHandler

type SSEHandler struct {
	// contains filtered or unexported fields
}

SSEHandler is an http.Handler that serves SSE-based A2A sessions.

func NewSSEHandler

func NewSSEHandler(getServer func(request *http.Request) Server, opts *SSEHandlerOptions) *SSEHandler

NewSSEHandler returns a new SSEHandler that creates and manages A2A sessions created via incoming HTTP requests.

Sessions are created when the client issues a GET request to the server, which must accept text/event-stream responses (server-sent events). For each such request, a new SSEServerTransport is created with a distinct messages endpoint, and connected to the server returned by getServer. The SSEHandler also handles requests to the message endpoints, by delegating them to the relevant server transport.

The getServer function may return a distinct Server for each new request, or reuse an existing server. If it returns nil, the handler will return a 400 Bad Request.

func (*SSEHandler) ServeHTTP

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

type SSEHandlerOptions

type SSEHandlerOptions struct{}

SSEHandlerOptions provides options for the NewSSEHandler constructor.

type SSEServerTransport

type SSEServerTransport struct {
	// contains filtered or unexported fields
}

SSEServerTransport is a logical SSE session created through a hanging GET request.

When connected, it returns the following Connection implementation:

  • Writes are SSE 'message' events to the GET response.
  • Reads are received from POSTs to the session endpoint, via SSEServerTransport.ServeHTTP.
  • Close terminates the hanging GET.

func NewSSEServerTransport

func NewSSEServerTransport(endpoint string, w http.ResponseWriter, sessionID string) *SSEServerTransport

NewSSEServerTransport creates a new SSE transport for the given messages endpoint, and hanging GET response.

Use SSEServerTransport.Connect to initiate the flow of messages.

The transport is itself an http.Handler. It is the caller's responsibility to ensure that the resulting transport serves HTTP requests on the given session endpoint.

Most callers should instead use an SSEHandler, which transparently handles the delegation to SSEServerTransports.

func (*SSEServerTransport) Connect

Connect sends the 'endpoint' event to the client. See SSEServerTransport for more details on the Connection implementation.

func (*SSEServerTransport) ServeHTTP

func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles POST requests to the transport endpoint.

ServeHTTP implements http.Handler.

type Server

type Server interface {
	Connect(ctx context.Context, t Transport) (*ServerSession, error)
	AgentCard() *a2a.AgentCard
	ServerSendingMethodHandler() any
	ServerReceivingMethodHandler() any

	SendMessage(ctx context.Context, ss *ServerSession, params *a2a.MessageSendParams) (a2a.MessageOrTask, error)
	SendStreamMessage(ctx context.Context, ss *ServerSession, params *a2a.MessageSendParams) (a2a.SendStreamingMessageResponse, error)
	GetTask(ctx context.Context, ss *ServerSession, params *a2a.TaskQueryParams) (*a2a.Task, error)
	CancelTask(ctx context.Context, ss *ServerSession, params *a2a.TaskIDParams) (*a2a.Task, error)
	SetTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.TaskPushNotificationConfig) (*a2a.TaskPushNotificationConfig, error)
	GetTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.GetTaskPushNotificationConfigParams) (*a2a.TaskPushNotificationConfig, error)
	ListTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.ListTaskPushNotificationConfigParams) (a2a.TaskPushNotificationConfigs, error)
	DeleteTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.DeleteTaskPushNotificationConfigParams) (*a2a.EmptyResult, error)
	ResubscribeTasks(ctx context.Context, ss *ServerSession, params *a2a.TaskIDParams) (a2a.SendStreamingMessageResponse, error)
	AuthenticatedExtendedCard(ctx context.Context, ss *ServerSession, params *a2a.EmptyParams) (*a2a.AgentCard, error)
}

Server provides methods for handling incoming requests from A2A clients.

type ServerHandler

type ServerHandler struct {
	// contains filtered or unexported fields
}

ServerHandler represents an HTTP handler that serves A2A sessions.

func NewServerHandler

func NewServerHandler(getServer GetServer, opts *SSEHandlerOptions) *ServerHandler

NewServerHandler returns a new ServerHandler that serves A2A sessions.

func (*ServerHandler) ServeHTTP

func (h *ServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

type ServerSession

type ServerSession struct {
	Server     Server
	Connection *jsonrpc2.Connection
	A2aConn    Connection
	Mu         sync.Mutex
}

A ServerSession is a logical connection from a single A2A client. Its methods can be used to send requests or notifications to the client. Create a session by calling [Server.Connect].

Call ServerSession.Close to close the connection, or await client termination with ServerSession.Wait.

func (*ServerSession) Close

func (ss *ServerSession) Close() error

Close performs a graceful shutdown of the connection, preventing new requests from being handled, and waiting for ongoing requests to return. Close then terminates the connection.

Close implements Session.

func (*ServerSession) Conn

func (ss *ServerSession) Conn() *jsonrpc2.Connection

Conn implements Session.

func (*ServerSession) Handle

func (ss *ServerSession) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)

Handle invokes the method described by the given JSON-RPC request.

Handle implements Handler.

func (*ServerSession) ID

func (ss *ServerSession) ID() string

ID implements Session.

func (*ServerSession) SetConn

func (ss *ServerSession) SetConn(c Connection)

SetConn implements Handler.

func (*ServerSession) Wait

func (ss *ServerSession) Wait() error

Wait waits for the connection to be closed by the client.

Wait implements Session.

type Session

type Session interface {
	*ClientSession | *ServerSession

	// ID returns the session ID, or the empty string if there is none.
	ID() string

	Conn() *jsonrpc2.Connection
	Close() error
	Wait() error
	// contains filtered or unexported methods
}

Session is either a ClientSession or a ServerSession.

type Transport

type Transport interface {
	// Connect returns the logical JSON-RPC connection.
	//
	// It is called exactly once by [Server.Connect] or [Client.Connect].
	Connect(ctx context.Context) (Connection, error)
}

Transport is used to create a bidirectional connection between A2A client and server.

Transports should be used for at most one call to [Server.Connect] or [Client.Connect].

func NewLoggingTransport

func NewLoggingTransport(delegate Transport, w io.Writer) Transport

NewLoggingTransport creates a new LoggingTransport that delegates to the provided transport, writing RPC logs to the provided io.Writer.
