Oliver Hausler

Reputation: 4987

Why does this observable emit only one value

I have the following code, based on an example provided by @a.bertucci in "Emit objects for drawing in the UI in a regular interval using RxJava on Android", where I zip an Observable with a timer. When I trigger the subscription by calling processDelayedItems(), the code at [A] in the zipped Observable is executed exactly once and one item is emitted to [B]. I expected the code at [A] to run continuously once triggered and keep emitting an item every 1500 ms, but it only runs once.

private static void processDelayedItems() {

    Observable.zip(
            Observable.create(new Observable.OnSubscribe<Object>() {

                @Override public void call(Subscriber<? super Object> subscriber) {
                    // [A] this code is only called once
                    subscriber.onNext(o);
                }

            }),
            Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() {
                @Override public Object call(Object entity, Long aLong) {
                    return entity;
                }
            }
    )
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {

                @Override public void call(Object entity) {
                    // ... and accordingly one item is emitted [B]
                }

            }, new Action1<Throwable>() {

                @Override public void call(Throwable throwable) {
                    throwable.printStackTrace();
                }

            }, new Action0() {

                @Override public void call() {

                }

            });

}
  1. Can anybody see what the problem is here? Do I need to reference the Observable from outside the function to keep it alive longer? Is it being collected by the GC (Android)? Is it a problem that the function is static?

  2. What are the rules for Observables in terms of their lifetime? Are there any best practices for how longer-running Observables should be referenced, and can they be static at all? In my tests I noticed that it doesn't really matter, but maybe it does here, when a timer is involved.

--

Corrected code [not working yet]:

Upvotes: 1

Views: 1547

Answers (2)

Miguel

Reputation: 20153

In regard to your comment: for simplicity, you could do this instead:

Observable.timer(1500, 1500, TimeUnit.MILLISECONDS)
        .flatMap(new Func1<Long, Observable<Object>>() {
            @Override
            public Observable<Object> call(Long aLong) {
                String o = "0";
                // just() wraps a single value; from() expects an array, Iterable or Future in RxJava 1.x
                return Observable.<Object>just(o);
            }
        })
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object aLong) {
                System.out.println(aLong);
            }
        });

Here you still get the benefits of the timer without adding zip or repeat on top. It's still verbose, but somewhat simpler.

Upvotes: 1

zsxwing

Reputation: 20836

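You need repeat to generate an infinite Observable. zip pairs items one-to-one, so a source that emits a single item yields a single zipped item. repeat resubscribes to the source each time it completes, so the source also has to call onCompleted. E.g.,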

    Observable.create(new Observable.OnSubscribe<Object>() {

        @Override public void call(Subscriber<? super Object> subscriber) {
            // [A] runs once per subscription; repeat() resubscribes after onCompleted
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(o);
            }
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }

    }).repeat(Schedulers.newThread());
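
To illustrate why the original emitted only once, here is a minimal plain-Java model of the pairing behaviour. It is a sketch, not RxJava itself: ZipModel, zip, repeat and ticks are hypothetical names made up for this illustration, and iterators stand in for Observables. One simplification to keep in mind: in the question's code the source never calls onCompleted, so the real zip simply waits for a second item that never arrives, whereas this model just stops.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

public class ZipModel {

    // Pairs items one-to-one; stops as soon as either side has nothing left.
    public static <A, B, R> List<R> zip(Iterator<A> left, Iterator<B> right,
                                        BiFunction<A, B, R> combine) {
        List<R> out = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            out.add(combine.apply(left.next(), right.next()));
        }
        return out;
    }

    // Stand-in for repeat(): offers the same item again and again.
    public static <T> Iterator<T> repeat(final T item) {
        return new Iterator<T>() {
            @Override public boolean hasNext() { return true; }
            @Override public T next() { return item; }
        };
    }

    // Stand-in for the timer: a finite run of ticks 0..count-1.
    public static Iterator<Long> ticks(long count) {
        List<Long> result = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            result.add(i);
        }
        return result.iterator();
    }

    public static void main(String[] args) {
        // Single-item source zipped with five ticks: only one pair can be formed.
        List<String> once = zip(
                Collections.singletonList("o").iterator(), ticks(5), (item, tick) -> item);
        System.out.println("single-item source: " + once.size());

        // Repeating source zipped with five ticks: one item per tick.
        List<String> repeated = zip(repeat("o"), ticks(5), (item, tick) -> item);
        System.out.println("repeating source: " + repeated.size());
    }
}
```

In this model the single-item source produces one result and the repeating source produces five, one per tick, which mirrors what adding repeat does to the zipped Observable.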

Is it that I need to reference the Observable from outside the function to keep it alive for more time? Is it collected by GC (Android)? Is it a problem that the function is static?

Since you use Schedulers.newThread() and timer, other threads hold references to your Observable, so it is not garbage collected. You don't need to do anything more.

What are the rules for Observables in terms of their lifetime? Are there any best practices for how longer-running Observables should be referenced, and can they be static at all? In my tests I noticed that it doesn't really matter, but maybe it does here, when a timer is involved.

You're right. It doesn't matter.

Upvotes: 1
