bigo1
bigo1

Reputation: 53

Blocking operations in Reactive application using MongoDB reactive drivers

I am developing a REST API which is reactive, using Reactive Mongo drivers, Spring Webflux framework.

All works fine when the concurrency level is below 100, however when I increase the concurrency level to 200, blockhound reports blocking calls to Mongo.

I am using Mongod community version on windows, with Blockhound installed in Spring boot application and Apache bench to test the concurrent behaviour.

Below is the error. This error is reported only when concurrency above 100.

Any suggestions on how to resolve this error?

Please note I have reported this issue with Mongodb community as well.

They have informed that it may be a false positive as system may not have enough resources to create more connections. If there are no resources to create more connections, should Blockhound report as blocking?

2021-01-02 09:52:17.399 ERROR 18688 --- [reactor-http-nio-2] org.mongodb.driver.operation             
: Callback onResult call produced an error.

reactor.blockhound.BlockingOperationError: Blocking call! jdk.internal.misc.Unsafe#park
    at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java)
    at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
    at java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
    at java.base/java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:409)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1347)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:714)
    at com.mongodb.internal.connection.DefaultConnectionPool.getAsync(DefaultConnectionPool.java:157)
    at com.mongodb.internal.connection.DefaultServer.getConnectionAsync(DefaultServer.java:105)
    at com.mongodb.internal.binding.AsyncClusterBinding$AsyncClusterBindingConnectionSource.getConnection(AsyncClusterBinding.java:131)
    at com.mongodb.internal.async.client.ClientSessionBinding$SessionBindingAsyncConnectionSource.getConnection(ClientSessionBinding.java:140)
    at com.mongodb.internal.operation.OperationHelper.withAsyncConnectionSource(OperationHelper.java:730)
    at com.mongodb.internal.operation.OperationHelper.access$200(OperationHelper.java:68)
    at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:750)
    at com.mongodb.internal.operation.OperationHelper$AsyncCallableWithConnectionAndSourceCallback.onResult(OperationHelper.java:738)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
    at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:208)
    at com.mongodb.internal.async.client.ClientSessionBinding$WrappingCallback.onResult(ClientSessionBinding.java:196)
    at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:105)
    at com.mongodb.internal.binding.AsyncClusterBinding$1.onResult(AsyncClusterBinding.java:99)
    at com.mongodb.internal.connection.BaseCluster$ServerSelectionRequest.onResult(BaseCluster.java:432)
    at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:299)
    at com.mongodb.internal.connection.BaseCluster.selectServerAsync(BaseCluster.java:155)
    at com.mongodb.internal.connection.SingleServerCluster.selectServerAsync(SingleServerCluster.java:42)
    at com.mongodb.internal.binding.AsyncClusterBinding.getAsyncClusterBindingConnectionSource(AsyncClusterBinding.java:99)
    at com.mongodb.internal.binding.AsyncClusterBinding.getReadConnectionSource(AsyncClusterBinding.java:84)
    at com.mongodb.internal.async.client.ClientSessionBinding.getReadConnectionSource(ClientSessionBinding.java:58)
    at com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection(OperationHelper.java:677)
    at com.mongodb.internal.operation.FindOperation.executeAsync(FindOperation.java:689)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:86)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1$1.onResult(OperationExecutorImpl.java:74)
    at com.mongodb.internal.async.client.OperationExecutorImpl.getReadWriteBinding(OperationExecutorImpl.java:177)
    at com.mongodb.internal.async.client.OperationExecutorImpl.access$200(OperationExecutorImpl.java:43)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:72)
    at com.mongodb.internal.async.client.OperationExecutorImpl$1.onResult(OperationExecutorImpl.java:66)
    at com.mongodb.internal.async.client.ClientSessionHelper.createClientSession(ClientSessionHelper.java:60)
    at com.mongodb.internal.async.client.ClientSessionHelper.withClientSession(ClientSessionHelper.java:51)
    at com.mongodb.internal.async.client.OperationExecutorImpl.execute(OperationExecutorImpl.java:66)
    at com.mongodb.internal.async.client.AsyncMongoIterableImpl.batchCursor(AsyncMongoIterableImpl.java:167)
    at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.requestInitialData(MongoIterableSubscription.java:45)
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.tryRequestInitialData(AbstractSubscription.java:177)
    at com.mongodb.reactivestreams.client.internal.AbstractSubscription.request(AbstractSubscription.java:100)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
    at com.mongodb.reactivestreams.client.internal.MongoIterableSubscription.<init>(MongoIterableSubscription.java:39)
    at com.mongodb.reactivestreams.client.internal.Publishers.lambda$publish$0(Publishers.java:43)
    at com.mongodb.reactivestreams.client.internal.FindPublisherImpl.subscribe(FindPublisherImpl.java:175)
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:66)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
    at reactor.core.publisher.Operators.error(Operators.java:196)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)
    at reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8133)
    at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:93)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:383)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:540)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:252)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

Upvotes: 3

Views: 1280

Answers (1)

bbuck
bbuck

Reputation: 127

There is a common exception that mongo may kick out when its own query request queue gets too large. This happens because webflux is efficient at passing through requests and the mongo driver chokes, on the database side. The back pressure isn't handled across the connection.

Not sure without seeing code but that could be one thing to investigate.

You may also be making a simple mistake in code and actually calling blocking code in your web handlers.

Upvotes: 2

Related Questions