Reputation: 320
I'm stuck on a rather weird problem and would appreciate your help.
The overall setting is as follows: I have a number of data providers that can appear or disappear at runtime. These providers provide (big surprise) data, modeled as io.reactivex.Observables. (To be precise: as BehaviorSubjects, which replay the latest value to new subscribers.)
Now, I need to combine the data of all current data providers, so that I get a new "main" Observable which gets updated whenever any data provider's observable changes or when new providers appear (or old ones disappear).
So far, that sounds like merging, but for the main Observable I need all providers' data combined: on any change, the result should contain each provider's latest state. This combining works fine with Observable.combineLatest for non-dynamic providers that are known in advance.
The problems arise when I embed that method into a dynamic context, listening for added or removed providers. Then an update of one provider's Observable triggers not just one update as expected, but several, some of them containing only partial data.
Have a look at the following self-contained example (using RxJava 2.1.9), which should clarify my problem. The comments are written as println() calls, so that the produced output is readable, too. The first part is static, without adding or removing providers, and works as expected. The second part is the weird one...
I'd appreciate any further ideas or assistance to solve this issue - thanks!
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CombineLatestWithDynamicSources {

    static final Function<Object[], List<String>> STRING_COMBINER = objects -> Arrays.stream(objects)
            .map(Object::toString)
            .collect(Collectors.toList());

    public static void main(String[] args) {
        System.out.println("*** STATIC ***");
        staticCombineWorksAsExpected();
        System.out.println("\n*** DYNAMIC ***");
        dynamicCombineBehavesWeird();
    }

    static void staticCombineWorksAsExpected() {
        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");
        List<Subject<String>> subjects = Arrays.asList(subjectA, subjectB); // potentially more...
        Observable<List<String>> combined = Observable.combineLatest(subjects, STRING_COMBINER);

        System.out.println("initial values:");
        combined.subscribe(strings -> System.out.println(">> Combined: " + strings));

        System.out.println("updating A:");
        subjectA.onNext("A.2");

        System.out.println("updating B:");
        subjectB.onNext("B.2");

        System.out.println("\n... works as expected, but adding subjects isn't possible in this setting.");
    }

    static void dynamicCombineBehavesWeird() {
        List<Subject<String>> subjectsList = new ArrayList<>();
        Subject<List<Subject<String>>> subjectsObservable = BehaviorSubject.createDefault(subjectsList);

        System.out.println("subjects are initially empty:");
        subjectsObservable.subscribe(subjects -> System.out.println(">> Subjects: " + subjects));

        Observable<List<String>> combined = subjectsObservable.flatMap(
                subjects -> Observable.combineLatest(subjects, STRING_COMBINER));
        combined.subscribe(strings -> System.out.println(">> Combined: " + strings));

        System.out.println("add Subject A, providing default value 'A.1' - as expected:");
        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        subjectsList.add(subjectA);
        subjectsObservable.onNext(subjectsList);

        System.out.println("updating A - also as expected:");
        subjectA.onNext("A.2");

        System.out.println("add Subject B, providing default value 'B.1' - as expected, now both subject's last values show up:");
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");
        subjectsList.add(subjectB);
        subjectsObservable.onNext(subjectsList);

        System.out.println("updating A again - I'd expect the second result only! Why is there '[A.3]' popping up before the expected result?!");
        subjectA.onNext("A.3");

        System.out.println("This doesn't happen on updating B...:");
        subjectB.onNext("B.2");

        System.out.println("digging deeper, add Subject C, providing default value 'C.1' - as expected again:");
        Subject<String> subjectC = BehaviorSubject.createDefault("C.1");
        subjectsList.add(subjectC);
        subjectsObservable.onNext(subjectsList);

        System.out.println("Now update A - three results pop up, only the last is expected!");
        subjectA.onNext("A.4");

        System.out.println("update B, which now emits also two results - last expected only:");
        subjectB.onNext("B.3");

        System.out.println("update C works as expected:");
        subjectC.onNext("C.2");

        System.out.println("\n... huh? Seems on updating the first source, the combined results gets computed for the first, " + "then for the first and second, then for first, second and third (and so on?) source...");
    }
}
which produces the following output:
*** STATIC ***
initial values:
>> Combined: [A.1, B.1]
updating A:
>> Combined: [A.2, B.1]
updating B:
>> Combined: [A.2, B.2]
works as expected, but adding subjects isn't possible in this setting.
*** DYNAMIC ***
subjects are initially empty:
>> Subjects: []
add Subject A, providing default value 'A.1' - as expected:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e]
>> Combined: [A.1]
updating A - also as expected:
>> Combined: [A.2]
add Subject B, providing default value 'B.1' - as expected, now both subject's last values show up:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e, io.reactivex.subjects.BehaviorSubject@90f6bfd]
>> Combined: [A.2, B.1]
updating A again - I'd expect the second result only! Why is there '[A.3]' popping up before the expected result?!
>> Combined: [A.3]
>> Combined: [A.3, B.1]
This doesn't happen on updating B...:
>> Combined: [A.3, B.2]
digging deeper, add Subject C, providing default value 'C.1' - as expected again:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e, io.reactivex.subjects.BehaviorSubject@90f6bfd, io.reactivex.subjects.BehaviorSubject@47f6473]
>> Combined: [A.3, B.2, C.1]
Now update A - three results pop up, only the last is expected!
>> Combined: [A.4]
>> Combined: [A.4, B.2]
>> Combined: [A.4, B.2, C.1]
update B, which now emits also two results - last expected only:
>> Combined: [A.4, B.3]
>> Combined: [A.4, B.3, C.1]
update C works as expected:
>> Combined: [A.4, B.3, C.2]
huh? Seems on updating the first source, the combined results gets computed for the first, then for the first and second, then for first, second and third (and so on?) source...
Upvotes: 3
Views: 1952
Reputation: 320
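It's been a while, and now I know better... ;-)
The solution is simple, so simple: use switchMap instead of flatMap. flatMap keeps every inner combineLatest subscription alive, whereas switchMap disposes the previous one whenever a new list of subjects arrives:
    subjectsObservable
            .switchMap(subjects -> Observable.combineLatest(subjects, STRING_COMBINER))
            .subscribe(strings -> System.out.println(">> Combined: " + strings));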
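To see why switchMap helps where flatMap does not, here is a minimal, dependency-free sketch of the mechanism. It is only a simplified model, not RxJava itself: MiniSubject, the hand-rolled combineLatest and the run method are hypothetical names invented for this illustration. With disposePrevious = false every earlier combination stays subscribed (roughly what flatMap does); with disposePrevious = true the previous combination is unsubscribed before the new one starts (roughly what switchMap does).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class StaleSubscriptionModel {

    /** Hypothetical stand-in for a BehaviorSubject: stores the latest value and replays it to new listeners. */
    static final class MiniSubject {
        private String latest;
        private final List<Consumer<String>> listeners = new ArrayList<>();

        MiniSubject(String initial) { latest = initial; }

        /** Registers a listener, replays the latest value, and returns a handle that unsubscribes. */
        Runnable subscribe(Consumer<String> listener) {
            listeners.add(listener);
            listener.accept(latest);
            return () -> listeners.remove(listener);
        }

        void onNext(String value) {
            latest = value;
            // Iterate over a copy so the listener list may change during notification.
            for (Consumer<String> l : new ArrayList<>(listeners)) l.accept(value);
        }
    }

    /** Simplified combineLatest: emits a snapshot once every source has delivered a value. */
    static Runnable combineLatest(List<MiniSubject> sources, Consumer<List<String>> out) {
        String[] latest = new String[sources.size()];
        List<Runnable> handles = new ArrayList<>();
        for (int i = 0; i < sources.size(); i++) {
            final int index = i;
            handles.add(sources.get(i).subscribe(v -> {
                latest[index] = v;
                for (String s : latest) if (s == null) return; // not all sources have emitted yet
                out.accept(new ArrayList<>(Arrays.asList(latest)));
            }));
        }
        // Running the returned handle unsubscribes from all sources.
        return () -> handles.forEach(Runnable::run);
    }

    /** Replays part of the question's scenario and returns the emitted snapshots. */
    static List<String> run(boolean disposePrevious) {
        List<String> log = new ArrayList<>();
        List<MiniSubject> providers = new ArrayList<>();
        Runnable[] previous = { () -> { } };

        // Called whenever the provider list changes.
        Runnable resubscribe = () -> {
            if (disposePrevious) previous[0].run();
            previous[0] = combineLatest(new ArrayList<>(providers), snapshot -> log.add(snapshot.toString()));
        };

        MiniSubject a = new MiniSubject("A.1");
        providers.add(a);
        resubscribe.run();
        MiniSubject b = new MiniSubject("B.1");
        providers.add(b);
        resubscribe.run();
        a.onNext("A.2"); // the old single-source combination still fires unless it was disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println("keep all (flatMap-like):           " + run(false));
        System.out.println("dispose previous (switchMap-like): " + run(true));
    }
}
```

In this model, run(false) yields [A.1], [A.1, B.1], [A.2], [A.2, B.1], with the partial [A.2] coming from the stale first combination, while run(true) yields only [A.1], [A.1, B.1], [A.2, B.1].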
Upvotes: 4
Reputation: 31
You can use the concatMap operator:
    val o1 = Observable.just(0, 1, 2)
    val o2 = Observable.just(3, 4, 5)
    val subject = PublishSubject.create<Observable<Int>>()
    subject
        .concatMap { it }
        .subscribe { it: Int? ->
            System.out.print("$it,")
        }
    subject.onNext(o1)
    subject.onNext(o2)
The output is:
0,1,2,3,4,5,
Upvotes: 0
Reputation: 320
My combining problem kept me busy for several days before I asked here, and it didn't let go of me afterwards either. Now I've finally been able to dig into it, and I have a solution. The problem was that the previous subscriptions inside the flatMap were never disposed. Gosh!
First, I pulled the chain apart to get some insight into the moving parts:
    List<Subject<String>> subjectsList = new ArrayList<>();
    Subject<List<Subject<String>>> subjectsObservable = BehaviorSubject.createDefault(subjectsList);
    final Disposable[] disposable = new Disposable[]{Disposables.empty()};
    // combined now is a subject to push updates into
    Subject<List<String>> combined = BehaviorSubject.createDefault(Collections.emptyList());
    subjectsObservable.subscribe(subjects -> {
        Observable<List<String>> combSubj = Observable.combineLatest(subjects, STRING_COMBINER);
        // here's the key: I remember the previous subscription and dispose it on update.
        disposable[0].dispose();
        disposable[0] = combSubj.subscribe(strings -> combined.onNext(strings));
    });
    combined.subscribe(strings -> System.out.println(">> Combined: " + strings));
And finally, a somewhat neater version that is closer to my original (non-working) solution:
    // remember the previous disposable; start with an empty one to avoid NullPointerExceptions
    Disposable[] prevDisposable = new Disposable[]{Disposables.empty()};
    Observable<List<String>> combined = subjectsObservable.flatMap(
            subjects -> Observable.combineLatest(subjects, STRING_COMBINER)
                    // dispose the previous subscription to combineLatest's Observable and remember the new one
                    .doOnSubscribe(disposable -> {
                        prevDisposable[0].dispose();
                        prevDisposable[0] = disposable;
                    })
    );
    combined.subscribe(strings -> System.out.println(">> Combined: " + strings));
Upvotes: 0