Jacob Botuck
Jacob Botuck

Reputation: 462

Implementing smartLifeCycle with a reactor subscription

Below is code I have for a component that starts a Flux and subscribes to it, all within the constructor of the class. This particular flux comes from a mongoChangeStreams call. It does not terminate unless there is an error.

I want the subscription to stay alive constantly so I restart the subscription in the event in terminates due to an error.

It has occurred to me that calling subscribe within a constructor might be a bad idea. Also I should probably enable a way to shut down this app gracefully by calling cancel on the subscription during shutdown.

My guess is that I should be implementing SmartLifeCycle but I'm not sure how to do that. Is there a standard way of implementing SmartLifeCycle on a component backed by a Flux subscription?

@Component
class SubscriptionManager(
    private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
    private val fooProcessor: FooProcessor
)  {

    private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor

    private fun subscribe() = buildSubscriber().also {
        fooFluxProvider.foos().subscribe(it)
    }

    private fun buildSubscriber(): BaseSubscriber<Foo> {
        return object : BaseSubscriber<Foo>() {
            override fun hookOnSubscribe(subscription: Subscription) {

                subscription.request(1)
            }

            override fun hookOnNext(value: Foo) {
                //process the foo
                fooProcessor.process(value)//sync call
                //ask for another foo
                request(1)
            }

            override fun hookOnError(throwable: Throwable) {
                logger.error("Something went wrong, restarting subscription", throwable)
                //restart the subscription. We'll recover if we're lucky
               subscription = subscribe()
            }
        }
    }




}

Upvotes: 0

Views: 677

Answers (1)

Phil Clay
Phil Clay

Reputation: 4554

  1. Instead of creating a Subscriber subclass that resubscribes on exception, chain one of the retry* operators on the Flux before subscribing. The retry operators will resubscribe to the upstream Flux if it completes with an exception. For example, fooFluxProvider.foos().retry() will retry indefinitely. There are other variations of retry* for more advanced behavior, including an extremely customizable retryWhen that can be used with the reactor.retry.Retry class from reactor-extra.
  2. Instead of passing a subscriber to subscribe(subscriber), call one of the subscribe methods that returns a Disposable. This gives you an object on which you can call dispose() later during shutdown to cancel the subscription.
  3. To implement SmartLifecycle:
    • In the constructor (or in start()), create the Flux (but do not subscribe to it in the constructor)
    • In start(), call flux.subscribe() and save the returned Disposable to a member field. The start() method is much better suited for starting background jobs than a constructor. Consider also chaining .subscribeOn(Scheduler) before .subscribe() if you want this to run in the background (by default, the subscription occurs on the thread on which subscribe was called).
    • In stop(), call disposable.dispose()

Perhaps something like this:

class SubscriptionManager(
        fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
        fooProcessor: FooProcessor
) : SmartLifecycle {
    private val logger = LoggerFactory.getLogger(javaClass)

    private val fooFlux = fooFluxProvider.foos()
            // Subscribe on a parallel scheduler to run in the background
            .subscribeOn(Schedulers.parallel())
            // Publish on a boundedElastic scheduler if fooProcessor.process blocks
            .publishOn(Schedulers.boundedElastic())
            // Use .doOnNext to send the foo to your processor
            // Alternatively use .flatMap/.concatMap/.flatMapSequential if the processor returns a Publisher
            // Alternatively use .map if the processor transforms the foo, and you need to operate on the returned value
            .doOnNext(fooProcessor::process)
            // Log if an exception occurred
            .doOnError{ e -> logger.error("Something went wrong, restarting subscription", e) }
            // Resubscribe if an exception occurred
            .retry()
            // Repeat if you want to resubscribe if the upstream flux ever completes successfully
            .repeat()

    private var disposable: Disposable? = null

    @Synchronized
    override fun start() {
        if (!isRunning) {
            disposable = fooFlux.subscribe()
        }
    }

    @Synchronized
    override fun stop() {
        disposable?.dispose()
        disposable = null
    }

    @Synchronized
    override fun isRunning(): Boolean {
        return disposable != null
    }

}

Upvotes: 0

Related Questions