Gregor

Reputation: 3027

Receiving a Flux in a Spring Boot Client

This is a follow-up question to Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

I tried to follow the recommendation there on how to receive a Flux using the Spring WebClient, but I get a Netty error instead.

On the server side the code is a simple controller exposing the findAll method of a Mongo repository:

@RequestMapping("power")
public Flux<Power> getAll() {
    return powerRepository.findAll();
}

On the client, the consuming code follows the answer given to that question:

@Bean
public CommandLineRunner readFromApiServer() {
    return new CommandLineRunner() {
        @Override
        public void run(String... args) throws Exception {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            flux.subscribe();
        }
    };
}

But this throws an exception:

2017-02-27 08:19:41.281 ERROR 99026 --- [ctor-http-nio-5] r.ipc.netty.channel.ChannelOperations : [HttpClient] Error processing connection. Requesting close the channel

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101) ~[netty-all-4.1.8.Final.jar:4.1.8.Final]

I am using the current Spring Boot 2.0.0 BUILD-SNAPSHOT.

What is this exception telling me? How can I get it right?

Upvotes: 0

Views: 4251

Answers (1)

Brian Clozel

Reputation: 59141

All CommandLineRunner beans are executed as the Spring Boot application starts up. If no non-daemon thread is left running (i.e. the current application is not a web app), the application shuts down once all runners have been executed.

In your case, flux.subscribe() only "start[s] the chain and request[s] unbounded demand" (javadoc), so the call does not block. I suspect the command line runner returns before you get a chance to do anything with your flux; the application then shuts down and your network resources are closed.

Also, you're not doing anything with the result of your HTTP request. I think updating your code sample as follows should solve the issue, since block() waits for the whole response before the runner returns:

@Bean
public CommandLineRunner readFromApiServer() {
    return new CommandLineRunner() {
        @Override
        public void run(String... args) throws Exception {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            List<Power> powerList = flux.collectList().block();
            // do something with the result?
        }
    };
}
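
To see the difference between a non-blocking subscription and a blocking call without any Spring or Reactor setup, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for the Flux. The class name, the fetchPowers() helper, the 200 ms delay and the "power-1"/"power-2" values are all hypothetical; only the timing behaviour is meant to carry over, not the WebClient API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BlockingVsNonBlocking {

    // Hypothetical stand-in for the remote call: the result only becomes
    // available after a short delay, on a background (daemon) thread.
    static CompletableFuture<List<String>> fetchPowers() {
        return CompletableFuture.supplyAsync(
                () -> List.of("power-1", "power-2"),
                CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS));
    }

    // Roughly analogous to flux.subscribe(): a callback is registered and the
    // method returns immediately, normally before any data has arrived.
    static boolean fireAndForgetIsDoneOnReturn() {
        CompletableFuture<Void> pending = fetchPowers().thenAccept(powers -> { });
        return pending.isDone();
    }

    // Roughly analogous to flux.collectList().block(): the calling thread
    // waits until the complete result is available.
    static List<String> fetchAndWait() {
        return fetchPowers().join();
    }

    public static void main(String[] args) {
        System.out.println("done right after fire-and-forget: " + fireAndForgetIsDoneOnReturn());
        System.out.println("result after waiting: " + fetchAndWait());
    }
}
```

If main() contained only the fire-and-forget call, the JVM could exit before the delayed task ran, because the background threads used here are daemon threads. That is the same kind of race suspected in the original runner.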

Upvotes: 3
