Daniele Vitali

Reputation: 3858

Combining Firebase realtime data listener with RxJava

I am using Firebase in my app, along with RxJava. Firebase can notify your app whenever something changes in the backend data (additions, removals, changes, ...). I am trying to combine this Firebase feature with RxJava.

The data I am listening for is called Leisure, and the Observable emits a LeisureUpdate, which contains a Leisure and the type of update (added, removed, moved, changed).

Here is my method, which lets callers subscribe to these events.

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}

First off, I would like to use the Observable.fromCallable() method to create the Observable, but I guess that is impossible since Firebase uses callbacks, right?

I keep a single instance of the Observable so that multiple Subscribers can always subscribe to the same one.

The problem comes when everyone unsubscribes and I need to stop listening for events in Firebase. I didn't find any way to make the Observable tell whether there are still any subscriptions. So I count the calls to subscribeToLeisuresUpdates() in leisureUpdatesSubscriptionsCount.

Then every time someone wants to unsubscribe, they have to call

@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}

This is the only way I found to make the Observable emit items only while there is a subscriber, but I feel like there must be an easier way, especially for detecting when no subscribers are listening to the Observable anymore.

Has anyone encountered a similar problem, or does anyone have a different approach?

Upvotes: 15

Views: 13683

Answers (5)

Link182

Reputation: 839

Another amazing library that will help you wrap all Firebase Realtime Database logic in Rx patterns:

https://github.com/Link184/Respiration

Here you can create your Firebase repository by extending GeneralRepository, for example:

@RespirationRepository(dataSnapshotType = Leisure.class)
public class LeisureRepository extends GeneralRepository<Leisure>{
    protected LeisureRepository(Configuration<Leisure> repositoryConfig) {
        super(repositoryConfig);
    }

    // observable will never emit any events if there are no more subscribers
    public void killRepository() {
        if (!behaviorSubject.hasObservers()) {
            //protected fields
            behaviorSubject.onComplete();
            databaseReference.removeEventListener(valueListener);
        }
    }
}

You can "kill" your repository this way:

// LeisureRepositoryBuilder is a class generated by the annotation processor; it will appear after a successful Gradle build
LeisureRepositoryBuilder.getInstance().killRepository();

But I think in your situation it would be better to extend com.link184.respiration.repository.ListRepository, to avoid mapping data from java.util.Map to the Leisure model through LeisureUpdate.

Upvotes: 0

granko87

Reputation: 211

Here is some sample code for using RxJava2 with Firebase's CompletionListener:

Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey();
            FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order,
                    new DatabaseReference.CompletionListener() {
                        @Override
                        public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) {
                            if (e.isDisposed()) {
                                return;
                            }
                            if (databaseError == null) {
                                e.onComplete();
                            } else {
                                e.onError(new Throwable(databaseError.getMessage()));
                            }
                        }
                    });

        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Upvotes: 0

Francisco Durdin Garcia

Reputation: 13327

I suggest you check, as a reference (or just use), one of the following libraries:

RxJava : https://github.com/nmoskalenko/RxFirebase

RxJava 2.0: https://github.com/FrangSierra/Rx2Firebase

One of them works with RxJava and the other with the new RC of RxJava 2.0. If you are interested, you can see the differences between both here.

Upvotes: 3

Francesc

Reputation: 29280

You can use Observable.fromEmitter, something along these lines:

    return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
        @Override
        public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
            final ValueEventListener listener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    // process update
                    LeisureUpdate leisureUpdate = ...
                    leisureUpdateEmitter.onNext(leisureUpdate);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
                    mDatabaseReference.removeEventListener(this);
                }
            };
            mDatabaseReference.addValueEventListener(listener);
            leisureUpdateEmitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mDatabaseReference.removeEventListener(listener);
                }
            });
        }
    }, Emitter.BackpressureMode.BUFFER);
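
The key idea in the snippet above is that the cleanup is registered at the same place the listener is added, so unsubscribing removes the listener automatically. Below is a minimal plain-Java sketch of that mechanism with no RxJava or Firebase dependency. FakeReference, ListenerBridge and Cancellation are hypothetical names invented for this illustration; they are not part of either library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for a Firebase reference: it only keeps a list of listeners.
final class FakeReference {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) { listeners.add(listener); }
    void removeListener(Consumer<String> listener) { listeners.remove(listener); }
    int listenerCount() { return listeners.size(); }

    // Simulates the backend pushing a change to every registered listener.
    void push(String value) {
        for (Consumer<String> listener : listeners) {
            listener.accept(value);
        }
    }
}

// Hypothetical handle returned to the subscriber (plays the role of a Subscription).
interface Cancellation {
    void cancel();
}

// Hypothetical bridge: subscribing adds a listener, cancelling removes that same listener.
final class ListenerBridge {
    private final FakeReference ref;

    ListenerBridge(FakeReference ref) { this.ref = ref; }

    Cancellation subscribe(Consumer<String> onNext) {
        ref.addListener(onNext);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        // The cleanup is created right next to the registration, like setCancellation above.
        return () -> {
            if (cancelled.compareAndSet(false, true)) {
                ref.removeListener(onNext);
            }
        };
    }
}
```

Calling cancel() more than once is harmless here because the flag makes the removal run only once.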

Upvotes: 6

Vinay Nagaraj

Reputation: 1192

Put this at the end of the call() method inside your Observable.create():

subscriber.add(Subscriptions.create(new Action0() {
    @Override
    public void call() {
        ref.removeEventListener(leisureUpdatesListener);
    }
}));
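
With a cleanup hook like this in place, the manual subscription counter from the question is no longer needed. In RxJava the share() operator (shorthand for publish().refCount()) connects upstream on the first subscriber and disconnects when the last one unsubscribes. The plain-Java sketch below only illustrates that reference-counting behaviour; SharedUpdates and Upstream are hypothetical names made up for this example and are not RxJava or Firebase API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical shared source: one upstream listener, many downstream subscribers.
final class SharedUpdates<T> {

    // Connects to the backend, delivers events to the sink, returns a handle that disconnects.
    interface Upstream<E> {
        Runnable connect(Consumer<E> sink);
    }

    private final Upstream<T> upstream;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private Runnable disconnect; // non-null only while connected to the backend

    SharedUpdates(Upstream<T> upstream) { this.upstream = upstream; }

    // Returns a handle; running it unsubscribes this subscriber.
    synchronized Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            // First subscriber: attach the single upstream listener.
            disconnect = upstream.connect(this::dispatch);
        }
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<T> subscriber) {
        // remove() returns false on a repeated call, so unsubscribing twice is a no-op.
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            // Last subscriber left: detach from the backend.
            disconnect.run();
            disconnect = null;
        }
    }

    // Fans one upstream event out to every current subscriber.
    private void dispatch(T value) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    synchronized boolean isConnected() { return disconnect != null; }
}
```

In the question's code, the equivalent would presumably be to keep the Observable.create(...) with the unsubscribe hook and call share() on the result, so the Firebase listener is added once and removed when nobody is listening.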

Upvotes: 3
