M.Walk
M.Walk

Reputation: 103

Webflux WebClient java.lang.NullPointerException

I'm having trouble connecting the webflux webclient to my endpoint.

In my controller, I'm trying to expose a hot stream of Chocolate DTO's. When I try to call the endpoint with postman, it works.

This sentence exists because Stackoverflow wants me to add more text, because there is too much code in my question, sorry for the inconvience.

@RestController
public class HotChocolateController {

   @GetMapping(value = "/stream/chocolate", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
   public Flux<Chocolate> chocolateStream() {
       return Flux.interval(Duration.ofMillis(500))
               .map(l -> Chocolate.builder()
                    .id(String.valueOf(System.currentTimeMillis()))
                       .name("Hot Chocolate")
                       .build())
               .log();
   }
}

Then I'm trying to test my setup with an unit test:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class ReactiveEndpointTests {

    private final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();


    @Test
    public void testHotChocolate(){
        Flux<Chocolate> chocolateSource = client.get().uri("/stream/chocolate").retrieve().bodyToFlux(Chocolate.class);
        chocolateSource.subscribe(chocolate -> System.out.println());
    }

When I run this test, I get the following exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
    at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Request to GET http://localhost:8080/stream/chocolate [DefaultWebClient]
Stack trace:
        at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(PooledConnectionProvider.java:248) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4211) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4077) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4013) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:504) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$3(PooledConnectionProvider.java:169) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:319) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:99) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:345) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:496) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:307) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:257) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.AbstractNioChannel.doClose(AbstractNioChannel.java:502) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doClose(NioSocketChannel.java:342) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:759) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:762) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:524) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Do you have any idea why this may happen?

Upvotes: 5

Views: 21281

Answers (1)

Violeta Georgieva
Violeta Georgieva

Reputation: 2312

You are just subscribing for the stream and immediately exiting the test method. You can change the method to something like the code below and you'll be able to see the result of the get request:

    @Test
    public void testHotChocolate(){
        Flux<String> chocolateSource =
                client.get()
                        .uri("/stream/chocolate")
                        .retrieve()
                        .bodyToFlux(String.class)
                        .doOnNext(System.out::println);
        chocolateSource.blockLast();
    }

Upvotes: 4

Related Questions