diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ContextPropagator.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ContextPropagator.java new file mode 100644 index 000000000..8077b3173 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ContextPropagator.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +/** + * Framework-agnostic hook for capturing the context of the thread that starts a workflow instance + * so it can be re-established around asynchronous task execution. + */ +@FunctionalInterface +public interface ContextPropagator { + + /** + * Captures the current context. Invoked on the thread that starts the workflow instance (the + * thread that calls {@link WorkflowInstance#start()}). + * + * @return a snapshot able to re-establish the captured context on another thread. + */ + ContextSnapshot capture(); + + ContextPropagator NOOP = () -> ContextSnapshot.NOOP; +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ContextSnapshot.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ContextSnapshot.java new file mode 100644 index 000000000..ffb68d428 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ContextSnapshot.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.function.Supplier; + +public interface ContextSnapshot { + + Supplier wrap(Supplier supplier); + + ContextSnapshot NOOP = + new ContextSnapshot() { + @Override + public Supplier wrap(Supplier supplier) { + return supplier; + } + }; +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index c70134cd3..d2cfe5b50 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -80,6 +80,7 @@ public class WorkflowApplication implements AutoCloseable { private final Map definitions; private final WorkflowPositionFactory positionFactory; private final ExecutorServiceFactory executorFactory; + private final ContextPropagator contextPropagator; private final RuntimeDescriptorFactory runtimeDescriptorFactory; private final EventConsumer eventConsumer; private final Collection eventPublishers; @@ -109,6 +110,7 @@ private WorkflowApplication(Builder builder) { this.idFactory = builder.idFactory; this.runtimeDescriptorFactory = builder.descriptorFactory; this.executorFactory = builder.executorFactory; + this.contextPropagator = builder.contextPropagator; this.listenersByPriority = groupByPriority(new LinkedHashSet<>(builder.listeners)); this.definitions = new ConcurrentHashMap<>(); this.eventConsumer = builder.eventConsumer; @@ -232,6 +234,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private WorkflowInstanceIdFactory idFactory; private WorkflowScheduler scheduler; private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory(); + private ContextPropagator contextPropagator = ContextPropagator.NOOP; private EventConsumer eventConsumer; private Collection eventPublishers = new ArrayList<>(); private RuntimeDescriptorFactory descriptorFactory = @@ -311,6 +314,11 @@ public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) { return this; } + public Builder withContextPropagator(ContextPropagator contextPropagator) { + this.contextPropagator = contextPropagator; + return this; + } + public Builder withPositionFactory(WorkflowPositionFactory positionFactory) { this.positionFactory = positionFactory; return this; @@ -536,6 +544,10 @@ public ExecutorService executorService() { return executorFactory.get(); } + public ContextPropagator contextPropagator() { + return contextPropagator; + } + public boolean isLifeCycleCEPublishingEnabled() { return lifeCycleCEPublishingEnabled; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 748a426a8..a239da56f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -51,6 +51,8 @@ public class WorkflowMutableInstance implements WorkflowInstance { protected AtomicReference> futureRef = new AtomicReference<>(); protected Instant completedAt; + private volatile ContextSnapshot contextSnapshot = ContextSnapshot.NOOP; + protected final Map additionalObjects = new ConcurrentHashMap<>(); protected final Map iterationsMap = new ConcurrentHashMap<>(); @@ -84,6 +86,8 @@ protected final CompletableFuture startExecution( if (future != null) { return future; } + + this.contextSnapshot = workflowContext.definition().application().contextPropagator().capture(); status(WorkflowStatus.RUNNING); future = @@ -114,6 +118,10 @@ protected final CompletableFuture startExecution( return future; } + public ContextSnapshot contextSnapshot() { + return contextSnapshot; + } + private void whenCompleted(WorkflowModel result, Throwable ex) { completedAt = Instant.now(); additionalObjects.values().stream() @@ -121,7 +129,7 @@ private void whenCompleted(WorkflowModel result, Throwable ex) { .map(AutoCloseable.class::cast) .forEach(WorkflowUtils::safeClose); if (ex != null) { - handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex); + handleException(ex instanceof CompletionException ? ex.getCause() : ex); } workflowContext.definition().removeInstance(this); } @@ -253,10 +261,7 @@ public boolean resume() { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get()) && suspended != null) { - suspended.forEach( - (k, v) -> { - k.complete(v); - }); + suspended.forEach(CompletableFuture::complete); suspended = null; result = true; } else { @@ -323,9 +328,7 @@ public boolean cancel() { if (result) { publishEvent( workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext))); - if (toCancel != null) { - toCancel.forEach(t -> t.cancel(true)); - } + toCancel.forEach(t -> t.cancel(true)); } return result; } diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java index c787cd2bd..9af124032 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/http/HttpExecutor.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.impl.executors.http; +import io.serverlessworkflow.impl.ContextSnapshot; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; @@ -75,10 +76,14 @@ public CompletableFuture apply( target = target.queryParam(entry.getKey(), entry.getValue()); } Builder request = target.request(); - requestDecorators.forEach(d -> d.decorate(request, workflow, taskContext)); headersMap.ifPresent(h -> h.apply(workflow, taskContext, input).forEach(request::header)); + ContextSnapshot contextSnapshot = workflow.instance().contextSnapshot(); return CompletableFuture.supplyAsync( - () -> requestFunction.apply(request, uri, workflow, taskContext, input), + contextSnapshot.wrap( + () -> { + requestDecorators.forEach(d -> d.decorate(request, workflow, taskContext)); + return requestFunction.apply(request, uri, workflow, taskContext, input); + }), workflow.definition().application().executorService()); } }