Reputation: 21
This is using RxJava version 0.19.6.
Outside of a groupBy operation, one can create a pipeline described by the following code to, for instance, select a record from an Observable based on some criteria or select the first record that meets some alternate criteria:
Observable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS).take(10);
Observable<Long> filter1 = observable.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
Observable<Long> filter2 = observable.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5;
}
});
BlockingObservable.from(Observable.concat(filter1, filter2).first()).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
...unfortunately, due to restrictions on the GroupedObservable, it appears that the same kind of procedure does not work will operating inside of a grouped context:
BlockingObservable.from(observable.groupBy(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong % 5;
}
}).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
@Override
public Observable<Long> call(GroupedObservable<Long, Long> in) {
Observable<Long> filter1 = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
Observable<Long> filter2 = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5;
}
});
return Observable.concat(filter1, filter2).first();
}
})).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
...results in a multiple subscriber exception (Exception in thread "main" java.lang.IllegalStateException: Only one subscriber allowed!).
Am I missing some obvious fix to this problem? I have tried playing around with ConnectableObservables in this case to give the appearance of a single Subscriber, but those attempts have been failures as well (surely due to ignorance on my part).
On a related note, the groupByUntil seems to give you a reference to the GroupedObservable as well which was giving me a similar headache of complaining about multiple subscribers if I actually tried to use it to determine when to close the window. Here again I'm sure that I am overlooking something obvious since the API clearly expects one to use the GroupedObservable!
Upvotes: 2
Views: 3116
Reputation: 359
You can combine your filters into single filter like that:
Observable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS).take(10);
BlockingObservable.from(observable.groupBy(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong % 5;
}
}).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
@Override
public Observable<Long> call(GroupedObservable<Long, Long> in) {
Observable<Long> filter = in.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 2 == aLong % 5 || 4 == aLong % 5;
}
});
return filter;
}
})).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
Upvotes: 0
Reputation: 5823
You may be able to use .cache() on the GroupedObservable.
final Observable<Long> inCached = in.cache();
Then use the resulting Observable in your filters.
Observable<Long> filter1 = inCached.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return 5 == aLong % 5;
}
});
That way each subscriber will see the same items but there will only be one subscriber to the GroupedObservable.
Upvotes: 0
Reputation: 39192
Have you tried upgrading your rxJava version?
https://github.com/ReactiveX/RxJava/issues/957
Upvotes: -1