Documentation ¶
Index ¶
- Variables
- func AddMiddleware[S Session](handler *MethodHandler[S], middleware []Middleware[S])
- func Connect[H Handler](ctx context.Context, t Transport, b Binder[H]) (h H, err error)
- func DefaultReceivingMethodHandler[S Session](ctx context.Context, session S, method a2a.Method, params a2a.Params) (a2a.Result, error)
- func DefaultSendingMethodHandler[S Session](ctx context.Context, session S, method a2a.Method, params a2a.Params) (a2a.Result, error)
- func IsConnectionClosedError(err error) bool
- func NewInMemoryTransports() (Transport, Transport)
- type Binder
- type Client
- type ClientSession
- func (cs *ClientSession) AuthenticatedExtendedCard(ctx context.Context, req *a2a.EmptyParams) (*a2a.AgentCard, error)
- func (cs *ClientSession) CancelTask(ctx context.Context, req *a2a.TaskIDParams) (*a2a.Task, error)
- func (cs *ClientSession) Close() error
- func (cs *ClientSession) Conn() *jsonrpc2.Connection
- func (cs *ClientSession) DeleteTasksPushNotificationConfig(ctx context.Context, req *a2a.DeleteTaskPushNotificationConfigParams) (*a2a.EmptyResult, error)
- func (cs *ClientSession) GetAgentCard(ctx context.Context, baseURL string) (*a2a.AgentCard, error)
- func (cs *ClientSession) GetTask(ctx context.Context, req *a2a.TaskQueryParams) (*a2a.Task, error)
- func (cs *ClientSession) GetTasksPushNotificationConfig(ctx context.Context, req *a2a.GetTaskPushNotificationConfigParams) (*a2a.TaskPushNotificationConfig, error)
- func (cs *ClientSession) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
- func (cs *ClientSession) ID() string
- func (cs *ClientSession) ListTasksPushNotificationConfig(ctx context.Context, req *a2a.ListTaskPushNotificationConfigParams) (a2a.TaskPushNotificationConfigs, error)
- func (cs *ClientSession) ResubscribeTasks(ctx context.Context, req *a2a.TaskIDParams) (a2a.SendStreamingMessageResponse, error)
- func (cs *ClientSession) SendMessage(ctx context.Context, req *a2a.MessageSendParams) (a2a.MessageOrTask, error)
- func (cs *ClientSession) SendStreamMessage(ctx context.Context, req *a2a.MessageSendParams) (a2a.SendStreamingMessageResponse, error)
- func (cs *ClientSession) SetConn(c Connection)
- func (cs *ClientSession) SetTasksPushNotificationConfig(ctx context.Context, req *a2a.TaskPushNotificationConfig) (*a2a.TaskPushNotificationConfig, error)
- func (cs *ClientSession) Wait() error
- type Connection
- type Event
- type GetServer
- type Handler
- type InMemoryTransport
- type Interceptor
- type Invoker
- type LoggingTransport
- type MethodHandler
- type Middleware
- type SSEClientTransport
- type SSEClientTransportOptions
- type SSEHandler
- type SSEHandlerOptions
- type SSEServerTransport
- type Server
- type ServerHandler
- type ServerSession
- func (ss *ServerSession) Close() error
- func (ss *ServerSession) Conn() *jsonrpc2.Connection
- func (ss *ServerSession) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
- func (ss *ServerSession) ID() string
- func (ss *ServerSession) SetConn(c Connection)
- func (ss *ServerSession) Wait() error
- type Session
- type Transport
Constants ¶
This section is empty.
Variables ¶
var ErrConnectionClosed = errors.New("connection closed")
ErrConnectionClosed is returned when sending a message to a connection that is closed or in the process of closing.
Functions ¶
func AddMiddleware ¶
func AddMiddleware[S Session](handler *MethodHandler[S], middleware []Middleware[S])
AddMiddleware wraps the handler in the middleware functions.
func DefaultReceivingMethodHandler ¶
func DefaultReceivingMethodHandler[S Session](ctx context.Context, session S, method a2a.Method, params a2a.Params) (a2a.Result, error)
DefaultReceivingMethodHandler is the initial MethodHandler that servers and clients use for received messages, before it is wrapped by middleware.
func DefaultSendingMethodHandler ¶
func DefaultSendingMethodHandler[S Session](ctx context.Context, session S, method a2a.Method, params a2a.Params) (a2a.Result, error)
DefaultSendingMethodHandler is the initial MethodHandler that servers and clients use for sent messages, before it is wrapped by middleware.
func IsConnectionClosedError ¶
func IsConnectionClosedError(err error) bool
IsConnectionClosedError reports whether err indicates that a connection has been closed. It checks for various error types and patterns that indicate connection closure.
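The kind of check described above can be sketched as follows. This is a minimal illustration, not the package's implementation: errConnectionClosed is a local stand-in for ErrConnectionClosed, and the set of errors treated as "closed" (io.EOF and net.ErrClosed) is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

// errConnectionClosed is a local stand-in for the package's ErrConnectionClosed.
var errConnectionClosed = errors.New("connection closed")

// isClosed is a hypothetical approximation of IsConnectionClosedError.
// Which errors count as "closed" here is an assumption for this sketch.
func isClosed(err error) bool {
	if err == nil {
		return false
	}
	return errors.Is(err, errConnectionClosed) ||
		errors.Is(err, io.EOF) ||
		errors.Is(err, net.ErrClosed)
}

// wrapSend adds context to err the way a caller typically would,
// keeping the original error reachable through errors.Is.
func wrapSend(err error) error {
	return fmt.Errorf("send message: %w", err)
}

func main() {
	fmt.Println(isClosed(wrapSend(errConnectionClosed))) // true
	fmt.Println(isClosed(io.EOF))                        // true
	fmt.Println(isClosed(errors.New("timeout")))         // false
	fmt.Println(isClosed(nil))                           // false
}
```

Because the sentinel is matched with errors.Is, callers can wrap it with extra context without breaking the check.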
func NewInMemoryTransports ¶
func NewInMemoryTransports() (Transport, Transport)
NewInMemoryTransports returns two InMemoryTransports that connect to each other.
Types ¶
type Binder ¶
type Binder[T Handler] interface {
Bind(*jsonrpc2.Connection) T
Disconnect(T)
}
type ClientSession ¶
type ClientSession struct {
Connection *jsonrpc2.Connection
Client Client
A2AConn Connection
HTTPClient *http.Client
Interceptors []Interceptor
// contains filtered or unexported fields
}
ClientSession is a logical connection with an A2A server. Its methods can be used to send requests or notifications to the server. Create a session by calling [Client.Connect].
Call ClientSession.Close to close the connection, or await server termination with ClientSession.Wait.
func (*ClientSession) AuthenticatedExtendedCard ¶
func (cs *ClientSession) AuthenticatedExtendedCard(ctx context.Context, req *a2a.EmptyParams) (*a2a.AgentCard, error)
AuthenticatedExtendedCard retrieves a potentially more detailed version of the a2a.AgentCard after the client has authenticated.
func (*ClientSession) CancelTask ¶
func (cs *ClientSession) CancelTask(ctx context.Context, req *a2a.TaskIDParams) (*a2a.Task, error)
CancelTask requests the cancellation of an ongoing task.
func (*ClientSession) Close ¶
func (cs *ClientSession) Close() error
Close performs a graceful close of the connection, preventing new requests from being handled, and waiting for ongoing requests to return. Close then terminates the connection.
Close implements Session.
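The graceful-close behavior described above (reject new requests, wait for ongoing ones, then terminate) can be sketched with a mutex and a sync.WaitGroup. All names here (gracefulConn, handle, errClosing, demo) are hypothetical; the sketch illustrates the pattern and is not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errClosing is a hypothetical error returned to requests that arrive after Close began.
var errClosing = errors.New("connection closing")

// gracefulConn is an illustrative stand-in for a session's connection.
type gracefulConn struct {
	mu       sync.Mutex
	closing  bool
	inflight sync.WaitGroup
}

// handle runs fn as one request unless the connection is already closing.
func (c *gracefulConn) handle(fn func()) error {
	c.mu.Lock()
	if c.closing {
		c.mu.Unlock()
		return errClosing
	}
	c.inflight.Add(1)
	c.mu.Unlock()
	defer c.inflight.Done()
	fn()
	return nil
}

// isClosing reports whether Close has started.
func (c *gracefulConn) isClosing() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closing
}

// Close stops accepting new requests, then waits for ongoing ones to return.
func (c *gracefulConn) Close() error {
	c.mu.Lock()
	c.closing = true
	c.mu.Unlock()
	c.inflight.Wait()
	return nil
}

// demo starts one slow request, begins closing, and reports whether a late
// request was rejected and whether the slow request still ran to completion.
func demo() (rejected, finished bool) {
	c := &gracefulConn{}
	started, release := make(chan struct{}), make(chan struct{})
	go c.handle(func() {
		close(started)
		<-release
		finished = true
	})
	<-started

	closed := make(chan error, 1)
	go func() { closed <- c.Close() }()
	for !c.isClosing() {
		time.Sleep(time.Millisecond)
	}
	rejected = errors.Is(c.handle(func() {}), errClosing)
	close(release)
	<-closed
	return rejected, finished
}

func main() {
	fmt.Println(demo()) // true true
}
```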
func (*ClientSession) Conn ¶
func (cs *ClientSession) Conn() *jsonrpc2.Connection
Conn implements Session.
func (*ClientSession) DeleteTasksPushNotificationConfig ¶
func (cs *ClientSession) DeleteTasksPushNotificationConfig(ctx context.Context, req *a2a.DeleteTaskPushNotificationConfigParams) (*a2a.EmptyResult, error)
DeleteTasksPushNotificationConfig deletes an associated push notification configuration for a task.
func (*ClientSession) GetAgentCard ¶
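func (cs *ClientSession) GetAgentCard(ctx context.Context, baseURL string) (*a2a.AgentCard, error)
GetAgentCard retrieves the a2a.AgentCard of the agent at baseURL.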
func (*ClientSession) GetTask ¶
func (cs *ClientSession) GetTask(ctx context.Context, req *a2a.TaskQueryParams) (*a2a.Task, error)
GetTask retrieves the current state (including status, artifacts, and optionally history) of a previously initiated task.
func (*ClientSession) GetTasksPushNotificationConfig ¶
func (cs *ClientSession) GetTasksPushNotificationConfig(ctx context.Context, req *a2a.GetTaskPushNotificationConfigParams) (*a2a.TaskPushNotificationConfig, error)
GetTasksPushNotificationConfig retrieves the current push notification configuration for a specified task.
func (*ClientSession) ListTasksPushNotificationConfig ¶
func (cs *ClientSession) ListTasksPushNotificationConfig(ctx context.Context, req *a2a.ListTaskPushNotificationConfigParams) (a2a.TaskPushNotificationConfigs, error)
ListTasksPushNotificationConfig retrieves the associated push notification configurations for a specified task.
func (*ClientSession) ResubscribeTasks ¶
func (cs *ClientSession) ResubscribeTasks(ctx context.Context, req *a2a.TaskIDParams) (a2a.SendStreamingMessageResponse, error)
ResubscribeTasks allows a client to reconnect to an SSE stream for an ongoing task after a previous connection (from message/stream or an earlier tasks/resubscribe) was interrupted.
func (*ClientSession) SendMessage ¶
func (cs *ClientSession) SendMessage(ctx context.Context, req *a2a.MessageSendParams) (a2a.MessageOrTask, error)
SendMessage sends a message to an agent to initiate a new interaction or to continue an existing one.
func (*ClientSession) SendStreamMessage ¶
func (cs *ClientSession) SendStreamMessage(ctx context.Context, req *a2a.MessageSendParams) (a2a.SendStreamingMessageResponse, error)
SendStreamMessage sends a message to an agent to initiate or continue a task, and subscribes the client to real-time updates for that task via Server-Sent Events (SSE).
func (*ClientSession) SetConn ¶
func (cs *ClientSession) SetConn(c Connection)
SetConn implements Handler.
func (*ClientSession) SetTasksPushNotificationConfig ¶
func (cs *ClientSession) SetTasksPushNotificationConfig(ctx context.Context, req *a2a.TaskPushNotificationConfig) (*a2a.TaskPushNotificationConfig, error)
SetTasksPushNotificationConfig sets or updates the push notification configuration for a specified task.
func (*ClientSession) Wait ¶
func (cs *ClientSession) Wait() error
Wait waits for the connection to be closed by the server. Generally, clients should be responsible for closing the connection.
Wait implements Session.
type Connection ¶
type Connection interface {
SessionID() string
Read(context.Context) (jsonrpc2.Message, error)
Write(context.Context, jsonrpc2.Message) error
Close() error // may be called concurrently by both peers
}
Connection is a logical bidirectional JSON-RPC connection.
type Event ¶
type Event struct {
Name string // the "event" field
ID string // the "id" field
Data []byte // the "data" field
}
Event is a server-sent event. See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields.
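As an illustration of how the three fields map onto the SSE wire format, the following sketch serializes an event as "event:", "id:" and "data:" lines terminated by a blank line. The event type mirrors Event locally, and writeEvent and encode are hypothetical helpers, not part of the package.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// event mirrors the documented Event struct for this self-contained sketch.
type event struct {
	Name string // the "event" field
	ID   string // the "id" field
	Data []byte // the "data" field
}

// writeEvent writes e in the text/event-stream format. Data containing
// newlines is split across several "data:" lines, as the format requires.
func writeEvent(w io.Writer, e event) error {
	var buf bytes.Buffer
	if e.Name != "" {
		fmt.Fprintf(&buf, "event: %s\n", e.Name)
	}
	if e.ID != "" {
		fmt.Fprintf(&buf, "id: %s\n", e.ID)
	}
	for _, line := range bytes.Split(e.Data, []byte("\n")) {
		fmt.Fprintf(&buf, "data: %s\n", line)
	}
	buf.WriteByte('\n') // a blank line ends the event
	_, err := w.Write(buf.Bytes())
	return err
}

// encode returns the wire form of e as a string.
func encode(e event) string {
	var buf bytes.Buffer
	if err := writeEvent(&buf, e); err != nil {
		return ""
	}
	return buf.String()
}

func main() {
	fmt.Printf("%q\n", encode(event{Name: "message", ID: "7", Data: []byte("a\nb")}))
}
```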
type GetServer ¶
GetServer represents a function that returns a Server for a given *http.Request.
type InMemoryTransport ¶
type InMemoryTransport struct {
// contains filtered or unexported fields
}
InMemoryTransport is a Transport that communicates over an in-memory network connection, using newline-delimited JSON.
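The idea of two connected in-memory endpoints exchanging newline-delimited JSON can be sketched with net.Pipe from the standard library. This is an assumption-laden illustration: the message type and roundTrip helper are hypothetical, and the package's InMemoryTransport may be built differently.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net"
)

// message is a hypothetical, simplified JSON-RPC request for this sketch.
type message struct {
	JSONRPC string `json:"jsonrpc"`
	ID      int    `json:"id"`
	Method  string `json:"method"`
}

// roundTrip sends msg over one end of an in-memory pipe and returns
// what the other end decoded from the newline-delimited stream.
func roundTrip(msg message) (message, error) {
	clientSide, serverSide := net.Pipe()
	defer clientSide.Close()
	defer serverSide.Close()

	// net.Pipe is synchronous, so the write must run concurrently with the read.
	errc := make(chan error, 1)
	go func() {
		// Encode writes the JSON value followed by a newline.
		errc <- json.NewEncoder(clientSide).Encode(msg)
	}()

	var got message
	line, err := bufio.NewReader(serverSide).ReadBytes('\n')
	if err != nil {
		return got, err
	}
	if err := json.Unmarshal(line, &got); err != nil {
		return got, err
	}
	return got, <-errc
}

func main() {
	got, err := roundTrip(message{JSONRPC: "2.0", ID: 1, Method: "tasks/get"})
	fmt.Println(got, err)
}
```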
type Interceptor ¶
type Interceptor func(ctx context.Context, req *http.Request, invoker Invoker) (*http.Response, error)
Interceptor defines a middleware function that can intercept and modify requests/responses.
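A request-modifying interceptor might look like the sketch below. The Invoker type is not shown in this documentation, so the invoker signature used here is an assumption, as are the bearerAuth and seenAuthHeader helpers and the example URL.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// invoker is an ASSUMED shape for the package's Invoker type:
// a function that performs the (possibly modified) request.
type invoker func(ctx context.Context, req *http.Request) (*http.Response, error)

// interceptor mirrors the documented Interceptor signature with local types.
type interceptor func(ctx context.Context, req *http.Request, next invoker) (*http.Response, error)

// bearerAuth is a hypothetical interceptor that adds an Authorization header
// to a copy of the request before passing it on.
func bearerAuth(token string) interceptor {
	return func(ctx context.Context, req *http.Request, next invoker) (*http.Response, error) {
		req = req.Clone(ctx) // avoid mutating the caller's request
		req.Header.Set("Authorization", "Bearer "+token)
		return next(ctx, req)
	}
}

// seenAuthHeader runs bearerAuth against a fake invoker and returns the
// Authorization header that the invoker observed.
func seenAuthHeader(token string) (string, error) {
	var seen string
	fake := func(ctx context.Context, req *http.Request) (*http.Response, error) {
		seen = req.Header.Get("Authorization")
		rec := httptest.NewRecorder()
		rec.WriteHeader(http.StatusOK)
		return rec.Result(), nil
	}
	req, err := http.NewRequest(http.MethodPost, "http://agent.example/", nil)
	if err != nil {
		return "", err
	}
	resp, err := bearerAuth(token)(context.Background(), req, fake)
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return seen, nil
}

func main() {
	fmt.Println(seenAuthHeader("t0ken")) // Bearer t0ken <nil>
}
```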
type LoggingTransport ¶
type LoggingTransport struct {
// contains filtered or unexported fields
}
LoggingTransport is a Transport that delegates to another transport, writing RPC logs to an io.Writer.
func (*LoggingTransport) Connect ¶
func (t *LoggingTransport) Connect(ctx context.Context) (Connection, error)
Connect connects the underlying transport, returning a Connection that writes logs to the configured destination.
Connect implements Transport.
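The delegation pattern behind a logging transport can be sketched as a wrapper that forwards each call and records it to an io.Writer. The conn interface below is a deliberately simplified, hypothetical stand-in for Connection (strings instead of jsonrpc2.Message, no context), and the log line format is invented for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// conn is a simplified, hypothetical stand-in for the Connection interface.
type conn interface {
	Read() (string, error)
	Write(msg string) error
}

// queueConn is a trivial conn that reads back whatever was written to it.
type queueConn struct{ msgs []string }

func (q *queueConn) Write(msg string) error {
	q.msgs = append(q.msgs, msg)
	return nil
}

func (q *queueConn) Read() (string, error) {
	if len(q.msgs) == 0 {
		return "", io.EOF
	}
	msg := q.msgs[0]
	q.msgs = q.msgs[1:]
	return msg, nil
}

// loggingConn delegates to inner and logs every message and error to w.
type loggingConn struct {
	inner conn
	w     io.Writer
}

func (l *loggingConn) Write(msg string) error {
	if err := l.inner.Write(msg); err != nil {
		fmt.Fprintf(l.w, "write error: %v\n", err)
		return err
	}
	fmt.Fprintf(l.w, "write: %s\n", msg)
	return nil
}

func (l *loggingConn) Read() (string, error) {
	msg, err := l.inner.Read()
	if err != nil {
		fmt.Fprintf(l.w, "read error: %v\n", err)
		return "", err
	}
	fmt.Fprintf(l.w, "read: %s\n", msg)
	return msg, nil
}

// demoLog performs one write and two reads and returns the resulting log.
func demoLog() string {
	var buf bytes.Buffer
	c := &loggingConn{inner: &queueConn{}, w: &buf}
	_ = c.Write(`{"method":"tasks/get"}`)
	_, _ = c.Read()
	_, _ = c.Read() // nothing left, so the inner conn returns io.EOF
	return buf.String()
}

func main() {
	fmt.Print(demoLog())
}
```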
type MethodHandler ¶
type MethodHandler[S Session] func(ctx context.Context, session S, method a2a.Method, params a2a.Params) (result a2a.Result, err error)
MethodHandler handles A2A messages. For methods, exactly one of the return values must be nil. For notifications, both must be nil.
type Middleware ¶
type Middleware[S Session] func(MethodHandler[S]) MethodHandler[S]
Middleware is a function from MethodHandler to MethodHandler.
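A minimal sketch of how such middleware composes, using local stand-ins for MethodHandler and Middleware (method and params are plain string and any here rather than the a2a types). The wrap helper and the order in which it applies middleware (the first element becomes the outermost layer) are assumptions for illustration; the documentation does not state AddMiddleware's ordering.

```go
package main

import (
	"context"
	"fmt"
)

// methodHandler and middleware are simplified local stand-ins for the
// package's MethodHandler and Middleware types.
type methodHandler[S any] func(ctx context.Context, session S, method string, params any) (any, error)

type middleware[S any] func(methodHandler[S]) methodHandler[S]

// tag returns a middleware that records its name before calling the next handler.
func tag[S any](name string, trace *[]string) middleware[S] {
	return func(next methodHandler[S]) methodHandler[S] {
		return func(ctx context.Context, session S, method string, params any) (any, error) {
			*trace = append(*trace, name)
			return next(ctx, session, method, params)
		}
	}
}

// wrap applies mws from last to first so that mws[0] runs first.
// This ordering is an assumption made for the sketch.
func wrap[S any](h methodHandler[S], mws ...middleware[S]) methodHandler[S] {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// runChain builds a two-layer chain around a base handler and invokes it once.
func runChain() ([]string, any, error) {
	var trace []string
	base := func(ctx context.Context, session string, method string, params any) (any, error) {
		trace = append(trace, "handler:"+method)
		return "ok", nil
	}
	h := wrap[string](base, tag[string]("auth", &trace), tag[string]("log", &trace))
	res, err := h(context.Background(), "session-1", "message/send", nil)
	return trace, res, err
}

func main() {
	trace, res, err := runChain()
	fmt.Println(trace, res, err) // [auth log handler:message/send] ok <nil>
}
```

The base handler follows the contract stated for MethodHandler: it returns a non-nil result and a nil error.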
type SSEClientTransport ¶
type SSEClientTransport struct {
// contains filtered or unexported fields
}
SSEClientTransport is a Transport that can communicate with an MCP endpoint serving the SSE transport defined by the 2024-11-05 version of the spec.
https://modelcontextprotocol.io/specification/2024-11-05/basic/transports
func NewSSEClientTransport ¶
func NewSSEClientTransport(baseURL string, opts *SSEClientTransportOptions) *SSEClientTransport
NewSSEClientTransport returns a new client transport that connects to the SSE server at the provided URL.
NewSSEClientTransport panics if the given URL is invalid.
func (*SSEClientTransport) Connect ¶
func (c *SSEClientTransport) Connect(ctx context.Context) (Connection, error)
Connect connects through the client endpoint.
type SSEClientTransportOptions ¶
type SSEClientTransportOptions struct {
// HTTPClient is the client to use for making HTTP requests. If nil,
// http.DefaultClient is used.
HTTPClient *http.Client
}
SSEClientTransportOptions provides options for the NewSSEClientTransport constructor.
type SSEHandler ¶
type SSEHandler struct {
// contains filtered or unexported fields
}
SSEHandler is an http.Handler that serves SSE-based A2A sessions.
func NewSSEHandler ¶
func NewSSEHandler(getServer func(request *http.Request) Server, opts *SSEHandlerOptions) *SSEHandler
NewSSEHandler returns a new SSEHandler that creates and manages A2A sessions created via incoming HTTP requests.
Sessions are created when the client issues a GET request to the server, which must accept text/event-stream responses (server-sent events). For each such request, a new SSEServerTransport is created with a distinct messages endpoint, and connected to the server returned by getServer. The SSEHandler also handles requests to the message endpoints, by delegating them to the relevant server transport.
The getServer function may return a distinct Server for each new request, or reuse an existing server. If it returns nil, the handler will return a 400 Bad Request.
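The getServer contract can be sketched with a small standalone handler: a nil server yields 400 Bad Request, and otherwise a GET opens an event stream. Everything below is hypothetical scaffolding (the server interface, pickServer, the "agent" query parameter, and the endpoint path written in the event); it is not the SSEHandler implementation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// server is a hypothetical, minimal stand-in for the Server interface.
type server interface{ Name() string }

type namedServer string

func (n namedServer) Name() string { return string(n) }

// pickServer is a hypothetical getServer function: it selects a server from
// the "agent" query parameter and returns nil when none is given.
func pickServer(r *http.Request) server {
	name := r.URL.Query().Get("agent")
	if name == "" {
		return nil
	}
	return namedServer(name)
}

// newHandler sketches the documented behavior: 400 when getServer returns nil,
// otherwise a text/event-stream response for GET requests.
func newHandler(getServer func(*http.Request) server) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		srv := getServer(r)
		if srv == nil {
			http.Error(w, "no server available", http.StatusBadRequest)
			return
		}
		if r.Method != http.MethodGet {
			http.Error(w, "only GET is handled in this sketch", http.StatusMethodNotAllowed)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.WriteHeader(http.StatusOK)
		// The endpoint path is invented for illustration.
		fmt.Fprintf(w, "event: endpoint\ndata: /messages/%s\n\n", srv.Name())
	})
}

// statusFor issues a GET for target against h and returns the status code.
func statusFor(h http.Handler, target string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Code
}

func main() {
	h := newHandler(pickServer)
	fmt.Println(statusFor(h, "/sse"))            // 400
	fmt.Println(statusFor(h, "/sse?agent=echo")) // 200
}
```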
func (*SSEHandler) ServeHTTP ¶
func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.
type SSEHandlerOptions ¶
type SSEHandlerOptions struct{}
SSEHandlerOptions provides options for the NewSSEHandler constructor.
type SSEServerTransport ¶
type SSEServerTransport struct {
// contains filtered or unexported fields
}
SSEServerTransport is a logical SSE session created through a hanging GET request.
When connected, it returns the following Connection implementation:
- Writes are SSE 'message' events to the GET response.
- Reads are received from POSTs to the session endpoint, via SSEServerTransport.ServeHTTP.
- Close terminates the hanging GET.
func NewSSEServerTransport ¶
func NewSSEServerTransport(endpoint string, w http.ResponseWriter, sessionID string) *SSEServerTransport
NewSSEServerTransport creates a new SSE transport for the given messages endpoint and hanging GET response.
Use SSEServerTransport.Connect to initiate the flow of messages.
The transport is itself an http.Handler. It is the caller's responsibility to ensure that the resulting transport serves HTTP requests on the given session endpoint.
Most callers should instead use an SSEHandler, which transparently handles the delegation to SSEServerTransports.
func (*SSEServerTransport) Connect ¶
func (t *SSEServerTransport) Connect(context.Context) (Connection, error)
Connect sends the 'endpoint' event to the client. See SSEServerTransport for more details on the Connection implementation.
func (*SSEServerTransport) ServeHTTP ¶
func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP handles POST requests to the transport endpoint.
ServeHTTP implements http.Handler.
type Server ¶
type Server interface {
Connect(ctx context.Context, t Transport) (*ServerSession, error)
AgentCard() *a2a.AgentCard
ServerSendingMethodHandler() any
ServerReceivingMethodHandler() any
SendMessage(ctx context.Context, ss *ServerSession, params *a2a.MessageSendParams) (a2a.MessageOrTask, error)
SendStreamMessage(ctx context.Context, ss *ServerSession, params *a2a.MessageSendParams) (a2a.SendStreamingMessageResponse, error)
GetTask(ctx context.Context, ss *ServerSession, params *a2a.TaskQueryParams) (*a2a.Task, error)
CancelTask(ctx context.Context, ss *ServerSession, params *a2a.TaskIDParams) (*a2a.Task, error)
SetTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.TaskPushNotificationConfig) (*a2a.TaskPushNotificationConfig, error)
GetTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.GetTaskPushNotificationConfigParams) (*a2a.TaskPushNotificationConfig, error)
ListTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.ListTaskPushNotificationConfigParams) (a2a.TaskPushNotificationConfigs, error)
DeleteTasksPushNotificationConfig(ctx context.Context, ss *ServerSession, params *a2a.DeleteTaskPushNotificationConfigParams) (*a2a.EmptyResult, error)
ResubscribeTasks(ctx context.Context, ss *ServerSession, params *a2a.TaskIDParams) (a2a.SendStreamingMessageResponse, error)
AuthenticatedExtendedCard(ctx context.Context, ss *ServerSession, params *a2a.EmptyParams) (*a2a.AgentCard, error)
}
Server provides methods for handling incoming requests from A2A clients.
type ServerHandler ¶
type ServerHandler struct {
// contains filtered or unexported fields
}
ServerHandler represents an HTTP handler that serves A2A sessions.
func NewServerHandler ¶
func NewServerHandler(getServer GetServer, opts *SSEHandlerOptions) *ServerHandler
NewServerHandler returns a new ServerHandler that serves A2A sessions.
func (*ServerHandler) ServeHTTP ¶
func (h *ServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.
type ServerSession ¶
type ServerSession struct {
Server Server
Connection *jsonrpc2.Connection
A2aConn Connection
Mu sync.Mutex
}
A ServerSession is a logical connection from a single A2A client. Its methods can be used to send requests or notifications to the client. Create a session by calling [Server.Connect].
Call ServerSession.Close to close the connection, or await client termination with ServerSession.Wait.
func (*ServerSession) Close ¶
func (ss *ServerSession) Close() error
Close performs a graceful shutdown of the connection, preventing new requests from being handled, and waiting for ongoing requests to return. Close then terminates the connection.
Close implements Session.
func (*ServerSession) Conn ¶
func (ss *ServerSession) Conn() *jsonrpc2.Connection
Conn implements Session.
func (*ServerSession) Handle ¶
func (ss *ServerSession) Handle(ctx context.Context, req *jsonrpc2.Request) (any, error)
Handle invokes the method described by the given JSON-RPC request.
Handle implements Handler.
func (*ServerSession) SetConn ¶
func (ss *ServerSession) SetConn(c Connection)
SetConn implements Handler.
func (*ServerSession) Wait ¶
func (ss *ServerSession) Wait() error
Wait waits for the connection to be closed by the client.
Wait implements Session.
type Session ¶
type Session interface {
*ClientSession | *ServerSession
// ID returns the session ID, or the empty string if there is none.
ID() string
Conn() *jsonrpc2.Connection
Close() error
Wait() error
// contains filtered or unexported methods
}
Session is either a ClientSession or a ServerSession.
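A constraint of this shape combines a type union with a method set, which lets generic code call the shared methods on either session type. The sketch below reproduces the pattern with local, hypothetical types (clientSession, serverSession, describe) and only the ID method.

```go
package main

import "fmt"

// clientSession and serverSession are hypothetical stand-ins for the
// package's session types.
type clientSession struct{ id string }

type serverSession struct{ id string }

func (c *clientSession) ID() string { return c.id }
func (s *serverSession) ID() string { return s.id }

// session restricts a type parameter to exactly these two pointer types
// and exposes the method they share.
type session interface {
	*clientSession | *serverSession
	ID() string
}

// describe works for either session type without a type switch.
func describe[S session](s S) string {
	if s.ID() == "" {
		return "session without ID"
	}
	return "session " + s.ID()
}

func main() {
	fmt.Println(describe(&clientSession{id: "c1"})) // session c1
	fmt.Println(describe(&serverSession{}))         // session without ID
}
```

An interface that contains a type union can only be used as a type constraint, not as the type of a variable, which is why functions such as AddMiddleware are generic over S.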
type Transport ¶
type Transport interface {
// Connect returns the logical JSON-RPC connection.
//
// It is called exactly once by [Server.Connect] or [Client.Connect].
Connect(ctx context.Context) (Connection, error)
}
Transport is used to create a bidirectional connection between A2A client and server.
Transports should be used for at most one call to [Server.Connect] or [Client.Connect].
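One way a custom transport could guard against reuse is sketched below. The onceTransport type, its simplified Connect signature (no context, a string in place of Connection), and errReused are hypothetical; the documentation does not say whether the package's own transports enforce the single-use rule.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errReused is a hypothetical error for a second Connect call.
var errReused = errors.New("transport already connected")

// onceTransport wraps a dial function and allows it to be used only once.
type onceTransport struct {
	used atomic.Bool
	dial func() (string, error)
}

// Connect calls dial on first use and fails on every later call.
func (t *onceTransport) Connect() (string, error) {
	if !t.used.CompareAndSwap(false, true) {
		return "", errReused
	}
	return t.dial()
}

// secondCallRejected reports whether the first Connect succeeded and a
// second Connect on the same transport returned errReused.
func secondCallRejected() bool {
	t := &onceTransport{dial: func() (string, error) { return "conn-1", nil }}
	if _, err := t.Connect(); err != nil {
		return false
	}
	_, err := t.Connect()
	return errors.Is(err, errReused)
}

func main() {
	fmt.Println(secondCallRejected()) // true
}
```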