Pablo C. García
Pablo C. García

Reputation: 22394

Replace callbacks with observables from RxJava

Im using listeners as callbacks to observe asynchronous operations with Android, but I think that could be great replacing this listeners with RxJava, Im new using this library but I really like it and Im always using it with Android projects.

Here is my code to refactor:

public void getData( final OnResponseListener listener ){
   if(data!=null && !data.isEmpty()){
       listener.onSuccess();
   }
   else{
       listener.onError();
   }
}

A simple callback:

public interface OnResponseListener {
   public void onSuccess();
   public void onError(); 
}

And the "observer":

object.getData( new OnResponseListener() {
    @Override
    public void onSuccess() {
       Log.w(TAG," on success");
    }

    @Override
    public void onError() {
       Log.e(TAG," on error");
    }
});

Thanks!

Upvotes: 40

Views: 26983

Answers (4)

Amjed Baig
Amjed Baig

Reputation: 422

Maybe.<String>create(new MaybeOnSubscribe<String>() {
      @Override
      public void subscribe(MaybeEmitter<String> e) throws Exception {
        OnSuccessListener(uri->{
          e.onSuccess(uri));
        })
        .addOnFailureListener(throwable -> {
          e.onError(throwable);
        });
      }
    });

Upvotes: -3

Geoffrey Marizy
Geoffrey Marizy

Reputation: 5521

How I would refactor your code; alongside getData method, I would add the getData method wrapped as a Single:

public void getData( final OnResponseListener listener ){
    if(data!=null && !data.isEmpty()){
        listener.onSuccess();
    }
    else{
        listener.onError();
    }
}

public Single<Boolean> getDataSingle() {
    return Single.create(new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(SingleEmitter<Boolean> e) throws Exception {
            getData(new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            });
        }
    });
}

Or with Java 8 :

public Single<Boolean> getDataSingle() {
    return Single.create(e -> getData(
            new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            })
    );
}

Now you have exposed a Rx API alongside the callback's one. Supposing it's some kind of DataProvider of your own, you can now use it without dealing with callbacks, like this:

dataProvider.getDataSingle()
        .map(result -> result ? "User exist" : "User doesn't exist")
        .subscribe(message -> display(message));

I used Rx2 but with Rx1 the logic is the same.

I also used a Single instead of an Observable, since you await only one value. The interest is a more expressive contract for your function.

You can't emit value on behalf of an Observable, ie calling something like myObservable.send(value). The first solution is to use a Subject. Another solution (the one above) is to create the observable with Observable.create() (or Single.create()). You call the callback method and create the listener inside the method Observable.create(), because it's inside Observable.create() that you can call onSuccess() method, the method who told the Observable to pass down a value.

It's what I use to wrap callback into observable. A bit complicated at first, but easy to adapt.

I give you another example, as asked. Let's say you want to display the changes of an EditText as a Snackbar:

View rootView;
EditText editTextView;

//Wrap Android addTextChangedListener into an Observable
Observable<String> textObservable = Observable.create(consumer ->
        editTextView.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {

            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {

            }

            @Override
            public void afterTextChanged(Editable s) {
                consumer.onNext(s.toString());
            }
        })
);

//Use it
textObservable.subscribe(text -> Snackbar.make(rootView, text, Snackbar.LENGTH_SHORT).show());

Upvotes: 6

YMY
YMY

Reputation: 698

For example you can use Observable.fromCallable to create observable with your data.

public Observable<Data> getData(){
    return Observable.fromCallable(() -> {
        Data result = null;
        //do something, get your Data object
        return result;
    });
}

then use your data

 getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

Used rxjava 1.x and lambda expressions.

edit:

if I understand you well, you wanted to replace that listener, not wrap it into observable. I added other example in reference to your comment. Oh.. also you should use Single if you are expecting only one item.

public Single<Data> getData() {
        return Single.create(singleSubscriber -> {
            Data result = object.getData();
            if(result == null){
                singleSubscriber.onError(new Exception("no data"));
            } else {
                singleSubscriber.onSuccess(result);
            }
        });
    }

getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

Upvotes: 39

Maksim Ostrovidov
Maksim Ostrovidov

Reputation: 11058

You are looking for Completable.create:

Completable: Represents a deferred computation without any value but only indication for completion or exception. The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?

Completable.create(subscriber -> {
    object.getData(new OnResponseListener() {
        @Override
        public void onSuccess() {
           subscriber.onCompleted();
        }

        @Override
        public void onError() {
           subscriber.onError(* put appropriate Throwable here *);
        }
    }
})
...//apply Schedulers
.subscribe((() -> *success*), (throwable -> *error*));

Upvotes: 15

Related Questions