Reputation: 1766
I agree that reactive code should not block, but...
In the following method, findTenantStorageFactory(), I need to block on the tenantId, so the method can return a specific Factory for a tenant. I found instructions from Project Reactor that I need to wrap the method in a Mono.fromCallable() and use the boundedElastic() scheduler. However, when I run this code in the debugger, I receive the dreaded: "block()/blockFirst()/blockLast() are blocking, which is not supported" error. I've tried a million ideas, but cannot make any progress. I'd appreciate your suggestions.
//////////////////////////////////////////////////////////////////////////
fun findTenantStorageFactory(tenantId: Mono<TenantId>):
Mono<Either<BaseAzureBlobStorageException, MultitenantAzureBlobStorageFactory>> {
val myValue: Callable<Either<
BaseAzureBlobStorageException,
MultitenantAzureBlobStorageFactory>> =
Callable { lookupTenantBlobStorageFactory(tenantId.block()!!, factories) }
return Mono.fromCallable<Either<
BaseAzureBlobStorageException,
MultitenantAzureBlobStorageFactory>>(myValue)
.subscribeOn(Schedulers.boundedElastic())
}
//////////////////////////////////////////////////////////////////////////
private val blobStorageClientBuilder: AzureBlobClientBuilder
get() =
findTenantStorageFactory(tenantId).block()!!
.fold({ throw it }, { it.blobStorageClient })!!
/////////////////////////////// ERROR ///////////////////////////////////
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0-M1.jar:3.4.0-M1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Upvotes: 1
Views: 1251
Reputation: 3946
Why do you think that you need to block on tenantId
? I would use map
to convert a Mono to a Mono. If lookupTenantBlobStorageFactory
is a long-running method and you want to avoid blocking the current thread, publishOn
should help you. Something like this:
fun findTenantStorageFactory(tenantId: Mono<TenantId>):
Mono<Either<BaseAzureBlobStorageException, MultitenantAzureBlobStorageFactory>> {
return tenantId.publishOn(Schedulers.boundedElastic())
.map({ id -> lookupTenantBlobStorageFactory(id, factories) })
}
Upvotes: 1