I'm writing a simple orchestration framework using the Reactor framework. It executes tasks sequentially, and the next task to execute depends on the results of the previous tasks; there may be multiple paths to choose from depending on those results. Earlier, I wrote a similar framework based on a static DAG, where I passed a list of tasks as an iterable and used Flux.fromIterable(taskList). However, this does not give me the flexibility to go dynamic, because the array publisher is static.
I'm looking for an alternative approach, something like do {} while (condition), to handle DAG traversal and task decisions, and I came up with Flux.generate(). I evaluate the next step in the generate method and pass the next task downstream. The problem I'm facing now is that Flux.generate does not wait for downstream processing to complete; it keeps pushing until the condition becomes invalid. By the time task 1 gets executed, task 2 has already been pushed n times, which is not the expected behavior.
Can someone please point me in the right direction?
Thanks.
First iteration, using a list of tasks (static DAG):
Flux.fromIterable(taskList)
.publishOn(this.factory.getSharedSchedulerPool())
.concatMap(
reactiveTask -> {
log.info("Running task =>{}", reactiveTask.getTaskName());
return reactiveTask
.run(ctx);
})
// Evaluates status from previous task and terminates stream or continues.
.takeWhile(context -> evaluateStatus(context))
.onErrorResume(throwable -> buildResponse(ctx, throwable))
.doOnCancel(() -> log.info("Task cancelled"))
.doOnComplete(() -> log.info("Completed flow"))
.subscribe();
Attempt at a dynamic DAG:
Flux.generate(
(SynchronousSink<ReactiveTask<OrchestrationContext>> synchronousSink) -> {
ReactiveTask<OrchestrationContext> task;
if (ctx.getLastExecutedStep() == null) {
// First task.
task = getFirstTaskFromDAG();
} else {
task = deriveNextStep(ctx.getLastExecutedStep(), ctx.getDecisionData());
}
// Emit either a completion signal or the next task, never both.
if ("END".equals(task.getTaskName())) {
synchronousSink.complete();
} else {
synchronousSink.next(task);
}
})
.publishOn(this.factory.getSharedSchedulerPool())
.doOnNext(orchestrationContextReactiveTask -> log.info("On next => {}",
orchestrationContextReactiveTask.getTaskName()))
.concatMap(
reactiveTask -> {
log.info("Running task =>{}", reactiveTask.getTaskName());
return reactiveTask
.run(ctx);
})
.onErrorResume(throwable -> buildResponse(ctx, throwable))
.takeUntil(context -> evaluateStatus(context, tasks))
.doOnCancel(() -> log.info("Task cancelled"))
.doOnComplete(() -> log.info("Completed flow")).subscribe();
The problem with the above approach is that, while task 1 is executing, the doOnNext() callback prints many times because generate keeps publishing. I want the generate method to wait for the result of the previous task before submitting a new task. In a non-reactive world, this could be achieved with a simple while() loop.
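For comparison, the non-reactive while() loop mentioned above could look roughly like the sketch below. It is plain Java, not Reactor, and Context, Task, orchestrate and the routing function are hypothetical stand-ins for OrchestrationContext, ReactiveTask and deriveNextStep. The key property is that the next task is chosen only after the current one has finished, so the decision always sees the latest context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of the orchestration loop (not the question's real types).
class WhileLoopOrchestrator {

    static final String END = "END";

    /** Minimal stand-in for OrchestrationContext. */
    static final class Context {
        String lastExecutedStep;
        int decisionData;
        final List<String> trace = new ArrayList<>();
    }

    /** Minimal stand-in for a task: a name plus a synchronous body. */
    static final class Task {
        final String name;
        final UnaryOperator<Context> body;
        Task(String name, UnaryOperator<Context> body) { this.name = name; this.body = body; }
    }

    /** do { run task; pick next } while (next is not END) */
    static Context orchestrate(Task first, Map<String, Task> tasksByName,
                               Function<Context, String> deriveNextStep, Context ctx) {
        Task task = first;
        while (true) {
            ctx = task.body.apply(ctx);              // run the task to completion first
            ctx.lastExecutedStep = task.name;
            ctx.trace.add(task.name);
            String next = deriveNextStep.apply(ctx); // decide using the finished task's result
            if (END.equals(next)) {
                return ctx;
            }
            task = Objects.requireNonNull(tasksByName.get(next), "unknown task: " + next);
        }
    }
}
```

In a reactive pipeline the same guarantee has to come from demand: the source must not be asked for the next task until the previous one has completed.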
Each task performs the following actions:
public class ResponseTask extends AbstractBaseTask {
private final StateManager stateManager;
private final ThreadFactory factory;
private final TaskDefinition taskDefinition;
private final String taskName;
public ResponseTask(
StateManager stateManager,
ThreadFactory factory,
TaskDefinition taskDefinition,
String taskName) {
this.stateManager = stateManager;
this.factory = factory;
this.taskDefinition = taskDefinition;
this.taskName = taskName;
}
public Mono<String> transform(OrchestrationContext context) {
Any masterPayload = Any.wrap(context.getIngestionPayload());
return Mono.fromCallable(() -> stateManager.doTransformation(context, masterPayload));
}
public Mono<OrchestrationContext> execute(OrchestrationContext context, String payload) {
log.info("Executing sleep for task=>{}", context.getLastExecutedStep());
return Mono.delay(Duration.ofSeconds(1), factory.getSharedSchedulerPool())
.then(Mono.just(context));
}
public Mono<OrchestrationContext> run(OrchestrationContext context) {
log.info("Executing task:{}. Last executed:{}", taskName, context.getLastExecutedStep());
return transform(context)
.doOnNext(result -> log.info("Transformation complete for task=?{}", taskName))
.flatMap(payload -> execute(context, payload))
.onErrorResume(throwable -> {
// Mark the context as failed instead of propagating the error.
context.setStatus(FAILED);
return Mono.just(context);
});
}
}
EDIT: Following @Ikatiforis's recommendation, I got the following output:
2021-12-02 09:58:14,643 INFO (reactive_shared_pool) [ReactiveEngine lambda$doOrchestration$5:98] On next => Task1
2021-12-02 09:58:14,644 INFO (reactive_shared_pool) [ReactiveEngine lambda$doOrchestration$6:101] Running task =>Task1
2021-12-02 09:58:14,644 INFO (reactive_shared_pool) [AbstractBaseTask run:75] Executing task:Task1. Last executed:Task1
2021-12-02 09:58:14,658 INFO (reactive_shared_pool) [ReactiveEngine lambda$doOrchestration$5:98] On next => Task2
2021-12-02 09:58:14,659 INFO (reactive_shared_pool) [AbstractBaseTask lambda$run$0:83] Transformation complete for task=?Task1
2021-12-02 09:58:14,659 INFO (reactive_shared_pool) [ResponseTask execute:41] Executing sleep for task=>Task1
2021-12-02 09:58:15,661 INFO (reactive_shared_pool) [AbstractBaseTask lambda$run$4:106] Success for task=>Task1
2021-12-02 09:58:15,663 INFO (reactive_shared_pool) [ReactiveEngine lambda$doOrchestration$6:101] Running task =>Task2
2021-12-02 09:58:15,811 INFO (cassandra-nio-worker-8) [AbstractBaseTask run:75] Executing task:Task2. Last executed:Task2
2021-12-02 09:58:15,811 INFO (reactive_shared_pool) [ReactiveEngine lambda$doOrchestration$5:98] On next => Task2
2021-12-02 09:58:15,812 INFO (reactive_shared_pool) [AbstractBaseTask lambda$run$0:83] Transformation complete for task=?Task2
2021-12-02 09:58:15,812 INFO (reactive_shared_pool) [ResponseTask execute:41] Executing sleep for task=>Task2
2021-12-02 09:58:15,837 INFO (centaurus_reactive_shared_pool) [ReactiveEngine lambda$doOrchestration$9:113] Completed flow
I see a couple of problems here.
The expected sequence of execution is:
1. The task performs its transformations (runs on Mono.fromCallable).
2. The task introduces a delay (Mono.delay()).
3. The task completes. Only then should the generate method evaluate the context and pass on the next task to be executed.
What I see in the output instead is:
1. Task 1 starts its transformations (runs on Mono.fromCallable).
2. doOnNext is reported for Task 2, which means the stream has already received this task.
3. Task 1 completes.
4. Task 2 starts and executes the delay, but the stream does not wait for the response from Task 2 and completes the flow.
The problem in above approach is, while task 1 is executing, the onNext() subscriber prints many time because generate is publishing.
This is happening because concatMap requests a number of items upfront (32 by default) instead of requesting elements one by one. If you really need to request one element at a time, you can use the concatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int prefetch) variant and provide the prefetch value like this:
.concatMap(reactiveTask -> {
log.info("Running task =>{}", reactiveTask.getTaskName());
return reactiveTask.run(ctx);
}, 1)
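To see why the prefetch value matters with a lazy generator, here is a simplified, single-threaded model of Reactive Streams demand built on the JDK's java.util.concurrent.Flow interfaces (not Reactor, and not a spec-compliant implementation). LazyGenerator, BufferingRunner and finishOne are hypothetical names: the generator runs only while there is outstanding demand, and the runner buffers items and "finishes" a task only when finishOne() is called, which simulates asynchronous work such as Mono.delay.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;

// Simplified model of request(n) demand; illustrative only, not Reactor's implementation.
class PrefetchDemo {

    /** Lazy source: one generator invocation per unit of outstanding demand. */
    static final class LazyGenerator implements Flow.Publisher<Integer> {
        final int count;
        final List<Integer> generated = new ArrayList<>();
        LazyGenerator(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                long demand;
                boolean emitting, done;

                @Override
                public void request(long n) {
                    demand += n;
                    if (emitting || done) return;   // re-entrant call: the running loop picks it up
                    emitting = true;
                    while (!done && demand > 0 && generated.size() < count) {
                        demand--;
                        int value = generated.size();
                        generated.add(value);       // the "generator" runs here
                        subscriber.onNext(value);
                    }
                    emitting = false;
                    if (!done && generated.size() == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests `prefetch` items up front, then one more each time a task finishes. */
    static final class BufferingRunner implements Flow.Subscriber<Integer> {
        final int prefetch;
        final Deque<Integer> pending = new ArrayDeque<>();
        final List<Integer> finished = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed;
        BufferingRunner(int prefetch) { this.prefetch = prefetch; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(prefetch); }
        @Override public void onNext(Integer item) { pending.add(item); }
        @Override public void onError(Throwable t) { completed = true; }
        @Override public void onComplete() { completed = true; }

        /** Completes the oldest pending task, then asks the source for exactly one more item. */
        void finishOne() {
            Integer item = pending.poll();
            if (item == null) return;
            finished.add(item);
            subscription.request(1);
        }
    }
}
```

With a prefetch of 32 the whole source is generated before a single task has finished, which mirrors the question's generator reading a stale context; with a prefetch of 1 the generator runs once per finished task. Reactor operators apply their own replenishment rules, so treat this as a model of the demand mechanism rather than of concatMap's exact timing.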
Edit
There is also a publishOn variant that takes a prefetch value (the single-argument publishOn(Scheduler) prefetches 256 elements by default). Take a look at the following Fibonacci generator sample and let me know if it works as you expect:
generateFibonacci(100)
.publishOn(boundedElasticScheduler, 1)
.doOnNext(number -> log.info("On next => {}", number))
.concatMap(number -> {
log.info("Running task => {}", number);
return task(number).doOnNext(num -> log.info("Task completed => {}", num));
}, 1)
.takeWhile(context -> context < 3)
.subscribe();
public Flux<Integer> generateFibonacci(int limit) {
return Flux.generate(
() -> new FibonacciState(0, 1),
(state, sink) -> {
log.info("Generating number: " + state);
sink.next(state.getFormer());
if (state.getLatter() > limit) {
sink.complete();
}
int temp = state.getFormer();
state.setFormer(state.getLatter());
state.setLatter(temp + state.getLatter());
return state;
});
}
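The sample uses FibonacciState without showing it. The log output (FibonacciState(former=0, latter=1)) suggests a mutable holder with former/latter fields, for example a Lombok @Data class. Below is a hypothetical plain-Java stand-in, plus a hypothetical synchronous driver, emittedValues, that applies the same generator step repeatedly; Flux.generate applies that step once per requested element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of the FibonacciState used in the sample above.
class FibonacciState {
    private int former;
    private int latter;

    FibonacciState(int former, int latter) { this.former = former; this.latter = latter; }
    int getFormer() { return former; }
    int getLatter() { return latter; }
    void setFormer(int former) { this.former = former; }
    void setLatter(int latter) { this.latter = latter; }

    @Override
    public String toString() { return "FibonacciState(former=" + former + ", latter=" + latter + ")"; }

    /** Replays the generator callback until it signals completion; returns every emitted value. */
    static List<Integer> emittedValues(int limit) {
        List<Integer> emitted = new ArrayList<>();
        FibonacciState state = new FibonacciState(0, 1);
        boolean complete = false;
        while (!complete) {
            emitted.add(state.getFormer());            // sink.next(state.getFormer())
            if (state.getLatter() > limit) {
                complete = true;                       // sink.complete()
            }
            int temp = state.getFormer();
            state.setFormer(state.getLatter());
            state.setLatter(temp + state.getLatter());
        }
        return emitted;
    }
}
```

With a limit of 100 the generator would emit 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89 before completing. In the pipeline, concatMap runs the tasks for 0, 1, 1, 2 and 3, and takeWhile(context -> context < 3) completes the flow when it sees 3, which agrees with the log that follows.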
Here is the output:
2021-12-02 10:47:51,990 INFO main c.u.p.p.s.c.Test - Generating number: FibonacciState(former=0, latter=1)
2021-12-02 10:47:51,993 INFO pool-1-thread-1 c.u.p.p.s.c.Test - On next => 0
2021-12-02 10:47:51,996 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Running task => 0
2021-12-02 10:47:54,035 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Task completed => 0
2021-12-02 10:47:54,035 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Generating number: FibonacciState(former=1, latter=1)
2021-12-02 10:47:54,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - On next => 1
2021-12-02 10:47:54,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Running task => 1
2021-12-02 10:47:56,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Task completed => 1
2021-12-02 10:47:56,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Generating number: FibonacciState(former=1, latter=2)
2021-12-02 10:47:56,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - On next => 1
2021-12-02 10:47:56,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Running task => 1
2021-12-02 10:47:58,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Task completed => 1
2021-12-02 10:47:58,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Generating number: FibonacciState(former=2, latter=3)
2021-12-02 10:47:58,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - On next => 2
2021-12-02 10:47:58,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Running task => 2
2021-12-02 10:48:00,036 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Task completed => 2
2021-12-02 10:48:00,037 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Generating number: FibonacciState(former=3, latter=5)
2021-12-02 10:48:00,037 INFO pool-1-thread-1 c.u.p.p.s.c.Test - On next => 3
2021-12-02 10:48:00,037 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Running task => 3
2021-12-02 10:48:02,037 INFO pool-1-thread-1 c.u.p.p.s.c.Test - Task completed => 3
2021-12-02 10:52:07,877 INFO pool-1-thread-2 c.u.p.p.s.c.Test - Completed flow
Edit 04122021
You stated:
I'm trying to simulate HTTP / blocking calls. Hence the Mono.delay.
Mono#delay is not the appropriate method to simulate a blocking call. The delay is scheduled on the parallel scheduler (or on the scheduler you pass in), so it does not block the calling thread while the delay elapses. You can simulate a blocking call like this:
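import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import javax.net.ssl.HttpsURLConnection;
// Blocks the calling thread until the whole response body has been read.
public String get() throws IOException {
HttpsURLConnection connection = (HttpsURLConnection) new URL("https://jsonplaceholder.typicode.com/comments").openConnection();
connection.setRequestMethod("GET");
try (InputStream inputStream = connection.getInputStream()) {
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
}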
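The difference between a timer-based delay and a blocking call can be reproduced with the JDK alone. In this sketch (plain java.util.concurrent, not Reactor; DelayVsBlocking and its methods are illustrative names), CompletableFuture.delayedExecutor plays the role of a timer-based delay such as Mono.delay, and Thread.sleep stands in for a blocking call like get().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// JDK analogue: a timer hands the work to another thread, a blocking call holds the caller.
class DelayVsBlocking {

    /** Timer-based delay: the caller carries on before the delayed work runs. */
    static List<String> timerDelay() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> done = CompletableFuture.runAsync(
                () -> events.add("delayed work finished"),
                CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS));
        events.add("caller continued");   // runs immediately; the timer has not fired yet
        done.join();                      // wait only so the demo can inspect the order
        return new ArrayList<>(events);
    }

    /** Blocking call: the caller cannot continue until the work has finished. */
    static List<String> blockingCall() {
        List<String> events = new ArrayList<>();
        try {
            Thread.sleep(200);            // stands in for a blocking HTTP call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        events.add("blocking work finished");
        events.add("caller continued");
        return events;
    }
}
```

In the timer case the caller records its event first, so upstream code on that thread is free to move on while the delay is pending; in the blocking case nothing else happens on that thread until the work is done.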
Note that, as an alternative, you could use the .limitRate(1) operator instead of the prefetch parameter.