Reputation: 462
Below is code I have for a component that starts a Flux
and subscribes to it, all within the constructor of the class. This particular flux comes from a mongoChangeStreams call. It does not terminate unless there is an error.
I want the subscription to stay alive constantly so I restart the subscription in the event in terminates due to an error.
It has occurred to me that calling subscribe within a constructor might be a bad idea. Also I should probably enable a way to shut down this app gracefully by calling cancel on the subscription during shutdown.
My guess is that I should be implementing SmartLifeCycle
but I'm not sure how to do that. Is there a standard way of implementing SmartLifeCycle on a component backed by a Flux subscription?
@Component
class SubscriptionManager(
private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
private val fooProcessor: FooProcessor
) {
private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor
private fun subscribe() = buildSubscriber().also {
fooFluxProvider.foos().subscribe(it)
}
private fun buildSubscriber(): BaseSubscriber<Foo> {
return object : BaseSubscriber<Foo>() {
override fun hookOnSubscribe(subscription: Subscription) {
subscription.request(1)
}
override fun hookOnNext(value: Foo) {
//process the foo
fooProcessor.process(value)//sync call
//ask for another foo
request(1)
}
override fun hookOnError(throwable: Throwable) {
logger.error("Something went wrong, restarting subscription", throwable)
//restart the subscription. We'll recover if we're lucky
subscription = subscribe()
}
}
}
}
Upvotes: 0
Views: 677
Reputation: 4554
retry*
operators on the Flux before subscribing. The retry operators will resubscribe to the upstream Flux if it completes with an exception. For example, fooFluxProvider.foos().retry()
will retry indefinitely. There are other variations of retry*
for more advanced behavior, including an extremely customizable retryWhen
that can be used with the reactor.retry.Retry
class from reactor-extra
.subscribe(subscriber)
, call one of the subscribe
methods that returns a Disposable
. This gives you an object on which you can call dispose()
later during shutdown to cancel the subscription.SmartLifecycle
:
start()
), create the Flux
(but do not subscribe to it in the constructor)start()
, call flux.subscribe()
and save the returned Disposable
to a member field. The start()
method is much better suited for starting background jobs than a constructor. Consider also chaining .subscribeOn(Scheduler)
before .subscribe()
if you want this to run in the background (by default, the subscription occurs on the thread on which subscribe
was called). stop()
, call disposable.dispose()
Perhaps something like this:
class SubscriptionManager(
fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
fooProcessor: FooProcessor
) : SmartLifecycle {
private val logger = LoggerFactory.getLogger(javaClass)
private val fooFlux = fooFluxProvider.foos()
// Subscribe on a parallel scheduler to run in the background
.subscribeOn(Schedulers.parallel())
// Publish on a boundedElastic scheduler if fooProcessor.process blocks
.publishOn(Schedulers.boundedElastic())
// Use .doOnNext to send the foo to your processor
// Alternatively use .flatMap/.concatMap/.flatMapSequential if the processor returns a Publisher
// Alternatively use .map if the processor transforms the foo, and you need to operate on the returned value
.doOnNext(fooProcessor::process)
// Log if an exception occurred
.doOnError{ e -> logger.error("Something went wrong, restarting subscription", e) }
// Resubscribe if an exception occurred
.retry()
// Repeat if you want to resubscribe if the upstream flux ever completes successfully
.repeat()
private var disposable: Disposable? = null
@Synchronized
override fun start() {
if (!isRunning) {
disposable = fooFlux.subscribe()
}
}
@Synchronized
override fun stop() {
disposable?.dispose()
disposable = null
}
@Synchronized
override fun isRunning(): Boolean {
return disposable != null
}
}
Upvotes: 0