Chiradeep

Reputation: 991

Create synchronization interval in RxJava

I need to run two jobs at fixed intervals: Job A at seconds 4, 8, 12, 16, ... and Job B at seconds 5, 9, 13, 17, ...

I have used the interval operator in RxJava. Job B needs to run after Job A, and each job should wait while the other one is running. So far the code looks like this:

var compositeDisposable = CompositeDisposable()
compositeDisposable.add(Observable.interval(0, recordIntervalPeriod, TimeUnit.MILLISECONDS)
                        .serialize()
                        .subscribe {
                            JobA()
                        })
compositeDisposable.add(Observable.interval(0, recorderStopIntervalStartTime, TimeUnit.MILLISECONDS)
                        .serialize()
                        .subscribe {
                            JobB()
                        })

I need help with the following:

1. What is the best way to achieve the above using RxJava?

2. How can I run Job A for 4 seconds, then run Job B for 4 seconds, and repeat the process?

Upvotes: 0

Views: 264

Answers (2)

GSala

Reputation: 976

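What about this?

Observable.interval(4, TimeUnit.SECONDS)
    .flatMap({
        // Zip Job A with a 1-second timer so Job B starts no earlier than 1 s after Job A starts
        jobA().zipWith(Observable.timer(1, TimeUnit.SECONDS)) { a, _ -> a }
            .flatMap { jobB() }
    }, 1) // second argument is maxConcurrency
    .subscribe()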

I'm assuming jobA() and jobB() are observables of some sort.

Because the maximum concurrency is set to 1, the next Job A waits until Job B has finished.

Job B waits until Job A has finished or until 1 second after Job A started, whichever happens later.

Upvotes: 0

Nicolas

Reputation: 7081

I would suggest using a single observable that emits every second and deciding on each tick which job to call, based on the counter value:

val disposable = Observable.interval(1, TimeUnit.SECONDS)
        .serialize()
        .subscribe { counter ->
            if (counter % 4 == 0L) {
                jobA()
            } else if ((counter - 1) % 4 == 0L) {
                jobB()
            }
        }
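
To make the counter-based dispatch above easier to check, here is a minimal sketch that pulls the decision into a plain function, with no RxJava needed. The names `Job` and `jobForTick` are hypothetical and only for illustration.

```kotlin
// Hypothetical helper: maps the interval counter to the job due on that tick.
enum class Job { A, B }

fun jobForTick(counter: Long): Job? = when (counter % 4) {
    0L -> Job.A   // counters 0, 4, 8, ...
    1L -> Job.B   // counters 1, 5, 9, ... (same as (counter - 1) % 4 == 0L)
    else -> null  // nothing is scheduled on the remaining ticks
}

fun main() {
    // Print the schedule for the first ten ticks.
    for (counter in 0L..9L) {
        println("tick $counter -> ${jobForTick(counter) ?: "idle"}")
    }
}
```

Note that Observable.interval(1, TimeUnit.SECONDS) emits counter 0 after the first second, so counter n arrives roughly n + 1 seconds after subscription. To line up with seconds 4, 8, 12, ... and 5, 9, 13, ... the remainders would probably need shifting (3 for Job A and 0 for Job B, skipping counter 0).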

If you still want to use two observables, I think this will work too:

val disposable = CompositeDisposable()
disposable.addAll(
        Observable.interval(4, TimeUnit.SECONDS)
                .subscribe {
                    jobA()
                },
        Observable.interval(4, TimeUnit.SECONDS)
                .delay(1, TimeUnit.SECONDS)
                .subscribe {
                    jobB()
                })
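
As a rough way to check the timing of the two-observable variant above, the sketch below runs it on a virtual clock. It assumes RxJava 2.x (the io.reactivex package) and uses the interval overload with an explicit initial delay instead of delay(); `simulate` is a hypothetical helper, and the log entries stand in for jobA() and jobB().

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: records "job@second" on a virtual clock instead of running real jobs.
fun simulate(seconds: Long): List<String> {
    val scheduler = TestScheduler()
    val log = mutableListOf<String>()
    val now = { scheduler.now(TimeUnit.SECONDS) }

    // Job A: first run after 4 s, then every 4 s.
    val a = Observable.interval(4, 4, TimeUnit.SECONDS, scheduler)
        .subscribe { log.add("A@${now()}") }
    // Job B: first run after 5 s, then every 4 s.
    val b = Observable.interval(5, 4, TimeUnit.SECONDS, scheduler)
        .subscribe { log.add("B@${now()}") }

    // Move virtual time forward; due emissions fire synchronously in time order.
    scheduler.advanceTimeBy(seconds, TimeUnit.SECONDS)
    a.dispose()
    b.dispose()
    return log
}

fun main() {
    println(simulate(13))
}
```

Keep in mind that two independent subscriptions only stagger the start times. Nothing here stops Job B from starting while a slow Job A is still running.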

Disclaimer: I haven't used RxJava a lot.

Upvotes: 1
