mutobj
mutobj

Reputation: 27

Create Flowable from while loop

I am pretty new to RxJava and I need to create repository with several datasources. It is complex to me because there are several smaller subtasks which I don't know how to implement with RxJava.

First I have self written dao, which process InputStream, and provides Items in the specified range. Currently it simply collects data in a list, but I want to provide Items one by one using flowable; Currently it privides Maybe<List<Item>>. Also there several errors need to be transmitted to higher level (datasource). Such as EndOfFile, to notify DataSource that data fully cached;

Dao.class:

List<Item> loadRange(int start, int number) throws ... {
    ...
    while(...) {
        ...
        //TODO contribute item to flowable
        resultList.add(new Item(...)) 

    }
    return resultList;
}

Maybe<List<Item>> method just created Maybe.fromCallable();

Please help me!

Upvotes: 0

Views: 785

Answers (1)

Mark
Mark

Reputation: 9919

Something like this should work for this :

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(emitter -> {
            try {
                while (...){
                    emitter.onNext(new Item());
                }
                emitter.onComplete();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }, BackpressureStrategy.BUFFER);
    }

I assume once the loop is done you want to complete, also send errors downstream, rather than handle on the method signature. Also you can change the BackPressureStrategy to suit your usecase i.e DROP, LATEST etc..

As you're new to RxJava, the anonymous class would be :

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(new FlowableOnSubscribe<Item>() {
            @Override public void subscribe(FlowableEmitter<Item> emitter) {
                try {
                    while (...){
                        emitter.onNext(new Item());
                    }
                    emitter.onComplete();
                } catch (IOException e) {
                    emitter.onError(e);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }

Upvotes: 1

Related Questions