Reputation: 2268
I have two different sources of some IDs I have to do work with. One is from a file, another is from URL. When I'm creating Flux
from the Files' lines, I can perfectly well work on it. When I'm switching the Flux-creating function with the one that uses WebClient....get(), I get different results; the WebClient does never get called for some reason.
private Flux<String> retrieveIdListFromFile(String filename) {
try {
return Flux.fromIterable(Files.readAllLines(ResourceUtils.getFile(filename).toPath()));
} catch (IOException e) {
return Flux.error(e);
}
}
Here the WebClient part...
private Flux<String> retrieveIdList() {
return client.get()
.uri(uriBuilder -> uriBuilder.path("capdocuments_201811v2/selectRaw")
.queryParam("q", "-P_Id:[* TO *]")
.queryParam("fq", "DateLastModified:[2010-01-01T00:00:00Z TO 2016-12-31T00:00:00Z]")
.queryParam("fl", "id")
.queryParam("rows", "10")
.queryParam("wt", "csv")
.build())
.retrieve()
.bodyToFlux(String.class);
}
When I do a subscribe(System.out::println)
on the WebClient's flux, nothing happens. When I do a blockLast(), it works (URL is called, data returned). I don't get why, and how to correct this, and what I'm doing wrong.
With the flux that originates from the file, even the subscribe works fine. I sort of thought, that Fluxes are interchangable...
When I do a retrieveIdList().log().subscribe()
:
INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
When I do the same with blockLast() instead of subscribe():
INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
INFO [reactor-http-nio-4] reactor.Flux.OnAssembly.1 | onNext(id)
.
.
.
Upvotes: 0
Views: 563
Reputation: 59086
Judging from your question update, it seems that nothing is waiting on the processing to finish. I assume this is a batch or CLI application, and not a web application?
Assuming the following:
Flux<User> users = userService.fetchAll();
Calling blockLast
on a Flux
will trigger the processing and block
until the result is there.
Calling subscribe
on it will trigger the processing asynchronously; we're seeing the subscriber request
elements in your logs, but nothing more. This probably means that the JVM exits before any elements are published - nothing is waiting on the result.
If you're effectively writing some CLI/batch application and not processing requests within a web application, you can block
on the final reactive pipeline to get the result. If you wish to write that result to a file or send it to a different service, then you should compose on it with reactor operators.
Upvotes: 1