Frischling

Reputation: 2268

Flux from WebClient behaves differently than Flux from File.readLines

I have two different sources of IDs that I need to process. One is a file, the other is a URL. When I create the Flux from the file's lines, I can work with it without problems. When I swap the Flux-creating function for the one that uses WebClient....get(), I get different results; the WebClient never gets called for some reason.

private Flux<String> retrieveIdListFromFile(String filename) {
  try {
    return Flux.fromIterable(Files.readAllLines(ResourceUtils.getFile(filename).toPath()));
  } catch (IOException e) {
    return Flux.error(e);
  }
}

Here is the WebClient part:

private Flux<String> retrieveIdList() {
  return client.get()
      .uri(uriBuilder -> uriBuilder.path("capdocuments_201811v2/selectRaw")
          .queryParam("q", "-P_Id:[* TO *]")
          .queryParam("fq", "DateLastModified:[2010-01-01T00:00:00Z TO 2016-12-31T00:00:00Z]")
          .queryParam("fl", "id")
          .queryParam("rows", "10")
          .queryParam("wt", "csv")
          .build())
      .retrieve()
      .bodyToFlux(String.class);
}

When I do a subscribe(System.out::println) on the WebClient's Flux, nothing happens. When I do a blockLast(), it works (the URL is called and data is returned). I don't understand why, what I'm doing wrong, or how to correct it. With the Flux that originates from the file, even subscribe works fine. I had assumed that Fluxes are interchangeable...

When I do a retrieveIdList().log().subscribe():

INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)

When I do the same with blockLast() instead of subscribe():

INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
INFO [reactor-http-nio-4] reactor.Flux.OnAssembly.1 | onNext(id)
.
.
.

Upvotes: 0

Views: 563

Answers (1)

Brian Clozel

Reputation: 59086

Judging from your question update, it seems that nothing is waiting on the processing to finish. I assume this is a batch or CLI application, and not a web application?

Assuming the following:

Flux<User> users = userService.fetchAll();

Calling blockLast on a Flux will trigger the processing and block until the result is there.

Calling subscribe on it will trigger the processing asynchronously; we're seeing the subscriber request elements in your logs, but nothing more. This probably means that the JVM exits before any elements are published - nothing is waiting on the result.
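The effect can be illustrated without Reactor at all. The following is only a rough analogy in plain JDK code, not what WebClient does internally: it assumes the asynchronous work runs on a daemon thread (which does not keep the JVM alive), and every name in it (AsyncWaitSketch, publishAsync, publishAndWait) is hypothetical. Returning right after starting the worker loosely corresponds to subscribe(); waiting on the latch loosely corresponds to blockLast().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: plain JDK threads standing in for an asynchronous publisher.
class AsyncWaitSketch {

    // Starts the work and returns immediately, like a non-blocking subscribe().
    // The returned latch is released once the worker has finished.
    static CountDownLatch publishAsync(int count, List<String> sink) {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    Thread.sleep(50);      // stands in for network latency
                    sink.add("id-" + i);   // stands in for onNext(id)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();          // stands in for onComplete
            }
        });
        worker.setDaemon(true);            // assumption: daemon threads do not keep the JVM alive
        worker.start();
        return done;
    }

    // Starts the work and waits for it to finish, like blockLast().
    static List<String> publishAndWait(int count) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = publishAsync(count, received);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the worker");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> fireAndForget = new CopyOnWriteArrayList<>();
        publishAsync(3, fireAndForget);    // nothing waits on this worker
        System.out.println("without waiting: " + fireAndForget);
        System.out.println("with waiting: " + publishAndWait(3));
    }
}
```

If main ended right after the first publishAsync call, the JVM would be free to exit before the worker added anything, which matches the log that stops at request(unbounded). The file-based Flux does not show this because its lines are already in memory and are emitted on the subscribing thread.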

If you are indeed writing a CLI/batch application and not processing requests within a web application, you can block on the final reactive pipeline to get the result. If you wish to write that result to a file or send it to a different service, then you should compose on it with Reactor operators rather than blocking in the middle of the pipeline.

Upvotes: 1
