Skip to content

Commit 61316c4

Browse files
committed
[Fix #782] Adding MVStore persistence
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent fec6de9 commit 61316c4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1205
-281
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,23 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.ForTask;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.ForTaskFunction;
2322
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
2423
import io.serverlessworkflow.api.types.func.TypedFunction;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
2625
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2726
import io.serverlessworkflow.impl.WorkflowPredicate;
2827
import io.serverlessworkflow.impl.WorkflowValueResolver;
2928
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
3029
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
31-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3230
import java.util.Collection;
3331
import java.util.Optional;
3432

3533
public class JavaForExecutorBuilder extends ForExecutorBuilder {
3634

3735
protected JavaForExecutorBuilder(
38-
WorkflowMutablePosition position,
39-
ForTask task,
40-
Workflow workflow,
41-
WorkflowApplication application,
42-
ResourceLoader resourceLoader) {
43-
super(position, task, workflow, application, resourceLoader);
36+
WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) {
37+
super(position, task, definition);
4438
}
4539

4640
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,18 @@
1919

2020
import io.serverlessworkflow.api.types.ListenTask;
2121
import io.serverlessworkflow.api.types.Until;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.UntilPredicate;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028

3129
public class JavaListenExecutorBuilder extends ListenExecutorBuilder {
3230

3331
protected JavaListenExecutorBuilder(
34-
WorkflowMutablePosition position,
35-
ListenTask task,
36-
Workflow workflow,
37-
WorkflowApplication application,
38-
ResourceLoader resourceLoader) {
39-
super(position, task, workflow, application, resourceLoader);
32+
WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) {
33+
super(position, task, definition);
4034
}
4135

4236
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,19 @@
1919

2020
import io.serverlessworkflow.api.types.SwitchCase;
2121
import io.serverlessworkflow.api.types.SwitchTask;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028
import java.util.Optional;
3129

3230
public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {
3331

3432
protected JavaSwitchExecutorBuilder(
35-
WorkflowMutablePosition position,
36-
SwitchTask task,
37-
Workflow workflow,
38-
WorkflowApplication application,
39-
ResourceLoader resourceLoader) {
40-
super(position, task, workflow, application, resourceLoader);
33+
WorkflowMutablePosition position, SwitchTask task, WorkflowDefinition definition) {
34+
super(position, task, definition);
4135
}
4236

4337
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,23 @@
1717

1818
import io.serverlessworkflow.api.types.Task;
1919
import io.serverlessworkflow.api.types.TaskBase;
20-
import io.serverlessworkflow.api.types.Workflow;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
2221
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2322
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
2423
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624

2725
public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory {
2826

2927
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
30-
WorkflowMutablePosition position,
31-
Task task,
32-
Workflow workflow,
33-
WorkflowApplication application,
34-
ResourceLoader resourceLoader) {
28+
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
3529
if (task.getForTask() != null) {
36-
return new JavaForExecutorBuilder(
37-
position, task.getForTask(), workflow, application, resourceLoader);
30+
return new JavaForExecutorBuilder(position, task.getForTask(), definition);
3831
} else if (task.getSwitchTask() != null) {
39-
return new JavaSwitchExecutorBuilder(
40-
position, task.getSwitchTask(), workflow, application, resourceLoader);
32+
return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition);
4133
} else if (task.getListenTask() != null) {
42-
return new JavaListenExecutorBuilder(
43-
position, task.getListenTask(), workflow, application, resourceLoader);
34+
return new JavaListenExecutorBuilder(position, task.getListenTask(), definition);
4435
} else {
45-
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
36+
return super.getTaskExecutor(position, task, definition);
4637
}
4738
}
4839
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import com.github.f4b6a3.ulid.UlidCreator;
19-
import io.serverlessworkflow.api.types.Document;
2019
import io.serverlessworkflow.api.types.SchemaInline;
2120
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.impl.events.EventConsumer;
@@ -62,7 +61,7 @@ public class WorkflowApplication implements AutoCloseable {
6261
private final Collection<EventPublisher> eventPublishers;
6362
private final boolean lifeCycleCEPublishingEnabled;
6463

65-
private WorkflowApplication(Builder builder) {
64+
protected WorkflowApplication(Builder builder) {
6665
this.taskFactory = builder.taskFactory;
6766
this.exprFactory = builder.exprFactory;
6867
this.resourceLoaderFactory = builder.resourceLoaderFactory;
@@ -150,7 +149,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
150149
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
151150
private boolean lifeCycleCEPublishingEnabled = true;
152151

153-
private Builder() {}
152+
protected Builder() {}
154153

155154
public Builder withListener(WorkflowExecutionListener listener) {
156155
listeners.add(listener);
@@ -249,15 +248,12 @@ public WorkflowApplication build() {
249248
}
250249
}
251250

252-
private static record WorkflowId(String namespace, String name, String version) {
253-
static WorkflowId of(Document document) {
254-
return new WorkflowId(document.getNamespace(), document.getName(), document.getVersion());
255-
}
251+
public WorkflowDefinition workflowDefinition(Workflow workflow) {
252+
return definitions.computeIfAbsent(WorkflowId.of(workflow), k -> createDefinition(workflow));
256253
}
257254

258-
public WorkflowDefinition workflowDefinition(Workflow workflow) {
259-
return definitions.computeIfAbsent(
260-
WorkflowId.of(workflow.getDocument()), k -> WorkflowDefinition.of(this, workflow));
255+
protected WorkflowDefinition createDefinition(Workflow workflow) {
256+
return WorkflowDefinition.of(this, workflow);
261257
}
262258

263259
@Override

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3636
private Optional<WorkflowFilter> outputFilter = Optional.empty();
3737
private final WorkflowApplication application;
3838
private final TaskExecutor<?> taskExecutor;
39+
private ResourceLoader resourceLoader;
3940

4041
private WorkflowDefinition(
4142
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
4243
this.workflow = workflow;
4344
this.application = application;
45+
this.resourceLoader = resourceLoader;
4446
if (workflow.getInput() != null) {
4547
Input input = workflow.getInput();
4648
this.inputSchemaValidator =
@@ -55,11 +57,7 @@ private WorkflowDefinition(
5557
}
5658
this.taskExecutor =
5759
TaskExecutorHelper.createExecutorList(
58-
application.positionFactory().get(),
59-
workflow.getDo(),
60-
workflow,
61-
application,
62-
resourceLoader);
60+
application.positionFactory().get(), workflow.getDo(), this);
6361
}
6462

6563
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
@@ -72,7 +70,10 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
7270
}
7371

7472
public WorkflowInstance instance(Object input) {
75-
return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input));
73+
WorkflowModel inputModel = application.modelFactory().fromAny(input);
74+
inputSchemaValidator().ifPresent(v -> v.validate(inputModel));
75+
return new WorkflowMutableInstance(
76+
this, application().idFactory().get(), inputModel, WorkflowStatus.PENDING);
7677
}
7778

7879
Optional<SchemaValidator> inputSchemaValidator() {
@@ -106,7 +107,9 @@ public WorkflowApplication application() {
106107
}
107108

108109
@Override
109-
public void close() {
110-
// TODO close resourcers hold for uncompleted process instances, if any
110+
public void close() {}
111+
112+
public ResourceLoader resourceLoader() {
113+
return resourceLoader;
111114
}
112115
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.serverlessworkflow.api.types.Document;
19+
import io.serverlessworkflow.api.types.Workflow;
20+
21+
public record WorkflowId(String namespace, String name, String version) {
22+
23+
public static WorkflowId of(Workflow workflow) {
24+
Document document = workflow.getDocument();
25+
return new WorkflowId(document.getNamespace(), document.getName(), document.getVersion());
26+
}
27+
28+
public String identifier() {
29+
return identifier(namespace, name, version);
30+
}
31+
32+
public static String asString(Workflow workflow) {
33+
Document document = workflow.getDocument();
34+
return identifier(document.getNamespace(), document.getName(), document.getVersion());
35+
}
36+
37+
private static String identifier(String namespace, String name, String version) {
38+
return namespace + "-" + name + "-" + version;
39+
}
40+
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,18 @@ public class WorkflowMutableInstance implements WorkflowInstance {
5151
private TaskContext suspendedTask;
5252
private CompletableFuture<TaskContext> cancelled;
5353

54-
WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) {
55-
this.id = definition.application().idFactory().get();
54+
public WorkflowMutableInstance(
55+
WorkflowDefinition definition, String id, WorkflowModel input, WorkflowStatus status) {
56+
this.id = id;
5657
this.input = input;
57-
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
58-
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
58+
this.status = new AtomicReference<>(status);
5959
this.workflowContext = new WorkflowContext(definition, this);
6060
}
6161

6262
@Override
6363
public CompletableFuture<WorkflowModel> start() {
64-
this.startedAt = Instant.now();
65-
this.status.set(WorkflowStatus.RUNNING);
64+
startedAt = Instant.now();
65+
status.set(WorkflowStatus.RUNNING);
6666
publishEvent(
6767
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
6868
this.completableFuture =
@@ -107,7 +107,7 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
107107
.map(f -> f.apply(workflowContext, null, node))
108108
.orElse(node);
109109
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
110-
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
110+
status.set(WorkflowStatus.COMPLETED);
111111
publishEvent(
112112
workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext)));
113113
return output;
@@ -145,12 +145,14 @@ public WorkflowModel output() {
145145

146146
@Override
147147
public <T> T outputAs(Class<T> clazz) {
148-
return output
149-
.as(clazz)
150-
.orElseThrow(
151-
() ->
152-
new IllegalArgumentException(
153-
"Output " + output + " cannot be converted to class " + clazz));
148+
return output != null
149+
? output
150+
.as(clazz)
151+
.orElseThrow(
152+
() ->
153+
new IllegalArgumentException(
154+
"Output " + output + " cannot be converted to class " + clazz))
155+
: null;
154156
}
155157

156158
public void status(WorkflowStatus state) {
@@ -215,15 +217,9 @@ public boolean resume() {
215217
}
216218
}
217219

218-
public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
220+
public CompletableFuture<TaskContext> cancelCheck(TaskContext t) {
219221
try {
220222
statusLock.lock();
221-
if (suspended != null) {
222-
suspendedTask = t;
223-
publishEvent(
224-
workflowContext, l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, t)));
225-
return suspended;
226-
}
227223
if (cancelled != null) {
228224
cancelled = new CompletableFuture<TaskContext>();
229225
workflowContext.instance().status(WorkflowStatus.CANCELLED);
@@ -237,6 +233,29 @@ public CompletableFuture<TaskContext> completedChecks(TaskContext t) {
237233
return CompletableFuture.completedFuture(t);
238234
}
239235

236+
public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {
237+
try {
238+
statusLock.lock();
239+
if (suspended != null) {
240+
this.suspendedTask = t;
241+
publishEvent(
242+
workflowContext,
243+
l -> l.onTaskSuspended(new TaskSuspendedEvent(workflowContext, suspendedTask)));
244+
return suspended;
245+
}
246+
} finally {
247+
statusLock.unlock();
248+
}
249+
return CompletableFuture.completedFuture(t);
250+
}
251+
252+
// internal purposes only, not to be invoked directly by users of the API
253+
public void restore(
254+
WorkflowPosition position, WorkflowModel model, WorkflowModel context, Instant startedAt) {
255+
this.startedAt = startedAt;
256+
workflowContext.context(context);
257+
}
258+
240259
@Override
241260
public boolean cancel() {
242261
try {

0 commit comments

Comments
 (0)