Oliver Hausler

Reputation: 4977

How to emit results using a Timer Observable with flatMap

I am using RxJava for maintenance tasks in the following way:

In a class that needs regular maintenance, I use the following static subscription. The Observable is launched for the first time when the class is loaded into memory, and then at regular intervals as specified.

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code

                return Observable.just(null);
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread())
        .subscribe();

Now I have a situation where I want to report the result of my maintenance to the UI.

Normally, I would use the following pattern:

    Observable.create(new Observable.OnSubscribe<String>() {

        @Override public void call(Subscriber<? super String> subscriber) {

            // some code

            subscriber.onNext(result);
            subscriber.onCompleted();
        }

    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {

                @Override public void call(String result) {

                    // write to the UI

                }

            });

But here the Observable is executed just once.

For an Observable that is executed at regular intervals, I couldn't find a way to attach an Action to the subscriber so that I can pass the result via subscriber.onNext(). It looks like there is no suitable signature that takes the Long from timer() and at the same time allows subscribing with an Action. But knowing RxJava, I am sure there is a trick ;-)

I could make this work by using zip to combine a timer Observable with a one-time Observable (basically zipping the two versions together), but I would rather use the first structure because it behaves slightly differently.

--

I tried to merge both versions into one in the following manner:

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {

                // some code // stays here to ensure there is no concurrency while executing

                final String result = "result"; // I store the result in a final variable after some code has been finished

                return Observable.create(new Observable.OnSubscribe<String>() {
                    @Override public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext(result); // then I use it in a new Observable and emit it
                        subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
                    }
                });

            }
        }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
            @Override public void call(String result) {
                // so I can finally consume the result on the UI thread
            }
        });

Instead of creating and emitting a "null" Observable, I create one which allows me to send a result to the subscriber.

Pretty messy, but this should work, right? Any simpler solution? What are your thoughts?

Upvotes: 0

Views: 5409

Answers (2)

Vattic

Reputation: 311

I know this answer comes late, but it may help someone else. As I understand it, you want to create an Observable and run it periodically, for example every 3 seconds. Here is my example, written against the RxJava 2 API (inside flatMap you can return a ready-made Observable such as Observable.just, or create your own with Observable.create):

Observable.interval(3,TimeUnit.SECONDS).flatMap(new Function<Long, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Long aLong) throws Exception {

            return Observable.just("Hello " + Thread.currentThread().getName()+aLong);
           /*
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    try{
                        String data = fetchData("http://www.yahoo.com");
                        emitter.onNext(data.length()+"");
                        emitter.onComplete();
                    }catch (Exception e){
                        emitter.onError(e);
                    }
                }
            });
            */
        }
    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("RxResult","--  " + Thread.currentThread().getName() + " -- " +s);
        }
    });

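// Downloads the body of the given URL and returns it as one string (line breaks are dropped).
public String fetchData(String url) throws IOException {
    StringBuilder result = new StringBuilder();
    // try-with-resources closes the reader, and with it the underlying stream
    try (BufferedReader in = new BufferedReader(new InputStreamReader(new URL(url).openStream()))) {
        String inputLine;
        while ((inputLine = in.readLine()) != null) {
            result.append(inputLine);
        }
    }
    // Return the content itself; the caller already turns its length into a string
    return result.toString();
}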
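For comparison, here is a minimal sketch of the same shape (run some work periodically on a background thread, hand each result to a consumer) using only the JDK instead of RxJava. The names PeriodicTask, start and collect are made up for this illustration, and the "result N" strings stand in for the real maintenance result. On Android the consumer would still have to hop to the main thread itself, which is the part observeOn handles in the RxJava version.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of RxJava: a plain-JDK analogue of "timer + flatMap + subscribe".
class PeriodicTask {

    // Runs `work` every `periodMillis` ms on one background thread and passes each
    // result to `onResult`. The caller stops the task by shutting down the returned executor.
    static ScheduledExecutorService start(Supplier<String> work,
                                          Consumer<String> onResult,
                                          long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Initial delay 0: the first run happens immediately, then once per period.
        executor.scheduleAtFixedRate(
                () -> onResult.accept(work.get()), 0, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }

    // Collects the first `count` results, then stops the periodic task.
    static List<String> collect(int count, long periodMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(count);
        AtomicInteger runs = new AtomicInteger();
        ScheduledExecutorService executor = start(
                () -> "result " + runs.getAndIncrement(), // placeholder for the maintenance work
                result -> {
                    // The consumer runs on the single worker thread, so this check is race-free.
                    if (remaining.getCount() > 0) {
                        results.add(result);
                        remaining.countDown();
                    }
                },
                periodMillis);
        try {
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(collect(3, 20));
    }
}
```

One difference worth knowing: if the consumer or the work throws, scheduleAtFixedRate silently cancels all further runs, whereas the RxJava chain would deliver the exception to onError.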

Upvotes: 0

dwursteisen

Reputation: 11515

I'm not sure I understand your problem.

Do you want to use the same subscription for your timer Observable and your other Observable?

Maybe you can use a Subject to act as a bridge between your Observables.

Subject<String, String> bridge = PublishSubject.create();
Observable.timer(0, 5, TimeUnit.SECONDS).flatMap(/* whatever */).mergeWith(bridge).subscribe(/* toDo */);
Observable.create(/* subscription */).subscribe(bridge);

Each item emitted by your "one-time Observable" will be pushed into the bridge, which will push it into the "timer" Observable's stream.

Is that what you're looking for?
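To make the bridge idea concrete without pulling in RxJava, here is a toy model of it. Bridge is a hypothetical stand-in written for this illustration, not RxJava's PublishSubject: it only forwards each item to whoever is subscribed at that moment. Unlike the snippet above, both sources push into the bridge directly here (there is no mergeWith), but the outcome is the same: one downstream consumer sees the items from both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a PublishSubject (assumption: single-threaded use, no onError/onCompleted).
class Bridge<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Forwards the item to the current subscribers; items pushed earlier are not replayed.
    void onNext(T item) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}

class BridgeDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Bridge<String> bridge = new Bridge<>();

        // Nobody is subscribed yet, so this item is dropped.
        bridge.onNext("too early");

        // The single downstream consumer (the "write to the UI" step).
        bridge.subscribe(received::add);

        // Source 1: stands in for the periodic timer, reduced to two ticks.
        for (long tick = 0; tick < 2; tick++) {
            bridge.onNext("tick " + tick);
        }
        // Source 2: the one-time task pushes its result into the same bridge.
        bridge.onNext("one-time result");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [tick 0, tick 1, one-time result]
    }
}
```

The dropped "too early" item mirrors how a PublishSubject behaves: it only emits items that arrive after a subscriber has subscribed, so the order in which you wire things up matters.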

Upvotes: 1
