Reputation: 416
I have three questions about Project Reactor, listed below. First, here is the code I have (simplified to make the issue easier to understand):
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
    return Mono.just("hello")
        .compose(monostr -> monostr
            .doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
            .doOnCancel(() -> System.out.println("cancelled")) //(2)
            .then(callback::apply)
            .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
        );
}
And the test:
@Test
public void testDoWithSession2() throws Exception {
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
        System.out.println("do some long timed work");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("work has completed");
        return str.length();
    });
    StepVerifier.create(doWithSession(fun1,1000))
        .verifyError(TimeoutException.class);
}
So, the questions:
1. How can I stop the invocation of fun1 and return the error immediately? (Maybe I'm doing something wrong, but it looks like the error is returned not after the timeout but only after the callback has run to completion.)
2. Why are doOnSuccess and doOnCancel both invoked? (I expected either (1) or (2) to be invoked, but not both.)
3. How should I deal with the following case: Mono.just("hello") is acquiring a connection; in callback I'm doing something with the connection and getting some result (Mono<Integer> in my case); and the connection has to be released afterwards?
Upvotes: 13
Views: 24634
Reputation: 416
For the 1st question, it looks like the answer is to use schedulers:
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
    Scheduler single = Schedulers.single();
    return Mono.just("hello")
        .compose(monostr -> monostr
            .publishOn(single) // use scheduler
            .then(callback::apply)
            .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
        );
}
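To picture why moving the work onto another thread matters, here is a JDK-only analogy (deliberately not Reactor code). A single-thread executor plays the role that publishOn(single) plays above: the blocking task runs on a worker, so the caller is free to stop waiting at the deadline and interrupt it. The names OffloadTimeoutSketch and runWithTimeout are made up for this sketch, and it assumes the task never returns null.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Reactor or of the code in this thread.
class OffloadTimeoutSketch {

    // Runs the blocking task on a worker thread and waits for it at most
    // timeoutMillis. Returns Optional.empty() if the deadline passes first.
    // Assumption: the task returns a non-null value.
    static <T> Optional<T> runWithTimeout(Callable<T> task, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<T> future = worker.submit(task);
        try {
            // The caller only waits here; the blocking happens on the worker.
            return Optional.of(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // Interrupt the worker so a sleeping or blocked task can stop early.
            future.cancel(true);
            return Optional.empty();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

Called with a task that sleeps for 5 seconds and a 200 ms timeout, runWithTimeout gives up after roughly 200 ms instead of 5 seconds. If the task ran directly on the caller's thread, nothing could react to the deadline until the task returned, which matches the behaviour described in the question.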
The 3rd question could be solved this way:
private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) {
    Scheduler single = Schedulers.single();
    return Mono.just("hello")
        .then(str -> Mono.just(str) // here wrapping our string to new Mono
            .publishOn(single)
            .then(callback::apply)
            .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
            .doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string " + str))
        );
}
Upvotes: 0
Reputation: 28301
1) As you found out, use .publishOn(Schedulers.single()). This ensures the callable is invoked on another thread and only blocks that thread. It also allows the callable to be cancelled.
2) The order of your chain matters. You put .doOnSuccess at the beginning of the compose (which you don't really need for this particular example, by the way, unless you want to extract that compose function for reuse later). That means it basically gets its notifications from the Mono.just and runs as soon as the source is queried, even before your processing has taken place. The same goes for doOnCancel: the cancellation comes from the timeout triggering.
3) There's a factory to create a sequence out of a resource and ensure that the resource is cleaned up: Mono.using. It would look something like this:
public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) {
    return Mono.using(
        //the resource supplier:
        () -> {
            System.out.println("connection acquired");
            return "hello";
        },
        //create a Mono out of the resource. On any termination, the resource is cleaned up
        connection -> Mono.just(connection)
            //the blocking callable needs own thread:
            .publishOn(Schedulers.single())
            //execute the callable and get result...
            .then(callback::apply)
            //...but cancel if it takes too long
            .timeoutMillis(timeout)
            //for demonstration we'll log when timeout triggers:
            .doOnError(TimeoutException.class, e -> System.out.println("timed out")),
        //the resource cleanup:
        connection -> System.out.println("cleaned up " + connection));
}
That returns a Mono<T> of the callable's T value. In production code, you'd subscribe to it to deal with the value. In a test, StepVerifier.create() will subscribe for you.
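The acquire/use/cleanup contract can also be pictured with a JDK-only analogy (not Reactor code, and not how Mono.using is implemented): a try/finally around work that runs on another thread with a deadline. The sketch records events in a list so the ordering can be inspected. UsingSketch, withConnection and the event strings are hypothetical names chosen to echo the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical illustration of "acquire, use with a deadline, always clean up".
class UsingSketch {

    // Returns the list of events in the order they happened.
    static List<String> withConnection(Function<String, Integer> callback, long timeoutMillis) {
        List<String> events = new ArrayList<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        String connection = "hello"; // stands in for the acquired resource
        events.add("connection acquired");
        try {
            Callable<Integer> task = () -> callback.apply(connection);
            Future<Integer> result = worker.submit(task);
            try {
                events.add("result " + result.get(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                result.cancel(true); // interrupt the worker, like a cancelled subscription
                events.add("timed out");
            } catch (Exception e) {
                events.add("failed: " + e);
            }
        } finally {
            // Runs on success, on timeout and on failure alike.
            events.add("cleaned up " + connection);
            worker.shutdownNow();
        }
        return events;
    }
}
```

With a callback that outlasts the timeout, the recorded events are "connection acquired", "timed out", "cleaned up hello". With a fast callback such as String::length they are "connection acquired", "result 5", "cleaned up hello". In both cases the cleanup comes last.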
Let's demonstrate that with your long running task and see what it outputs:
@Test
public void testDoWithSession2() throws Exception {
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
        System.out.println("start some long timed work");
        //for demonstration we'll print some clock ticks
        for (int i = 1; i <= 5; i++) {
            try {
                Thread.sleep(1000);
                System.out.println(i + "s...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("work has completed");
        return str.length();
    });
    //let two ticks show up
    StepVerifier.create(doWithConnection(fun1,2100))
        .verifyError(TimeoutException.class);
}
This outputs:
connection acquired
start some long timed work
1s...
2s...
timed out
cleaned up hello
And if we set the timeout above 5000, we get the following (there's an assertion error because the StepVerifier expects a timeout):
connection acquired
start some long timed work
1s...
2s...
3s...
4s...
5s...
work has completed
cleaned up hello
java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5)
Upvotes: 13