Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions md/request-cancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ not a string, number, or null) is different: when the receiver is built with
the `unstable_cancel_request` feature, it is reported back with an
out-of-band error notification, like any other malformed notification. A
receiver built without the feature never parses the params and ignores the
notification like any other unhandled `$/` notification (see
notification like any other unhandled notification (see
[Interoperability](#interoperability)).

## Interoperability

Protocol-level (`$/`-prefixed) notifications are optional by design. The SDK
ignores unhandled `$/` notifications instead of rejecting them with a
ignores unhandled notifications instead of rejecting them with a
method-not-found error, and does so even when the `unstable_cancel_request`
feature is disabled. A peer that sends `$/cancel_request` to a component built
without cancellation support therefore loses nothing: the request simply runs
Expand Down
6 changes: 3 additions & 3 deletions src/agent-client-protocol/src/concepts/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
//! finish with normal data, or may respond to the original request with
//! [`Error::request_cancelled`] (`-32800`). The requesting side always
//! receives a response to the original request; cancellation only changes
//! *which* response that is. Unhandled `$/`-prefixed notifications are ignored
//! by the SDK (even without this feature), so peers that do not support
//! cancellation simply will not act on it.
//! *which* response that is. Unhandled notifications are ignored by the SDK
//! (even without this feature), so peers that do not support cancellation
//! simply will not act on it.
//!
//! # Cancelling outgoing requests
//!
Expand Down
31 changes: 7 additions & 24 deletions src/agent-client-protocol/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn params_from_transport(params: Option<RawJsonRpcParams>) -> serde_json::Value
/// │ Handled::No
/// ▼
/// ┌─────────────────────────────────────────────────────────────────┐
/// │ Unhandled: Error response sent (or queued if retry=true)
/// │ Unhandled: requests error, notifications ignored
/// └─────────────────────────────────────────────────────────────────┘
/// ```
///
Expand All @@ -280,7 +280,8 @@ fn params_from_transport(params: Option<RawJsonRpcParams>) -> serde_json::Value
///
/// The `retry` flag in `Handled::No` controls what happens when no handler claims a message:
///
/// - **`retry: false`** (default) - Send a "method not found" error response immediately.
/// - **`retry: false`** (default) - Send a "method not found" error
/// response immediately for requests, or ignore notifications.
/// - **`retry: true`** - Queue the message and retry it when new dynamic handlers are added.
///
/// This mechanism exists because of a timing issue with sessions: when a `session/new`
Expand Down Expand Up @@ -1922,6 +1923,7 @@ fn cancellation_request_id_from_message(
/// flows on to the handler chain, which is responsible for reporting it.
///
/// [`SuccessorMessage`]: crate::schema::SuccessorMessage
#[cfg(feature = "unstable_cancel_request")]
fn peel_successor_envelopes<'message>(
mut method: &'message str,
mut params: &'message serde_json::Value,
Expand Down Expand Up @@ -1976,28 +1978,6 @@ pub fn is_cancel_request_notification<N: JsonRpcNotification>(notification: &N)
}
}

/// Whether the dispatch is a protocol-level (`$/`-prefixed) notification,
/// possibly wrapped in a [`SuccessorMessage`] envelope.
///
/// Unhandled protocol-level notifications are ignored rather than rejected
/// with a method-not-found error. This is deliberately *not* feature-gated:
/// protocol-level notifications are optional by design, so a peer that sends
/// `$/cancel_request` must be able to interoperate with an SDK built without
/// `unstable_cancel_request` (which simply won't act on it).
///
/// A handler that explicitly declines with `retry: true` takes precedence
/// over this fallback: the notification is queued for newly registered
/// dynamic handlers like any other retried message.
///
/// [`SuccessorMessage`]: crate::schema::SuccessorMessage
fn is_protocol_level_notification(dispatch: &Dispatch) -> bool {
let Dispatch::Notification(message) = dispatch else {
return false;
};
let (method, _params) = peel_successor_envelopes(&message.method, &message.params);
method.starts_with("$/")
}

/// Messages send to be serialized over the transport.
#[derive(Debug)]
enum OutgoingMessage {
Expand Down Expand Up @@ -4585,6 +4565,7 @@ impl<R: Role> ConnectTo<R> for Channel {
mod tests {
use super::*;

#[cfg(feature = "unstable_cancel_request")]
#[test]
fn peel_successor_envelopes_returns_plain_messages_unchanged() {
let params = serde_json::json!({ "key": "value" });
Expand All @@ -4593,6 +4574,7 @@ mod tests {
assert_eq!(peeled, &params);
}

#[cfg(feature = "unstable_cancel_request")]
#[test]
fn peel_successor_envelopes_unwraps_nested_envelopes() {
let params = serde_json::json!({
Expand All @@ -4607,6 +4589,7 @@ mod tests {
assert_eq!(peeled, &serde_json::json!({ "requestId": "req-1" }));
}

#[cfg(feature = "unstable_cancel_request")]
#[test]
fn peel_successor_envelopes_leaves_malformed_envelopes_intact() {
// No string `method` field: the envelope cannot be peeled, so the
Expand Down
19 changes: 9 additions & 10 deletions src/agent-client-protocol/src/jsonrpc/incoming_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,25 +436,24 @@ async fn dispatch_dispatch<Counterpart: Role>(
// If the message was never handled, check whether the retry flag was set.
// If so, enqueue it for later processing. Else, reject it.
//
// An explicit retry request takes precedence over the protocol-level
// fallback below, so that handlers may defer `$/` notifications to a
// dynamic handler that has not been registered yet.
// An explicit retry request takes precedence over the unhandled-message
// fallback below, so that handlers may defer notifications to a dynamic
// handler that has not been registered yet.
if retry_any {
tracing::debug!(
?method,
"Retrying message as new dynamic handlers are added"
);
pending_messages.push(dispatch);
Ok(())
} else if super::is_protocol_level_notification(&dispatch) {
// Unsupported protocol-level notifications are ignored rather than
// rejected; see `is_protocol_level_notification` for the rationale.
tracing::debug!(?method, "Ignoring unhandled protocol-level notification");
Ok(())
} else {
match dispatch {
Dispatch::Request(..) | Dispatch::Notification(_) => {
tracing::info!(?method, "Rejecting message with error, no handler");
Dispatch::Notification(_) => {
tracing::debug!(?method, "Ignoring unhandled notification");
Ok(())
}
Dispatch::Request(..) => {
tracing::info!(?method, "Rejecting request with error, no handler");
let method = dispatch.method().to_string();
dispatch.respond_with_error(
crate::Error::method_not_found().data(method),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ async fn retried_protocol_level_notification_reaches_later_dynamic_handler() {
.builder()
.on_receive_notification(
// Decline the notification but ask for a retry: this must
// take precedence over the "ignore unhandled `$/`
// take precedence over the "ignore unhandled
// notifications" fallback.
async |cancel: CancelRequestNotification, cx: ConnectionTo<UntypedRole>| {
Ok::<_, agent_client_protocol::Error>(Handled::No {
Expand Down
80 changes: 72 additions & 8 deletions src/agent-client-protocol/tests/jsonrpc_unhandled_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
//! Everything in this file holds **regardless of feature flags** — these are
//! baseline guarantees of the dispatch loop:
//!
//! - Unhandled protocol-level (`$/`-prefixed) notifications are ignored
//! instead of rejected, so peers that use optional protocol-level
//! extensions (such as `$/cancel_request`) interoperate with builds that do
//! not support them.
//! - Unhandled notifications are ignored instead of rejected, so peers that
//! use optional protocol-level or vendor extensions interoperate with
//! components that do not support them.
//! - A `_proxy/successor` envelope that cannot be peeled still reaches the
//! handler chain unchanged.
//! - A response routed to a request handle that was already dropped is
Expand Down Expand Up @@ -112,7 +111,7 @@ impl JsonRpcResponse for SimpleResponse {
}

#[tokio::test(flavor = "current_thread")]
async fn unhandled_protocol_level_notifications_are_ignored() {
async fn unhandled_notifications_are_ignored() {
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::task::LocalSet;

Expand Down Expand Up @@ -149,18 +148,32 @@ async fn unhandled_protocol_level_notifications_are_ignored() {
client_writer
.write_all(
br#"{"jsonrpc":"2.0","method":"$/cancel_request","params":{"requestId":"req-1"}}
"#,
)
.await
.unwrap();
client_writer
.write_all(
br#"{"jsonrpc":"2.0","method":"_x.ai/settings/update","params":{"theme":"auto"}}
"#,
)
.await
.unwrap();
client_writer
.write_all(
br#"{"jsonrpc":"2.0","method":"unknown/notification","params":{"value":1}}
"#,
)
.await
.unwrap();
client_writer.flush().await.unwrap();

// The server processes messages in order: a response to this
// request proves the unknown `$/` notification before it was
// request proves the unknown notifications sent before it were
// ignored without erroring or closing the connection.
client_writer
.write_all(
br#"{"jsonrpc":"2.0","id":2,"method":"simple_method","params":{"message":"after cancel"}}
br#"{"jsonrpc":"2.0","id":2,"method":"simple_method","params":{"message":"after notifications"}}
"#,
)
.await
Expand All @@ -173,7 +186,58 @@ async fn unhandled_protocol_level_notifications_are_ignored() {
"jsonrpc": "2.0",
"id": 2,
"result": {
"result": "echo: after cancel"
"result": "echo: after notifications"
}
}"#]]
.assert_eq(&serde_json::to_string_pretty(&response).unwrap());
})
.await;
}

#[tokio::test(flavor = "current_thread")]
async fn unhandled_requests_are_rejected_with_method_not_found() {
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::task::LocalSet;

let local = LocalSet::new();

local
.run_until(async {
let (mut client_writer, server_reader) = tokio::io::duplex(4096);
let (server_writer, client_reader) = tokio::io::duplex(4096);

let server_transport = agent_client_protocol::ByteStreams::new(
server_writer.compat_write(),
server_reader.compat(),
);
let server = UntypedRole.builder();

tokio::task::spawn_local(async move {
if let Err(error) = server.connect_to(server_transport).await {
panic!("server should stay alive: {error:?}");
}
});

let mut client_reader = BufReader::new(client_reader);

client_writer
.write_all(
br#"{"jsonrpc":"2.0","id":1,"method":"unknown/request","params":{"value":1}}
"#,
)
.await
.unwrap();
client_writer.flush().await.unwrap();

let response = read_jsonrpc_response_line(&mut client_reader).await;
expect![[r#"
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32601,
"message": "Method not found",
"data": "unknown/request"
}
}"#]]
.assert_eq(&serde_json::to_string_pretty(&response).unwrap());
Expand Down