Reputation: 447
I'm using RxJava and I want to combine 12 different observables using the combineLatest operator.
I saw an overload that takes a list of observables and an implementation of FuncN, but I'm having trouble implementing its call method.
Can someone show me an example?
Upvotes: 16
Views: 21455
Reputation: 3989
Here is a simple extension function for RxKotlin if you have 10 sources for combineLatest. You can easily create similar functions for more sources or adapt this to work with plain RxJava.
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
    source1: Observable<T1>, source2: Observable<T2>,
    source3: Observable<T3>, source4: Observable<T4>,
    source5: Observable<T5>, source6: Observable<T6>,
    source7: Observable<T7>, source8: Observable<T8>,
    source9: Observable<T9>, source10: Observable<T10>,
    crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
    Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
        combineFunction(
            it[0] as T1,
            it[1] as T2,
            it[2] as T3,
            it[3] as T4,
            it[4] as T5,
            it[5] as T6,
            it[6] as T7,
            it[7] as T8,
            it[8] as T9,
            it[9] as T10
        )
    }
Note: I've made this an extension function to stay consistent with how combineLatest calls look for fewer than 10 sources (Observables.combineLatest(...)). That way I don't have to think about which combineLatest version I need for a given number of parameters. Technically, there is no need to make it an extension function.
Upvotes: 1
Reputation: 1686
To expand on Egor Neliuba's answer, you can aggregate all the results inside a container object, and then use it as you will inside the subscribe clause:
List<Observable<?>> list = new ArrayList<>();
list.add(mCreateMarkupFlowManager.getFlowState());
list.add(mCreateIssueFlowStateManager.getIssueFlowState());
list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
list.add(mViewerStateManager.getIssueLoadingProgressChanges());
list.add(mMeasurementFlowStateManager.getFlowState());
list.add(mViewerStateManager.isSheetLoaded());
list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
list.add(mViewerStateManager.getMarkupViewMode());
list.add(mViewerStateManager.isFirstPerson());
list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
list.add(mCreateRfiFlowStateManager.getRfiFlowState());
attachSubscription(Observable.combineLatest(list, args -> {
Holder holder = new Holder();
holder.setFirst((String) args[0]);
holder.setSecond((Integer) args[1]);
holder.setThird((Boolean) args[2]);
holder.setFourth((Boolean) args[3]);
holder.setFifth((String) args[4]);
holder.setSixth((Boolean) args[5]);
holder.setSeventh((Boolean) args[6]);
holder.setEighth((Boolean) args[7]);
holder.setNinth((Boolean) args[8]);
holder.setTenth((Boolean) args[9]);
holder.setEleventh((String) args[10]);
return holder;
})
.filter(holder -> Util.isTrue(holder.sixth))
.compose(Util.applySchedulers())
.subscribe(holder -> {
if (isViewAttached()) {
String createMarkupState = holder.first;
Integer createIssueState = holder.second;
boolean markupsLoadingFinished = holder.third;
boolean issuesLoadingFinished = holder.fourth;
boolean loadingFinished = markupsLoadingFinished && issuesLoadingFinished;
String measurementState = holder.fifth;
boolean isMarkupLockMode = holder.eighth;
boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
boolean showCreateMeasureButton = shouldShowMeasureButton();
boolean showCreateFieldIssueButton = holder.seventh;
boolean isFirstPersonEnabled = holder.ninth;
Boolean showCreateRfiButton = holder.tenth;
String rfiFlowState = holder.eleventh;
}
})
);
public class Holder {
public String first;
public Integer second;
public Boolean third;
public Boolean fourth;
public String fifth;
public Boolean sixth;
public Boolean seventh;
public Boolean eighth;
public Boolean ninth;
public Boolean tenth;
public String eleventh;
public void setEleventh(String eleventh) {
this.eleventh = eleventh;
}
public void setFirst(String first) {
this.first = first;
}
public void setSecond(Integer second) {
this.second = second;
}
public void setThird(Boolean third) {
this.third = third;
}
public void setFourth(Boolean fourth) {
this.fourth = fourth;
}
public void setFifth(String fifth) {
this.fifth = fifth;
}
public void setSixth(Boolean sixth) {
this.sixth = sixth;
}
public void setSeventh(Boolean seventh) {
this.seventh = seventh;
}
public void setEighth(Boolean eighth) {
this.eighth = eighth;
}
public void setNinth(Boolean ninth) {
this.ninth = ninth;
}
public void setTenth(Boolean tenth) {
this.tenth = tenth;
}
public Holder() {}
}
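The positional casts in the combiner above fail with a bare ClassCastException if the order of the list and the order of the casts ever drift apart. A minimal sketch of a checked accessor follows, assuming a hypothetical helper class named CombineArgs (it is not part of RxJava and uses only the JDK); it reports which index held which type:

```java
// Hypothetical helper, not part of RxJava: reads one positional value
// from the Object[] handed to a combineLatest combiner, with a checked cast.
final class CombineArgs {
    private CombineArgs() {}

    static <T> T get(Object[] args, int index, Class<T> type) {
        Object value = args[index];
        // Fail with a message that names the index and both types involved.
        if (value != null && !type.isInstance(value)) {
            throw new IllegalArgumentException("args[" + index + "] is "
                    + value.getClass().getSimpleName()
                    + ", expected " + type.getSimpleName());
        }
        return type.cast(value);
    }
}
```

Inside the combiner this would read, for example, holder.setFirst(CombineArgs.get(args, 0, String.class)) instead of holder.setFirst((String) args[0]).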
Upvotes: 0
Reputation: 3090
RxKotlin's combineLatest() method accepts up to 9 sources as parameters. To combine more than 9, pass an array (or list) of observables instead, as shown below.
First, here is a simple example with only two parameters of different data types:
val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
    .subscribe({
        Log.d("combineLatest", "onNext - ${it}")
    })
Now, what if I want to pass more sources to combineLatest? Here is how (I have used different data types, so the same approach works for your own types):
val myList = arrayOf(Observable.just("MyName"),
    Observable.just(2),
    Observable.just(3.55),
    Observable.just("My Another String"),
    Observable.just(5),
    Observable.just(6),
    Observable.just(7),
    Observable.just(8),
    Observable.just(9),
    Observable.just(10),
    Observable.just(11),
    Observable.just(12),
    Observable.just(13),
    Observable.just(14),
    Observable.just(15))
Observable.combineLatest(myList, {
    val a = it[0] as String
    val b = it[1] as Int
    val c = it[2] as Double // 3.55 is a Double; casting it to Float would throw ClassCastException
    val d = it[3] as String
    val e = it[4] as Int
    val f = it[5] as Int
    val g = it[6] as Int
    val h = it[7] as Int
    val i = it[8] as Int
    val j = it[9] as Int
    val k = it[10] as Int
    val l = it[11] as Int
    val m = it[12] as Int
    "$a - age:${b}"
})
    .subscribe({
        Log.d("combineLatest", "onNext - ${it}")
    })
Upvotes: 2
Reputation: 2268
To expand on that answer: I am using it to read multiple characteristics at once. It can be done like so:
connectionObservable
.flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
List<Observable<?>> list1 = Arrays.asList(
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...));
return Observable.combineLatest(list1, args -> {
Object o = doSomethingWithResults(args);
return o;
});
})
.observeOn(AndroidSchedulers.mainThread())
.doOnUnsubscribe(this::clearConnectionSubscription)
.subscribe(retVal -> {
Log.d(TAG, "result:" + retVal.toString());
Log.w(TAG, "SUCCESS");
triggerDisconnect();
}, MyActivity.this::onReadFailure);
}
Comment if you have suggestions on how to improve this process.
Upvotes: 6
Reputation: 15054
There is a combineLatest overload that takes a List of observables. Here's an example of how to use it:
List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
    @Override
    public String call(Object... args) {
        String concat = "";
        for (Object value : args) {
            if (value instanceof Integer) {
                concat += (Integer) value;
            } else if (value instanceof String) {
                concat += (String) value;
            }
        }
        return concat;
    }
});
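To make it clearer what arrives in args, here is a rough plain-JDK sketch of the bookkeeping combineLatest performs. The class LatestCombinerSketch is hypothetical and is not RxJava code; it only assumes the documented behaviour that the combiner runs once every source has emitted at least one item, and that args[i] holds the latest value of the i-th source in the list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for combineLatest's bookkeeping; not part of RxJava.
final class LatestCombinerSketch<R> {
    private final Object[] latest;      // latest value per source, in list order
    private final boolean[] hasValue;   // whether each source has emitted yet
    private final Function<Object[], R> combiner; // plays the role of FuncN.call(Object... args)
    private final List<R> results = new ArrayList<>();

    LatestCombinerSketch(int sourceCount, Function<Object[], R> combiner) {
        this.latest = new Object[sourceCount];
        this.hasValue = new boolean[sourceCount];
        this.combiner = combiner;
    }

    // Simulates the source at position `index` emitting `value`.
    void emit(int index, Object value) {
        latest[index] = value;
        hasValue[index] = true;
        for (boolean seen : hasValue) {
            if (!seen) {
                return; // nothing is combined until every source has emitted
            }
        }
        // The combiner gets a snapshot: element i is the latest value of source i.
        results.add(combiner.apply(latest.clone()));
    }

    List<R> results() {
        return results;
    }
}
```

With two sources and a combiner that joins values[0] and values[1] with a dash, emitting 1 on source 0 produces nothing, then "2" on source 1 produces "1-2", and a later 5 on source 0 produces "5-2". This is why each element can be cast by its position in the list.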
Upvotes: 29