From 8758e020642cf8038f5d71418368be9216c15e8e Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 12 Jun 2026 14:04:26 +0200 Subject: [PATCH] [Fix #1087] A2A implementation Signed-off-by: fjtirado --- impl/a2a/pom.xml | 30 +++ .../executors/a2a/A2AExceptionHandler.java | 37 +++ .../impl/executors/a2a/A2AExecutor.java | 91 +++++++ .../executors/a2a/A2AExecutorBuilder.java | 93 +++++++ .../executors/a2a/A2ARequestDispatcher.java | 34 +++ .../impl/executors/a2a/A2AUtils.java | 137 +++++++++++ .../executors/a2a/CancelTaskDispatcher.java | 47 ++++ .../impl/executors/a2a/GetTaskDispatcher.java | 47 ++++ .../executors/a2a/ListTaskDispatcher.java | 69 ++++++ .../impl/executors/a2a/MessageConsumer.java | 36 +++ .../executors/a2a/MessageConsumerFactory.java | 38 +++ .../impl/executors/a2a/MessageDispatcher.java | 92 +++++++ .../executors/a2a/MessageSendConsumer.java | 41 ++++ .../executors/a2a/MessageStreamConsumer.java | 73 ++++++ ...orkflow.impl.executors.CallableTaskBuilder | 1 + .../impl/WorkflowError.java | 16 +- impl/pom.xml | 18 ++ impl/test/pom.xml | 4 + .../impl/test/A2AExecutorTest.java | 231 ++++++++++++++++++ .../a2a/a2a-hello-world.yaml | 19 ++ .../a2a/a2a-life-meaning.yaml | 21 ++ .../a2a/a2a-task-handler.yaml | 30 +++ .../workflows-samples/a2a/a2a-tell-joke.yaml | 21 ++ 23 files changed, 1220 insertions(+), 6 deletions(-) create mode 100644 impl/a2a/pom.xml create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java create mode 100644 impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java create mode 100644 impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java create mode 100644 impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml create mode 100644 impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml diff --git a/impl/a2a/pom.xml b/impl/a2a/pom.xml new file mode 100644 index 000000000..91a8fac99 --- /dev/null +++ b/impl/a2a/pom.xml @@ -0,0 +1,30 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-a2a + Serverless Workflow :: Impl :: A2A + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + org.a2aproject.sdk + a2a-java-sdk-client + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java new file mode 100644 index 000000000..2ccdcdda2 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowPosition; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +class A2AExceptionHandler implements Consumer { + + private final CompletableFuture future; + private final WorkflowPosition position; + + A2AExceptionHandler(CompletableFuture future, WorkflowPosition position) { + this.future = future; + this.position = position; + } + + @Override + public void accept(Throwable ex) { + future.completeExceptionally(A2AUtils.workflowException(position, ex)); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java new file mode 100644 index 000000000..1727cb059 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java @@ -0,0 +1,91 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.client.config.ClientConfig; +import org.a2aproject.sdk.client.http.A2ACardResolver; +import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport; +import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig; +import org.a2aproject.sdk.spec.A2AClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class A2AExecutor implements CallableTask { + + private final WorkflowValueResolver uriSupplier; + private final A2ARequestDispatcher dispatcher; + private final Optional>> mapResolver; + + private static final Logger logger = LoggerFactory.getLogger(A2AExecutor.class); + + public A2AExecutor( + WorkflowValueResolver uriSupplier, + A2ARequestDispatcher dispatcher, + Optional>> mapResolver) { + this.uriSupplier = uriSupplier; + this.dispatcher = dispatcher; + this.mapResolver = mapResolver; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + URI uri = uriSupplier.apply(workflowContext, taskContext, input); + + return CompletableFuture.supplyAsync( + () -> { + try { + return A2ACardResolver.builder() + .baseUrl(uri.resolve("/").toString()) + .agentCardPath(uri.getPath()) + .build() + .getAgentCard(); + } catch (A2AClientException ex) { + throw A2AUtils.workflowException(taskContext.position(), ex); + } + }, + workflowContext.definition().application().executorService()) + .thenCompose( + agentCard -> { + logger.debug("Agent card is {}", agentCard); + try { + return dispatcher.apply( + agentCard, + Client.builder(agentCard) + .clientConfig(new ClientConfig.Builder().build()) + .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) + .build(), + mapResolver + .map(m -> m.apply(workflowContext, taskContext, input)) + .orElse(Map.of()), + workflowContext, + taskContext); + } catch (A2AClientException ex) { + throw A2AUtils.workflowException(taskContext.position(), ex); + } + }); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java new file mode 100644 index 000000000..b49e37e88 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.api.types.A2AArguments; +import io.serverlessworkflow.api.types.CallA2A; +import io.serverlessworkflow.api.types.Parameters; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.WithA2AParameters; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import io.serverlessworkflow.impl.executors.CallableTaskFactory; +import java.net.URI; +import java.util.Map; +import java.util.Optional; + +public class A2AExecutorBuilder implements CallableTaskBuilder { + + @Override + public boolean accept(Class clazz) { + return CallA2A.class.equals(clazz); + } + + @Override + public CallableTaskFactory init( + CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) { + A2AArguments args = task.getWith(); + + WorkflowValueResolver uriSupplier; + if (args.getServer() != null) { + uriSupplier = definition.resourceLoader().uriSupplier(args.getServer()); + } else if (args.getAgentCard() != null) { + uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint()); + } else { + throw new IllegalArgumentException("Neither server nor agent card is set for task: " + task); + } + + A2ARequestDispatcher dispatcher = + switch (args.getMethod()) { + case MESSAGE_SEND -> + new MessageDispatcher( + (workflowContext, taskContext, completableFuture) -> + new MessageSendConsumer(workflowContext.definition(), completableFuture)); + case MESSAGE_STREAM -> + new MessageDispatcher( + (workflowContext, taskContext, completableFuture) -> + new MessageStreamConsumer( + workflowContext.definition(), completableFuture, taskContext.position())); + case TASKS_LIST -> new ListTaskDispatcher(); + case TASKS_GET -> new GetTaskDispatcher(); + case TASKS_CANCEL -> new CancelTaskDispatcher(); + // TODO handle missing cases + case AGENT_GET_AUTHENTICATED_EXTENDED_CARD, + TASKS_PUSH_NOTIFICATION_CONFIG_DELETE, + TASKS_PUSH_NOTIFICATION_CONFIG_GET, + TASKS_PUSH_NOTIFICATION_CONFIG_LIST, + TASKS_PUSH_NOTIFICATION_CONFIG_SET, + TASKS_RESUBSCRIBE -> + throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod()); + }; + + Parameters parameters = args.getParameters(); + Optional>> mapResolver; + if (parameters == null) { + mapResolver = Optional.empty(); + } else { + WithA2AParameters a2aParameters = parameters.getWithA2AParameters(); + mapResolver = + Optional.of( + WorkflowUtils.buildMapResolver( + definition.application(), + parameters.getString(), + a2aParameters != null ? a2aParameters.getAdditionalProperties() : null)); + } + return () -> new A2AExecutor(uriSupplier, dispatcher, mapResolver); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java new file mode 100644 index 000000000..6e8c91b17 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.AgentCard; + +@FunctionalInterface +interface A2ARequestDispatcher { + CompletableFuture apply( + AgentCard agentCard, + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskContext); +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java new file mode 100644 index 000000000..75c844350 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java @@ -0,0 +1,137 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.types.Errors; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class A2AUtils { + + static final String TASK_ID = "taskId"; + + private static final Logger logger = LoggerFactory.getLogger(A2AUtils.class); + + static T param(Map map, String key, Class instanceClass) { + Object obj = map.get(key); + return isInstanceOrThrow( + obj, + instanceClass, + () -> "value " + obj + " for key " + key + " is not an instance of type " + instanceClass); + } + + static void paramThen( + Map map, String key, Class instanceClass, Consumer consumer) { + isInstanceThen(map.get(key), instanceClass, consumer); + } + + static void isInstanceThen(Object obj, Class instanceClass, Consumer consumer) { + isInstance(obj, instanceClass).ifPresent(consumer); + } + + static T optionalParam( + Map map, String key, Class instanceClass, Supplier defaultValue) { + return isInstanceOrDefault(map.get(key), instanceClass, defaultValue); + } + + static > T enumParam( + Map map, String key, Class instanceClass, T defaultValue) { + Object obj = map.get(key); + + if (instanceClass.isInstance(obj)) { + return instanceClass.cast(obj); + } else if (String.class.isInstance(obj)) { + return Enum.valueOf(instanceClass, String.class.cast(obj)); + } else { + return defaultValue; + } + } + + static T optionalParam(Map map, String key, Class instanceClass) { + return isInstanceOrDefault(map.get(key), instanceClass, () -> null); + } + + static T isInstanceOrThrow(Object obj, Class instanceClass, Supplier message) { + return isInstance(obj, instanceClass) + .orElseThrow(() -> new IllegalArgumentException(message.get())); + } + + static T isInstanceOrDefault(Object obj, Class instanceClass, Supplier defaultValue) { + return isInstance(obj, instanceClass) + .orElseGet( + () -> { + if (obj != null) { + logger.warn( + "Object {} is expected to be of class {} but it is of class {}. Using provided default", + obj, + instanceClass.getName(), + obj.getClass().getName()); + } + return defaultValue.get(); + }); + } + + private static Optional isInstance(Object obj, Class instanceClass) { + if (instanceClass.isInstance(obj)) { + return Optional.of(instanceClass.cast(obj)); + } else if (Instant.class.isAssignableFrom(instanceClass) && String.class.isInstance(obj)) { + return Optional.of(instanceClass.cast(Instant.parse(String.class.cast(obj)))); + } else { + return Optional.empty(); + } + } + + static WorkflowModel fromTask(WorkflowModelFactory factory, Task task) { + return factory.fromOther(task); + } + + static WorkflowModel fromTask(WorkflowContext context, Task task) { + return fromTask(context.definition().application().modelFactory(), task); + } + + static WorkflowModel fromMessage(WorkflowModelFactory factory, Message message) { + return factory.fromOther(message); + } + + static WorkflowError.Builder workflowError(WorkflowPosition position) { + return WorkflowError.error(Errors.RUNTIME.toString(), Errors.RUNTIME.status()) + .instance(position.jsonPointer()); + } + + static WorkflowException workflowException(WorkflowPosition position, Throwable ex) { + return new WorkflowException( + A2AUtils.workflowError(position) + .title(ex.getMessage()) + .details(WorkflowError.getStackTrace(ex)) + .build(), + ex); + } + + private A2AUtils() {} +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java new file mode 100644 index 000000000..eb27c43b8 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.CancelTaskParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CancelTaskDispatcher implements A2ARequestDispatcher { + + private static final Logger logger = LoggerFactory.getLogger(CancelTaskDispatcher.class); + + @Override + public CompletableFuture apply( + AgentCard agentCard, + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskContext) { + String taskId = param(parameters, A2AUtils.TASK_ID, String.class); + logger.debug("Cancelling task {}", taskId); + return CompletableFuture.completedFuture( + A2AUtils.fromTask(workflowContext, client.cancelTask(new CancelTaskParams(taskId)))); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java new file mode 100644 index 000000000..0b0526678 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.TaskQueryParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class GetTaskDispatcher implements A2ARequestDispatcher { + + private static final Logger logger = LoggerFactory.getLogger(GetTaskDispatcher.class); + + @Override + public CompletableFuture apply( + AgentCard agentCard, + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskContext) { + String taskId = param(parameters, A2AUtils.TASK_ID, String.class); + logger.debug("Getting information of task {}", taskId); + return CompletableFuture.completedFuture( + A2AUtils.fromTask(workflowContext, client.getTask(new TaskQueryParams(taskId)))); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java new file mode 100644 index 000000000..8294b42c2 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.enumParam; +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.optionalParam; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.jsonrpc.common.wrappers.ListTasksResult; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.ListTasksParams; +import org.a2aproject.sdk.spec.TaskState; + +class ListTaskDispatcher implements A2ARequestDispatcher { + + @Override + public CompletableFuture apply( + AgentCard agentCard, + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskContext) { + ListTasksResult tasks = + client.listTasks( + new ListTasksParams( + optionalParam(parameters, "contextId", String.class), + enumParam(parameters, "status", TaskState.class, null), + optionalParam(parameters, "pageSize", Integer.class), + optionalParam(parameters, "pageToken", String.class), + optionalParam(parameters, "historyLength", Integer.class), + optionalParam(parameters, "statusTimestampAfter", Instant.class), + optionalParam(parameters, "includeArtifacts", Boolean.class), + optionalParam( + parameters, + "tenant", + String.class, + () -> + Optional.ofNullable( + agentCard.supportedInterfaces().iterator().next().tenant()) + .orElse("")))); + + WorkflowModelFactory factory = workflowContext.definition().application().modelFactory(); + WorkflowModelCollection model = factory.createCollection(); + tasks.tasks().forEach(t -> model.add(A2AUtils.fromTask(factory, t))); + return CompletableFuture.completedFuture(model); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java new file mode 100644 index 000000000..2bc06aa0f --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.spec.AgentCard; + +abstract class MessageConsumer implements BiConsumer { + + protected final CompletableFuture completableFuture; + protected final WorkflowModelFactory factory; + + public MessageConsumer( + WorkflowDefinition definition, CompletableFuture completableFuture) { + this.completableFuture = completableFuture; + factory = definition.application().modelFactory(); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java new file mode 100644 index 000000000..88b58e9df --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +@FunctionalInterface +interface MessageConsumerFactory { + + MessageConsumer buildConsumer( + WorkflowContext workflowContext, + TaskContext taskContext, + CompletableFuture completableFuture); + + default Consumer buildExceptionHandler( + WorkflowContext workflowContext, + TaskContext taskContext, + CompletableFuture completableFuture) { + return new A2AExceptionHandler(completableFuture, taskContext.position()); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java new file mode 100644 index 000000000..385b9932c --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java @@ -0,0 +1,92 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.isInstanceOrThrow; +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.optionalParam; +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.DataPart; +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.Message.Role; +import org.a2aproject.sdk.spec.Part; +import org.a2aproject.sdk.spec.TextPart; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class MessageDispatcher implements A2ARequestDispatcher { + + private static final Logger logger = LoggerFactory.getLogger(MessageDispatcher.class); + private final MessageConsumerFactory consumerFactory; + + public MessageDispatcher(MessageConsumerFactory consumerFactory) { + this.consumerFactory = consumerFactory; + } + + @Override + public CompletableFuture apply( + AgentCard agentCard, + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskContext) { + CompletableFuture future = new CompletableFuture<>(); + MessageConsumer consumer = consumerFactory.buildConsumer(workflowContext, taskContext, future); + Message message = buildMessage(parameters); + logger.debug("Sending message {}", message); + client.sendMessage( + message, + List.of(consumer), + consumerFactory.buildExceptionHandler(workflowContext, taskContext, future)); + return future; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private Message buildMessage(Map parameters) { + + Map message = param(parameters, "message", Map.class); + Message.Builder messageBuilder = Message.builder(); + + Collection items = param(message, "parts", Collection.class); + List> parts = new ArrayList<>(); + for (Object item : items) { + Map part = isInstanceOrThrow(item, Map.class, () -> "One item of parts is not an object"); + String kind = optionalParam(part, "kind", String.class, () -> "text"); + parts.add( + switch (kind) { + case TextPart.TEXT -> new TextPart(param(part, TextPart.TEXT, String.class)); + case DataPart.DATA -> new DataPart(param(part, DataPart.DATA, Object.class)); + default -> throw new UnsupportedOperationException("Unimplemented kind: " + kind); + }); + } + messageBuilder.parts(parts); + A2AUtils.paramThen(message, "messageId", String.class, messageBuilder::messageId); + A2AUtils.paramThen(message, "contextId", String.class, messageBuilder::contextId); + + messageBuilder.role(A2AUtils.enumParam(message, "role", Role.class, Role.ROLE_USER)); + return messageBuilder.build(); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java new file mode 100644 index 000000000..5ac792bd2 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.client.MessageEvent; +import org.a2aproject.sdk.client.TaskEvent; +import org.a2aproject.sdk.spec.AgentCard; + +class MessageSendConsumer extends MessageConsumer { + + public MessageSendConsumer( + WorkflowDefinition definition, CompletableFuture completableFuture) { + super(definition, completableFuture); + } + + @Override + public void accept(ClientEvent event, AgentCard card) { + if (event instanceof MessageEvent resp) { + completableFuture.complete(A2AUtils.fromMessage(factory, resp.getMessage())); + } else if (event instanceof TaskEvent resp) { + completableFuture.complete(A2AUtils.fromTask(factory, resp.getTask())); + } + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java new file mode 100644 index 000000000..bd5e386e6 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowPosition; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.client.MessageEvent; +import org.a2aproject.sdk.client.TaskEvent; +import org.a2aproject.sdk.client.TaskUpdateEvent; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.Task; + +class MessageStreamConsumer extends MessageConsumer { + + private final WorkflowPosition position; + + public MessageStreamConsumer( + WorkflowDefinition definition, + CompletableFuture completableFuture, + WorkflowPosition position) { + super(definition, completableFuture); + this.position = position; + } + + @Override + public void accept(ClientEvent event, AgentCard card) { + if (event instanceof MessageEvent resp) { + completableFuture.complete(A2AUtils.fromMessage(factory, resp.getMessage())); + } else if (event instanceof TaskUpdateEvent resp) { + checkTaskCompletion(resp.getTask()); + } else if (event instanceof TaskEvent resp) { + checkTaskCompletion(resp.getTask()); + } + } + + private void checkTaskCompletion(Task task) { + switch (task.status().state()) { + case TASK_STATE_REJECTED, TASK_STATE_FAILED, TASK_STATE_CANCELED: + completableFuture.completeExceptionally(exception(task)); + break; + case TASK_STATE_COMPLETED: + completableFuture.complete(A2AUtils.fromTask(factory, task)); + break; + default: + // do nothing + } + } + + private WorkflowException exception(Task task) { + return new WorkflowException( + A2AUtils.workflowError(position) + .title(task.status().state().toString()) + .details(task.history().toString()) + .build()); + } +} diff --git a/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 000000000..bc458e2b9 --- /dev/null +++ b/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.a2a.A2AExecutorBuilder diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java index bd0d84d51..ee2c4a6c5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java @@ -83,16 +83,20 @@ private static WorkflowError error(Throwable cause, WorkflowPosition position) { } private static WorkflowError commonError(Throwable cause, WorkflowPosition position) { + return new WorkflowError( + cause.getClass().getTypeName(), + 500, + position == null ? null : position.jsonPointer(), + cause.getMessage(), + getStackTrace(cause)); + } + + public static String getStackTrace(Throwable cause) { StringWriter stackTrace = new StringWriter(); try (PrintWriter writer = new PrintWriter(stackTrace)) { cause.printStackTrace(writer); - return new WorkflowError( - cause.getClass().getTypeName(), - 500, - position == null ? null : position.jsonPointer(), - cause.getMessage(), - stackTrace.toString()); } + return stackTrace.toString(); } @Deprecated diff --git a/impl/pom.xml b/impl/pom.xml index 72737c6db..8c7b1249d 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,8 @@ 9.2.1 3.7.1 25.0.3 + 1.0.0.Final + 2.14.0 @@ -123,6 +125,11 @@ serverlessworkflow-impl-grpc ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-a2a + ${project.version} + net.thisptr jackson-jq @@ -191,6 +198,16 @@ polyglot ${version.org.graalvm.polyglot} + + org.a2aproject.sdk + a2a-java-sdk-client + ${version.org.a2aproject.sdk} + + + com.google.code.gson + gson + ${version.com.google.code.gson} + @@ -213,5 +230,6 @@ python grpc openapi-jackson + a2a diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 49ad1d893..165ef1a98 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -42,6 +42,10 @@ io.serverlessworkflow serverlessworkflow-impl-script-python + + io.serverlessworkflow + serverlessworkflow-impl-a2a + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java new file mode 100644 index 000000000..136083cbc --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java @@ -0,0 +1,231 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowException; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okio.Buffer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class A2AExecutorTest { + + private MockWebServer apiServer; + + @BeforeEach + public void setUp() throws IOException { + apiServer = new MockWebServer(); + apiServer.start(11111); + mockAgentCard(); + } + + @AfterEach + public void tearDown() throws IOException { + apiServer.close(); + } + + @Test + void testSendMessageMessage() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody( + "{\"jsonrpc\":\"2.0\",\"id\":\"14fc4dbc-989e-4f5b-a1bf-da25a7a2c10c\",\"result\":{\"message\":{\"messageId\":\"8545ebb6-2d8a-4676-8698-932f36c47e90\",\"contextId\":\"028f609d-c842-4851-afeb-5e61bd6dc3d1\",\"taskId\":\"4bfdadfc-3295-4019-b78f-e1681c86d6e9\",\"role\":\"ROLE_AGENT\",\"parts\":[{\"text\":\"Hello World\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}}}")); + try (WorkflowApplication appl = WorkflowApplication.builder().build()) { + WorkflowDefinition def = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/a2a/a2a-hello-world.yaml")); + assertThat(def.instance().start().get(1, TimeUnit.SECONDS).asJavaObject()) + .isEqualTo("Hello World"); + } + } + + private static final String SSE_STREAM = + "data: {\"jsonrpc\": \"2.0\",\"id\": \"363422be-b0f9-4692-a24d-278670e7c7f1\",\"result\":%s}\nid: %d\n\n"; + + private String getStreamBody(String json, int streamId) { + return String.format(SSE_STREAM, json, streamId); + } + + @Test + void testSendMessageTask() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody( + getStreamBody( + "{\"task\":{" + + " \"id\": \"363422be-b0f9-4692-a24d-278670e7c7f1\"," + + " \"contextId\": \"c295ea44-7543-4f78-b524-7a38915ad6e4\"," + + " \"status\": {" + + " \"state\": \"TASK_STATE_COMPLETED\"" + + " }," + + " \"artifacts\": [" + + " {" + + " \"artifactId\": \"9b6934dd-37e3-4eb1-8766-962efaab63a1\"," + + " \"name\": \"joke\"," + + " \"parts\": [" + + " {" + + " \"text\": \"Why did the chicken cross the road? To get to the other side!\"" + + " }" + + " ]" + + " }" + + " ]," + + " \"history\": [" + + " {" + + " \"role\": \"ROLE_USER\"," + + " \"parts\": [" + + " {" + + " \"text\": \"tell me a joke\"" + + " }" + + " ]," + + " \"messageId\": \"9229e770-767c-417b-a0b0-f0741243c589\"," + + " \"taskId\": \"363422be-b0f9-4692-a24d-278670e7c7f1\"," + + " \"contextId\": \"c295ea44-7543-4f78-b524-7a38915ad6e4\"" + + " }" + + " ]," + + " \"metadata\": {}" + + " }}", + 0))); + + try (WorkflowApplication appl = WorkflowApplication.builder().build()) { + WorkflowDefinition def = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/a2a/a2a-tell-joke.yaml")); + assertThat(def.instance().start().get(1, TimeUnit.SECONDS).asJavaObject()) + .isEqualTo("Why did the chicken cross the road? To get to the other side!"); + } + } + + @Test + void testSendMessageStream() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + + Buffer buffer = new Buffer(); + buffer.writeUtf8( + getStreamBody( + "{\"task\":{\"id\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-23T16:23:42.315374250Z\"},\"artifacts\":[],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}", + 0)); + buffer.writeUtf8( + getStreamBody( + "{\"artifactUpdate\":{\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"artifact\":{\"artifactId\":\"4ad59044-7d52-454c-84b9-f9d49594abed\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]},\"append\":false,\"lastChunk\":false,\"metadata\":{}}}", + 1)); + buffer.writeUtf8( + getStreamBody( + "{\"statusUpdate\":{\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"status\":{\"state\":\"TASK_STATE_COMPLETED\",\"timestamp\":\"2026-06-23T16:23:42.315784582Z\"},\"metadata\":{}}}", + 2)); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody(buffer) + .setBodyDelay(100, TimeUnit.MILLISECONDS)); + + try (WorkflowApplication appl = WorkflowApplication.builder().build()) { + WorkflowDefinition def = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/a2a/a2a-life-meaning.yaml")); + assertThat(def.instance().start().get(1, TimeUnit.SECONDS).asJavaObject()) + .isEqualTo( + "After some time thinking about your complex question, I feel emptiness and decide to close the task without answering"); + } + } + + @Test + void testListGetCancelTask() + throws IOException, InterruptedException, ExecutionException, TimeoutException { + + try (WorkflowApplication appl = WorkflowApplication.builder().build()) { + Buffer buffer = new Buffer(); + buffer.writeUtf8( + getStreamBody( + "{\"task\":{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-23T16:23:42.315374250Z\"},\"artifacts\":[],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}", + 0)); + buffer.writeUtf8( + getStreamBody( + "{\"artifactUpdate\":{\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"artifact\":{\"artifactId\":\"4ad59044-7d52-454c-84b9-f9d49594abed\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]},\"append\":false,\"lastChunk\":false,\"metadata\":{}}}", + 1)); + buffer.writeUtf8( + getStreamBody( + "{\"statusUpdate\":{\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_CANCELED\",\"timestamp\":\"2026-06-24T12:11:49.591113597Z\"},\"metadata\":{}}}", + 2)); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody(buffer)); + mockAgentCard(); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody( + "{\"jsonrpc\":\"2.0\",\"id\":\"a8758715-2018-4f19-b78b-38b484089153\",\"result\":{\"tasks\":[{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-24T12:11:49.468232286Z\"},\"artifacts\":[],\"history\":[],\"metadata\":{}}],\"nextPageToken\":\"\",\"pageSize\":1,\"totalSize\":1}}")); + + mockAgentCard(); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody( + "{\"jsonrpc\":\"2.0\",\"id\":\"d5dbc8de-130e-49d1-af88-370551f0ed69\",\"result\":{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-24T12:11:49.468232286Z\"},\"artifacts\":[{\"artifactId\":\"4febb066-3096-43aa-a8e5-ec2e3cabfe8a\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]}],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}")); + + mockAgentCard(); + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody( + "{\"jsonrpc\":\"2.0\",\"id\":\"9c46108c-daae-4eff-af61-a8d42cfd923f\",\"result\":{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_CANCELED\",\"timestamp\":\"2026-06-24T12:11:49.591113597Z\"},\"artifacts\":[{\"artifactId\":\"4febb066-3096-43aa-a8e5-ec2e3cabfe8a\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]}],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}")); + + WorkflowDefinition taskDef = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/a2a/a2a-life-meaning.yaml")); + WorkflowDefinition handlerDef = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/a2a/a2a-task-handler.yaml")); + assertThatThrownBy(() -> taskDef.instance().start().join()) + .hasCauseInstanceOf(WorkflowException.class); + assertThat(handlerDef.instance().start().join().asMap().orElseThrow().get("id")) + .isEqualTo("12253522-b561-4d7a-8fd7-d3a71e465f67"); + } + } + + private void mockAgentCard() { + apiServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody( + "{\"name\":\"Hello World Agent\",\"description\":\"Just a hello world agent\",\"version\":\"1.0.0\",\"documentationUrl\":\"http://example.com/docs\",\"capabilities\":{\"streaming\":true,\"pushNotifications\":true,\"extendedAgentCard\":false},\"defaultInputModes\":[\"text\"],\"defaultOutputModes\":[\"text\"],\"skills\":[{\"id\":\"hello_world\",\"name\":\"Returns hello world\",\"description\":\"just returns hello world\",\"tags\":[\"hello world\"],\"examples\":[\"hi\",\"hello world\"]}],\"supportedInterfaces\":[{\"protocolBinding\":\"JSONRPC\",\"url\":\"http://localhost:11111\",\"protocolVersion\":\"1.0\"}],\"preferredTransport\":\"JSONRPC\"}")); + } +} diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml new file mode 100644 index 000000000..f8d1da00d --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml @@ -0,0 +1,19 @@ +document: + dsl: '1.0.3' + namespace: test + name: a2a-hello-world + version: '0.1.0' +do: + - sayHello: + call: a2a + with: + method: message/send + agentCard: + endpoint: http://localhost:11111 + parameters: + message: + parts: + - kind: text + text: Hello Agent! + output: + as: .parts | map(.text) | join(" ") \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml new file mode 100644 index 000000000..8ca9e1617 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml @@ -0,0 +1,21 @@ +document: + dsl: '1.0.3' + namespace: test + name: a2a-life-meaning + version: '0.1.0' +do: + - complexQuestion: + call: a2a + with: + method: message/stream + agentCard: + endpoint: http://localhost:11111 + parameters: + message: + messageId: 9229e770-767c-417b-a0b0-f0741243c589 + parts: + - kind: text + text: why are we here? + output: + as: .artifacts[] | .parts | map (.text) | join (" ") + \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml new file mode 100644 index 000000000..37647aaa3 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml @@ -0,0 +1,30 @@ +document: + dsl: '1.0.3' + namespace: test + name: a2a-task-cancel + version: '0.1.0' +do: + - listTasks: + call: a2a + with: + method: tasks/list + agentCard: + endpoint: http://localhost:11111 + output: + as: .[0] + - getTasks: + call: a2a + with: + method: tasks/get + agentCard: + endpoint: http://localhost:11111 + parameters: + taskId: ${.id} + - cancelTasks: + call: a2a + with: + method: tasks/cancel + agentCard: + endpoint: http://localhost:11111 + parameters: + taskId: ${.id} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml new file mode 100644 index 000000000..cb4c5c790 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml @@ -0,0 +1,21 @@ +document: + dsl: '1.0.3' + namespace: test + name: a2a-tell-joke + version: '0.1.0' +do: + - tellJoke: + call: a2a + with: + method: message/send + agentCard: + endpoint: http://localhost:11111 + parameters: + message: + messageId: 9229e770-767c-417b-a0b0-f0741243c589 + parts: + - kind: text + text: tell me a joke + output: + as: .artifacts[] | .parts | map (.text) | join (" ") + \ No newline at end of file