LightingX
LightingX

Reputation: 31

How to wait for flowable onComplete?

In RxJava2, here is my code:

public void myMethod() {
Flowable.create(e->{
  // do sth.
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.subscribe(new Subscriber<ContentsWithChannelVO>() {
  @Override
  public void onSubscribe(Subscription subscription) { // do sth. }

  @Override
  public void onNext(ContentsWithChannelVO contentAndChannel) { // do sth. }

  @Override
  public void onError(Throwable throwable) { // do sth. }

  @Override
  public void onComplete() { // do sth. }
});
doSomething();
}

Here is a synchronized problem.

I want to know how to let my method wait for the flowable onComplete, that is to say doSomething() will be called after the flowable onCompleted.

I have searched for it but it doesn't help.

Upvotes: 2

Views: 1118

Answers (2)

akarnokd
akarnokd

Reputation: 69997

You'd have to block the consumption of the flow for that (which is generally not recommended):

public void myMethod() {
    Flowable.create(e -> {
       // do sth.
    }, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation(), false)
    .blockingSubscribe(new Subscriber<ContentsWithChannelVO>() {
        @Override
        public void onSubscribe(Subscription subscription) { // do sth. }

        @Override
        public void onNext(ContentsWithChannelVO contentAndChannel) { // do sth. }

        @Override
        public void onError(Throwable throwable) { // do sth. }

        @Override
        public void onComplete() { // do sth. }
    });

    doSomething();
}

In this case, having observeOn is pointless. Note also you have to use subscribeOn() with false otherwise the flow livelocks.

If you don't want to execute doSomething if the source fails, use blockingForEach:

public void myMethod() {
    Flowable.create(e -> {
       // do sth.
    }, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation(), false)
    .blockingForEach(contentAndChannel -> { /* do sth. */ });

    doSomething();
}

Upvotes: 2

Bracadabra
Bracadabra

Reputation: 3659

You can use concat operator, but you need to wrap your method in another observable.

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

Upvotes: 2

Related Questions