Reputation: 103
I'm having trouble connecting the webflux webclient to my endpoint.
In my controller, I'm trying to expose a hot stream of Chocolate DTO's. When I try to call the endpoint with postman, it works.
This sentence exists because Stackoverflow wants me to add more text, because there is too much code in my question, sorry for the inconvience.
@RestController
public class HotChocolateController {
@GetMapping(value = "/stream/chocolate", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Chocolate> chocolateStream() {
return Flux.interval(Duration.ofMillis(500))
.map(l -> Chocolate.builder()
.id(String.valueOf(System.currentTimeMillis()))
.name("Hot Chocolate")
.build())
.log();
}
}
Then I'm trying to test my setup with an unit test:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class ReactiveEndpointTests {
private final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();
@Test
public void testHotChocolate(){
Flux<Chocolate> chocolateSource = client.get().uri("/stream/chocolate").retrieve().bodyToFlux(Chocolate.class);
chocolateSource.subscribe(chocolate -> System.out.println());
}
When I run this test, I get the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Request to GET http://localhost:8080/stream/chocolate [DefaultWebClient]
Stack trace:
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(PooledConnectionProvider.java:248) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4211) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4077) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4013) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:504) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$3(PooledConnectionProvider.java:169) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:319) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:99) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:345) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:496) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop$7(SimplePool.java:206) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:307) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:257) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioChannel.doClose(AbstractNioChannel.java:502) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doClose(NioSocketChannel.java:342) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:759) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:762) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:524) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Do you have any idea why this may happen?
Upvotes: 5
Views: 21281
Reputation: 2312
You are just subscribing for the stream and immediately exiting the test method. You can change the method to something like the code below and you'll be able to see the result of the get request:
@Test
public void testHotChocolate(){
Flux<String> chocolateSource =
client.get()
.uri("/stream/chocolate")
.retrieve()
.bodyToFlux(String.class)
.doOnNext(System.out::println);
chocolateSource.blockLast();
}
Upvotes: 4