Reputation: 4987
I have the following code, based on an example provided by @a.bertucci in "Emit objects for drawing in the UI in a regular interval using RxJava on Android", where I zip an Observable with a timer. When I trigger the subscription by calling processDelayedItems(), the code at [A] in the zipped Observable is executed exactly once and one item is emitted to [B]. I expected the code at [A] to run continuously once triggered and to keep emitting items every 1500 ms, but it only runs once.
private static void processDelayedItems() {
    Observable.zip(
            Observable.create(new Observable.OnSubscribe<Object>() {
                @Override public void call(Subscriber<? super Object> subscriber) {
                    // [A] this code is only called once
                    // (o stands for the item to emit; it is not defined in this snippet)
                    subscriber.onNext(o);
                }
            }),
            Observable.timer(1500, 1500, TimeUnit.MILLISECONDS),
            new Func2<Object, Long, Object>() {
                @Override public Object call(Object entity, Long aLong) {
                    return entity;
                }
            }
    )
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
        @Override public void call(Object entity) {
            // ... and accordingly one item is emitted [B]
        }
    }, new Action1<Throwable>() {
        @Override public void call(Throwable throwable) {
            throwable.printStackTrace();
        }
    }, new Action0() {
        @Override public void call() {
        }
    });
}
Can anybody see the problem which I have here? Is it that I need to reference the Observable from outside the function to keep it alive for more time? Is it collected by GC (Android)? Is it a problem that the function is static?
What are the rules for Observables in terms of their lifetime? Are there any best practices for how longer-running Observables should be referenced, and can they be static at all? In my tests I noticed that it doesn't really matter, but maybe it does here, when a timer is involved.
--
Corrected code [not working yet], with repeat() added:
Observable.zip(
        Observable.create(new Observable.OnSubscribe<Object>() {
            @Override public void call(Subscriber<? super Object> subscriber) {
                // [A] this code is only called once
                subscriber.onNext(o);
                subscriber.onCompleted();
            }
        }).repeat(Schedulers.newThread()),
        Observable.timer(1500, 1500, TimeUnit.MILLISECONDS),
        new Func2<Object, Long, Object>() {
            @Override public Object call(Object entity, Long aLong) {
                return entity;
            }
        }
)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
    @Override public void call(Object entity) {
        // ... and accordingly one item is emitted [B]
    }
}, new Action1<Throwable>() {
    @Override public void call(Throwable throwable) {
        throwable.printStackTrace();
    }
}, new Action0() {
    @Override public void call() {
    }
});
Upvotes: 1
Views: 1547
Reputation: 20153
In regard to your comment, for simplicity you could do this:
Observable.timer(1500, 1500, TimeUnit.MILLISECONDS)
        .flatMap(new Func1<Long, Observable<Object>>() {
            @Override
            public Observable<Object> call(Long aLong) {
                // Declared as Object so the result type is Observable<Object>.
                Object o = "0";
                // just() wraps a single value in an Observable.
                return Observable.just(o);
            }
        })
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object item) {
                System.out.println(item);
            }
        });
Here you still get the benefits of the timer without the added zip / repeat on top. It's still somewhat verbose, but simpler.
Upvotes: 1
Reputation: 20836
You need repeat to generate an infinite Observable. E.g.,
Observable.create(new Observable.OnSubscribe<Object>() {
    @Override public void call(Subscriber<? super Object> subscriber) {
        // [A] with repeat(), this code runs again on every resubscription
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext(o);
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }
}).repeat(Schedulers.newThread());
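The reason repeat helps is that zip pairs the n-th item of one source with the n-th item of the other, so a source that emits a single item can only ever produce a single pair. The following is a rough stdlib-only model of that pairing rule, not RxJava code: `ZipSketch` and `zipKeepFirst` are hypothetical names, and a finite list of copies stands in for the infinite stream that repeat produces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of zip's pairing rule using plain collections (not RxJava).
class ZipSketch {

    // Pairs the n-th element of `items` with the n-th element of `ticks`
    // and keeps only the item, like the Func2 in the question.
    static <T> List<T> zipKeepFirst(Iterable<T> items, Iterable<Long> ticks) {
        List<T> out = new ArrayList<T>();
        Iterator<T> a = items.iterator();
        Iterator<Long> b = ticks.iterator();
        // A pair is produced only while BOTH sides still have an element.
        while (a.hasNext() && b.hasNext()) {
            T item = a.next();
            b.next(); // the tick only paces the output; its value is dropped
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> ticks = Arrays.asList(0L, 1L, 2L, 3L);
        // Single item on the left: one pair, as observed in the question.
        System.out.println(zipKeepFirst(Collections.singletonList("o"), ticks));
        // Repeated item on the left (finite stand-in for repeat()): one pair per tick.
        System.out.println(zipKeepFirst(Collections.nCopies(4, "o"), ticks));
    }
}
```

Under this model the first call yields one element and the second yields four, which matches the behavior described in the question and the effect repeat is meant to have.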
Is it that I need to reference the Observable from outside the function to keep it alive for more time? Is it collected by GC (Android)? Is it a problem that the function is static?
Since you use Schedulers.newThread() and timer, there are other threads that hold a reference to your Observable, so it stays reachable. You don't need to do anything more.
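As a loose illustration of that point in plain Java (no RxJava involved), the sketch below starts a worker from a static method that returns without handing out any reference, then requests a GC. `KeepAliveSketch`, `startTicker`, and `ticksSurviveGc` are hypothetical names, and the assumption is that a scheduler thread behaves like any other live thread in this respect.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: a running thread keeps the objects it uses reachable.
class KeepAliveSketch {

    // Starts a worker and returns without exposing any reference to it,
    // similar to a static method that subscribes and then returns.
    static void startTicker(final CountDownLatch ticks, final long periodMillis) {
        Thread worker = new Thread(new Runnable() {
            @Override public void run() {
                try {
                    while (ticks.getCount() > 0) {
                        Thread.sleep(periodMillis);
                        ticks.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
        // `worker` goes out of scope here, but a live thread is a GC root,
        // so the Runnable it executes remains reachable.
    }

    // Returns true if all expected ticks arrive even after a GC request.
    static boolean ticksSurviveGc(int expectedTicks, long periodMillis) {
        CountDownLatch ticks = new CountDownLatch(expectedTicks);
        startTicker(ticks, periodMillis);
        System.gc(); // only a hint to the JVM
        try {
            return ticks.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(ticksSurviveGc(3, 50));
    }
}
```

Whether the enclosing method is static plays no role here: the worker keeps ticking because the thread itself is alive, not because of where it was started.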
What are the rules for Observables in terms of their lifetime? Are there any best practices for how longer-running Observables should be referenced, and can they be static at all? In my tests I noticed that it doesn't really matter, but maybe it does here, when a timer is involved.
You're right. It doesn't matter.
Upvotes: 1