johnny_crq
johnny_crq

Reputation: 4391

RxJava Return Subscribed value in a function

For some reason, i sometimes want to use RxOperators instead of the normal java-way of transforming data structures, because its cleaner and cleaner. For example:

Observable.from(listOfStrings)
.filter(string -> string.getNumber() >5)
.toList()

Is there any way of waiting for the result of the observable and return in in a function: This would do (but it doesn't work):

private String getFilteredString(String string){
   Observable.from(listOfStrings)
       .filter(string -> string.getNumber() >5)
       .toList()
       .subscribe(finalStirng -> {
       return finalString;
       })
}

Upvotes: 8

Views: 6503

Answers (2)

Aaron
Aaron

Reputation: 1009

There is an instance method on rx.Observable called x() (link) that can be used to covert an observable to another value directly and fluently. The intent of this operator is to build up a repository of conversion functions that can switch an Observable into other kinds of concurrent/asynchronous data structures (such as other kinds of observables) or just simply unwrap the contents of the observable into the contained values. Write the conversion function once and store it away for repeated use.

Method signature:

public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion);

Usage for converting a potentially asynchronous observable to a concurrent list:

List<Integer> list = Observable.range(1, 10).x(new Func1<OnSubscribe<Integer>, List<Integer>>() {
    @Override
    public List<Integer> call(OnSubscribe<Integer> onSubscribe) {
        List<Integer> allElements = new ArrayList<Integer>();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        onSubscribe.call(new Subscriber<Integer>(){
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                failure.set(e);
            }

            @Override
            public void onNext(Integer t) {
                allElements.add(t);
            }});
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e1) {
                // continue waiting
            }
        }
        Throwable e = failure.get();
        if (e != null)
            throw new RuntimeException(e);
        return allElements;

    }});
System.out.println(list);

output

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Please note that this operator is currently @Experimental and is very likely to be renamed (to "extend"), however experimental functionality in RxJava can be changed in any release.

Upvotes: 3

Vladimir Mironov
Vladimir Mironov

Reputation: 30864

You can transform any Observable to a synchronous one using .toBlocking():

private List<String> getFilteredString(List<String> strings) {
  return Observable.from(strings)
      .filter(string -> string.length() > 5)
      .toList()
      .toBlocking()
      .single();
}

UPDATE: Although the code above is perfectly valid and will work as expected, this isn't how RxJava is supposed to be used. If your only goal is to transform a collection, there are better ways to do it:

Upvotes: 9

Related Questions