Renan Ferrari

Reputation: 2519

Custom Observable with wrapped listener ignores SubscribeOn operator

I have the following setup:

A class responsible for updating a value, which also lets callers listen for changes to that value through a callback:

private static class ValueUpdater<T> {
  Listener<T> listener;

  void update(T value) {
    if (listener != null) this.listener.onValueUpdated(value);
  }

  void setListener(Listener<T> listener) {
    this.listener = listener;
  }

  interface Listener<T> {
    void onValueUpdated(T value);
  }
}

A method that wraps that class's listener in an Observable:

private static <T> Observable<T> createValueObservable(ValueUpdater<T> valueUpdater) {
  return Observable.create(subscriber -> {
    valueUpdater.setListener(value -> {
      System.out.println("Listener: " + Thread.currentThread().getName());
      if (!subscriber.isUnsubscribed()) {
        try {
          subscriber.onNext(value);
        } catch (Exception e) {
          subscriber.onError(e);
        }
      }
    });

    subscriber.add(Subscriptions.create(() -> valueUpdater.setListener(null)));
  });
}

Which is then called like this:

public static void main(String[] args) throws Exception {
  final ValueUpdater<Integer> valueUpdater = new ValueUpdater<>();

  createValueObservable(valueUpdater)
      .subscribeOn(Schedulers.io())
      .subscribe(i -> System.out.println("Next: " + Thread.currentThread().getName()));

  Thread.sleep(1000);

  valueUpdater.update(1);
  valueUpdater.update(2);
  valueUpdater.update(3);
}

Which gives me the following output:

Listener: main
Next: main
Listener: main
Next: main
Listener: main
Next: main

I know this happens because ValueUpdater.update() is called on the main thread, so the listener (and therefore onNext()) also runs on the main thread and the subscribeOn() operator is effectively ignored, as explained in this blog post.

What are the best alternatives for this scenario, given that I can't modify the ValueUpdater class and I want to be able to apply a Scheduler to the upstream Observables?

Edit: Just to make it a bit clearer, the point of my question is not to understand why this happens. What I'm trying to understand is the best way to design the code around this behavior, so that the listener callback does not run on the main thread.

The output I'm trying to achieve is something like this:

Listener: RxIoScheduler-3
Next: RxIoScheduler-3
Listener: RxIoScheduler-3
Next: RxIoScheduler-3
Listener: RxIoScheduler-3
Next: RxIoScheduler-3

.observeOn() is not going to do that, because it only affects items going downstream. If I applied it, the output would be something like the following:

Listener: main
Listener: main
Listener: main
Next: RxIoScheduler-3
Next: RxIoScheduler-3
Next: RxIoScheduler-3

I've also tried Observable.fromAsync(), as shown below, but it did not work either:

private static <T> Observable<T> createValueObservable(ValueUpdater<T> valueUpdater) {
  return Observable.fromAsync(emitter -> {
    final CompositeSubscription cs = new CompositeSubscription();
    final Scheduler.Worker worker = Schedulers.io().createWorker();
    final ValueUpdater.Listener<T> listener = new ValueUpdater.Listener<T>() {
      @Override public void onValueUpdated(T value) {
        System.out.println("Listener: " + Thread.currentThread().getName());
        try {
          emitter.onNext(value);
        } catch (Exception e) {
          emitter.onError(e);
        }
      }
    };

    valueUpdater.setListener(listener);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> valueUpdater.setListener(null)));

    emitter.setSubscription(cs);
  }, AsyncEmitter.BackpressureMode.BUFFER);
}

The only possible solution I can think of right now is to always call ValueUpdater.update() from a different thread, but I'm pretty sure that's not practical for a real use case:

public static void main(String[] args) throws Exception {
  final ValueUpdater<Integer> valueUpdater = new ValueUpdater<>();

  createValueObservable(valueUpdater)
      .subscribeOn(Schedulers.io())
      .subscribe(i -> System.out.println("Next: " + Thread.currentThread().getName()));

  // Push the updates from an io thread instead of main.
  Completable.fromAction(() -> {
    valueUpdater.update(1);
    valueUpdater.update(2);
    valueUpdater.update(3);
  }).subscribeOn(Schedulers.io()).subscribe();

  Thread.sleep(1000);
}

Upvotes: 1

Views: 201

Answers (1)

Tassos Bassoukos

Reputation: 16142

.subscribeOn() only changes the thread on which the subscription itself happens, i.e. the work triggered by .subscribe(...). (I don't think it's necessary in your case, as you don't do any I/O or computation in your subscription.)

You should also use .observeOn() to change the thread on which the items are processed.
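To illustrate the point about where the subscription happens, here is a minimal plain-Java sketch with no RxJava dependency. The Updater class is a hypothetical stand-in for ValueUpdater, and the single-thread executor named "io-1" is only an assumption standing in for an io scheduler thread. The listener is registered on the background thread, but the callback still fires on whichever thread calls update().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class RegistrationThreadDemo {
    // Hypothetical stand-in for ValueUpdater: the listener runs on the caller's thread.
    static class Updater<T> {
        private volatile Consumer<T> listener;

        void setListener(Consumer<T> listener) { this.listener = listener; }

        void update(T value) {
            Consumer<T> current = listener;
            if (current != null) current.accept(value);
        }
    }

    // Returns {thread that registered the listener, thread that ran the callback}.
    static String[] run() {
        Updater<Integer> updater = new Updater<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        try {
            // Roughly what subscribeOn does: only the registration moves to another thread.
            String registrationThread = io.submit(() -> {
                updater.setListener(v -> callbackThread.set(Thread.currentThread().getName()));
                return Thread.currentThread().getName();
            }).get();

            // The update is issued from the calling thread, so the callback runs there.
            updater.update(1);
            return new String[] {registrationThread, callbackThread.get()};
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("Registered on: " + result[0]);
        System.out.println("Callback on:   " + result[1]);
    }
}
```

Run from main, the registration thread is "io-1" while the callback thread is "main", which mirrors the output in the question.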

Edit: No matter how you slice it, the body of valueUpdater.update() will run on the same thread as its caller. However, since you're immediately switching to Rx, this might be what you're looking for:

private static <T> Observable<T> createValueObservable(ValueUpdater<T> valueUpdater) {
  return Observable.create(subscriber -> {
    valueUpdater.setListener(value -> {
      if (!subscriber.isUnsubscribed()) {
        try {
          subscriber.onNext(value);
        } catch (Exception e) {
          subscriber.onError(e);
        }
      }
    });

    subscriber.add(Subscriptions.create(() -> valueUpdater.setListener(null)));
  })
  .observeOn(Schedulers.io())
  .doOnNext(any -> System.out.println("Listener: " + Thread.currentThread().getName()));
}
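If the listener body itself has to run off the caller's thread, one option is to hand each value to a single worker thread inside the callback. What follows is not the code above but a plain-Java sketch of that idea under stated assumptions: Updater is a hypothetical stand-in for ValueUpdater, the executor thread name "worker-1" is made up, and the downstream consumer plays the role of the subscriber. In RxJava 1 the same role could presumably be filled by a Scheduler.Worker obtained from createWorker(), calling worker.schedule(...) inside the listener and adding the worker to the subscriber for cleanup.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HandOffDemo {
    // Hypothetical stand-in for ValueUpdater: the listener runs on the caller's thread.
    static class Updater<T> {
        private volatile Consumer<T> listener;

        void setListener(Consumer<T> listener) { this.listener = listener; }

        void update(T value) {
            Consumer<T> current = listener;
            if (current != null) current.accept(value);
        }
    }

    static List<String> run() {
        Updater<Integer> updater = new Updater<>();
        List<String> log = new CopyOnWriteArrayList<>();
        // A single worker thread keeps the values in the order they were submitted.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));

        // Plays the role of the subscriber's onNext.
        Consumer<Integer> downstream =
            v -> log.add("Next " + v + ": " + Thread.currentThread().getName());

        // The raw callback still fires on the caller's thread, but it only enqueues work.
        updater.setListener(value -> worker.execute(() -> {
            log.add("Listener " + value + ": " + Thread.currentThread().getName());
            downstream.accept(value);
        }));

        updater.update(1);
        updater.update(2);
        updater.update(3);

        // Cleanup: detach the listener, then let the queued tasks finish.
        updater.setListener(null);
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The trade-off is that update() returns before the value has been processed, so the caller loses any synchronous guarantee it may have relied on.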

Upvotes: 1
