user3762200

Reputation: 447

How to use RxJava combineLatest operator with more than 9 observables

I'm using RxJava and I want to combine 12 different observables using the operator combineLatest.

I saw an overload that takes a list of observables and an implementation of FuncN, but I'm not sure how to use it. Specifically, I'm having trouble implementing the call method.

Can someone show me an example?

Upvotes: 16

Views: 21455

Answers (5)

ubuntudroid

Reputation: 3989

Here is a simple extension function for RxKotlin if you have 10 sources for combineLatest. You can easily create similar functions for more sources or adapt this to work with plain RxJava.


import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
    source1: Observable<T1>, source2: Observable<T2>,
    source3: Observable<T3>, source4: Observable<T4>,
    source5: Observable<T5>, source6: Observable<T6>,
    source7: Observable<T7>, source8: Observable<T8>,
    source9: Observable<T9>, source10: Observable<T10>,
    crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
    Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
        combineFunction(
            it[0] as T1,
            it[1] as T2,
            it[2] as T3,
            it[3] as T4,
            it[4] as T5,
            it[5] as T6,
            it[6] as T7,
            it[7] as T8,
            it[8] as T9,
            it[9] as T10
        )
    }

Note: I've created this as an extension function to stay consistent with how combineLatest calls look for fewer than 10 sources (Observables.combineLatest(...)). That way I don't have to think about which combineLatest version I need for a given number of parameters. Technically there is no need to make it an extension function.
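
For plain Java, the same idea can be sketched as a small adapter that turns a typed function into a function over the Object[] that the list/array overload passes to its combiner. This is only an illustration under stated assumptions: Function3, TypedCombiners and adapt are hypothetical names that do not come from RxJava, and three arguments are used to keep it short (the pattern is the same for 10 or 12).

```java
import java.util.function.Function;

// Hypothetical three-argument function type; extend the pattern for more arguments.
interface Function3<A, B, C, R> {
    R apply(A a, B b, C c);
}

final class TypedCombiners {
    private TypedCombiners() {}

    // Wraps a typed function as a function over the Object[] of latest values.
    @SuppressWarnings("unchecked")
    static <A, B, C, R> Function<Object[], R> adapt(Function3<A, B, C, R> f) {
        return args -> {
            if (args.length != 3) {
                throw new IllegalArgumentException("expected 3 values, got " + args.length);
            }
            // Unchecked casts: a wrong type only surfaces later as a ClassCastException.
            return f.apply((A) args[0], (B) args[1], (C) args[2]);
        };
    }
}
```

The adapter returns a java.util.function.Function, so with RxJava it would presumably be handed to combineLatest as a method reference (for example combiner::apply) rather than directly; that part is an assumption and not shown here.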

Upvotes: 1

peresisUser

Reputation: 1686

To expand on Egor Neliuba's answer, you can aggregate all the results inside a container object, and then use it as you will inside the subscribe clause:

    List<Observable<?>> list = new ArrayList<>();
    list.add(mCreateMarkupFlowManager.getFlowState());
    list.add(mCreateIssueFlowStateManager.getIssueFlowState());
    list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
    list.add(mViewerStateManager.getIssueLoadingProgressChanges());
    list.add(mMeasurementFlowStateManager.getFlowState());
    list.add(mViewerStateManager.isSheetLoaded());
    list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
    list.add(mViewerStateManager.getMarkupViewMode());
    list.add(mViewerStateManager.isFirstPerson());
    list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
    list.add(mCreateRfiFlowStateManager.getRfiFlowState());

    attachSubscription(Observable.combineLatest(list, args -> {
                Holder holder = new Holder();
                holder.setFirst((String) args[0]);
                holder.setSecond((Integer) args[1]);
                holder.setThird((Boolean) args[2]);
                holder.setFourth((Boolean) args[3]);
                holder.setFifth((String) args[4]);
                holder.setSixth((Boolean) args[5]);
                holder.setSeventh((Boolean) args[6]);
                holder.setEighth((Boolean) args[7]);
                holder.setNinth((Boolean) args[8]);
                holder.setTenth((Boolean) args[9]);
                holder.setEleventh((String) args[10]);
                return holder;
            })
                    .filter(holder -> Util.isTrue(holder.sixth))
                    .compose(Util.applySchedulers())
                    .subscribe(holder -> {
                        if (isViewAttached()) {
                            String createMarkupState = holder.first;
                            Integer createIssueState = holder.second;
                            boolean markupsLoadingFinished = holder.third;
                            boolean issuesLoadingFinished = holder.fourth;
                            boolean loadingFinished = markupsLoadingFinished && issuesLoadingFinished;
                            String measurementState = holder.fifth;
                            boolean isMarkupLockMode = holder.eighth;

                            boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
                            boolean showCreateMeasureButton = shouldShowMeasureButton();
                            boolean showCreateFieldIssueButton = holder.seventh;
                            boolean isFirstPersonEnabled = holder.ninth;
                            Boolean showCreateRfiButton = holder.tenth;
                            String rfiFlowState = holder.eleventh;


                        }
                    })
    );


public class Holder {

    public String first;
    public Integer second;
    public Boolean third;
    public Boolean fourth;
    public String fifth;
    public Boolean sixth;
    public Boolean seventh;
    public Boolean eighth;
    public Boolean ninth;
    public Boolean tenth;
    public String eleventh;

    public void setEleventh(String eleventh) {
        this.eleventh = eleventh;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public void setSecond(Integer second) {
        this.second = second;
    }

    public void setThird(Boolean third) {
        this.third = third;
    }

    public void setFourth(Boolean fourth) {
        this.fourth = fourth;
    }

    public void setFifth(String fifth) {
        this.fifth = fifth;
    }

    public void setSixth(Boolean sixth) {
        this.sixth = sixth;
    }

    public void setSeventh(Boolean seventh) {
        this.seventh = seventh;
    }

    public void setEighth(Boolean eighth) {
        this.eighth = eighth;
    }

    public void setNinth(Boolean ninth) {
        this.ninth = ninth;
    }

    public void setTenth(Boolean tenth) {
        this.tenth = tenth;
    }

    public Holder() {}

}
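
Because the combiner only sees an Object[], a wrong index or type in one of the casts shows up as a bare ClassCastException at runtime. One possible way to get clearer failures is a small checked accessor around the array. This is a sketch only: CombinedArgs and its get method are hypothetical names and not part of RxJava.

```java
// Hypothetical helper: wraps the Object[] passed to the combiner and performs
// checked casts with descriptive errors instead of bare (Type) args[i] casts.
final class CombinedArgs {
    private final Object[] values;

    CombinedArgs(Object... values) {
        this.values = values.clone();
    }

    // Returns the value at index as the given type, or fails with a message
    // naming the index, the actual type and the expected type.
    // A null value is treated as a type mismatch.
    <T> T get(int index, Class<T> type) {
        if (index < 0 || index >= values.length) {
            throw new IllegalArgumentException(
                    "no value at index " + index + " (have " + values.length + ")");
        }
        Object value = values[index];
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException(
                    "value at index " + index + " is "
                            + (value == null ? "null" : value.getClass().getSimpleName())
                            + ", expected " + type.getSimpleName());
        }
        return type.cast(value);
    }
}
```

Inside the combiner this would read along the lines of new CombinedArgs(args).get(0, String.class) in place of (String) args[0].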

Upvotes: 0

android

Reputation: 3090

RxKotlin's Observables.combineLatest() accepts up to 9 source observables as parameters. To combine more than 9, pass an array (or list) of observables of any size to Observable.combineLatest() instead, as shown below.

First, here is a simple example with only two parameters of different data types:

val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
                .subscribe({
                    Log.d("combineLatest", "onNext - ${it}")
                })

Now, what if you want to pass more sources to combineLatest? Use the array overload, as below. (I have used mixed data types so that the example also covers sources of different types.)

val myList = arrayOf(Observable.just("MyName"),
                Observable.just(2),
                Observable.just(3.55),
                Observable.just("My Another String"),
                Observable.just(5),
                Observable.just(6),
                Observable.just(7),
                Observable.just(8),
                Observable.just(9),
                Observable.just(10),
                Observable.just(11),
                Observable.just(12),
                Observable.just(13),
                Observable.just(14),
                Observable.just(15))

Observable.combineLatest(myList, {
    val a = it[0] as String
    val b = it[1] as Int
    val c = it[2] as Double // 3.55 is emitted as a Double; "as Float" would throw ClassCastException
    val d = it[3] as String
    val e = it[4] as Int
    val f = it[5] as Int
    val g = it[6] as Int
    val h = it[7] as Int
    val i = it[8] as Int
    val j = it[9] as Int
    val k = it[10] as Int
    val l = it[11] as Int
    val m = it[12] as Int
    "$a - age:${b}" })
        .subscribe({
            Log.d("combineLatest", "onNext - ${it}")
        })
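
A note on the casts in the combiner: the values arrive as boxed objects, so each cast has to match the boxed type that was actually emitted. A floating-point literal such as 3.55 is boxed as a Double, and a Double cannot be cast to Float. The behaviour can be checked in plain Java, sketched here with hypothetical helper names (BoxedCasts, toFloat, directFloatCastFails):

```java
final class BoxedCasts {
    private BoxedCasts() {}

    // Converts any boxed number to float by going through Number,
    // instead of casting the boxed object to Float directly.
    static float toFloat(Object value) {
        return ((Number) value).floatValue();
    }

    // Returns true when a direct cast of the boxed value to Float fails at runtime.
    static boolean directFloatCastFails(Object value) {
        try {
            Float ignored = (Float) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

Going through Number is one option when a float is really needed; otherwise casting to the emitted type (Double here) is the simpler choice.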

Upvotes: 2

TacoEater

Reputation: 2268

To expand on that answer: I am using it to read multiple characteristics at once. It can be done like so:

connectionObservable
                .flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
                    List<Observable<?>> list1 = Arrays.asList(
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...));
                    return Observable.combineLatest(list1, args -> {
                       Object o =  doSomethingWithResults(args);
                        return o;
                    });
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnUnsubscribe(this::clearConnectionSubscription)
                .subscribe(retVal -> {
                    Log.d(TAG, "result:" + retVal.toString());
                    Log.w(TAG, "SUCCESS");
                    triggerDisconnect();

                }, MyActivity.this::onReadFailure);
    }

Comment if you have suggestions on how to improve this process.
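
The snippet above does not show doSomethingWithResults. As a rough idea of what it might look like, here is a sketch that assumes every combined value is a byte[] (which is what a characteristic read would typically emit) and turns each one into a hex string. CharacteristicResults and toHexStrings are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class CharacteristicResults {
    private CharacteristicResults() {}

    // Hypothetical stand-in for doSomethingWithResults: assumes every
    // combined value is a byte[] and renders each one as a lowercase hex string.
    static List<String> toHexStrings(Object[] args) {
        List<String> out = new ArrayList<>(args.length);
        for (Object arg : args) {
            byte[] bytes = (byte[]) arg; // ClassCastException if the assumption is wrong
            StringBuilder sb = new StringBuilder(bytes.length * 2);
            for (byte b : bytes) {
                // Mask to 0..255 so negative bytes print as two hex digits.
                sb.append(String.format("%02x", b & 0xff));
            }
            out.add(sb.toString());
        }
        return out;
    }
}
```

The results keep the order of the source list, so index i in the returned list corresponds to the i-th readCharacteristic call.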

Upvotes: 6

Egor Neliuba

Reputation: 15054

There is a combineLatest overload that takes a List of observables. Here's an example of how to use it:

List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
    @Override
    public String call(Object... args) {
        String concat = "";
        for (Object value : args) {
            if (value instanceof Integer) {
                concat += (Integer) value;
            } else if (value instanceof String) {
                concat += (String) value;
            }
        }
        return concat;
    }
});
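
The body of call can also be pulled out into a plain method, which makes the type checks easy to try without any observables. A minimal sketch of that, with ConcatCombiner and concat as hypothetical names:

```java
final class ConcatCombiner {
    private ConcatCombiner() {}

    // Same logic as the call method above: append Integers and Strings
    // in order and skip values of any other type.
    static String concat(Object... args) {
        StringBuilder sb = new StringBuilder();
        for (Object value : args) {
            if (value instanceof Integer || value instanceof String) {
                sb.append(value);
            }
        }
        return sb.toString();
    }
}
```

With this in place, call could simply return ConcatCombiner.concat(args).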

Upvotes: 29
