theanilpaudel

Reputation: 3488

How to use one rxjava observable after previous is completed?

I have two RxJava Observables. I get an ArrayList from the first observable and then use it to fetch data from the second observable, like this:

Observable<KarobarTvVod> observable1 = youtubeDataHelper.getTVData();
    observable1.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(Schedulers.io())
            .subscribe(new Subscriber<KarobarTvVod>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(KarobarTvVod karobarTvVod) {
                    Log.d(TAG, "onNext: size" + karobarTvVod.getEtag());
                    tvObjArrayList = new ArrayList<TVObj>();
                    for (int i = 0; i < karobarTvVod.getItems().size(); i++) {
                        TVObj tvObj = new TVObj();
                        tvObj.setVideoDate(karobarTvVod.getItems().get(i).getSnippet().getPublishedAt());
                        tvObj.setVideoIcon(karobarTvVod.getItems().get(i).getSnippet().getThumbnails().getHigh().getUrl());
                        tvObj.setVideoTitle(karobarTvVod.getItems().get(i).getSnippet().getTitle());
                        tvObj.setVideoID(karobarTvVod.getItems().get(i).getId().getVideoId());
                        tvObjArrayList.add(tvObj);
                    }


                }
            });



    Observable<YoutubeViews> observable2 = youtubeDataHelper.getTVDataViews(tvObjArrayList);
    observable2.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(Schedulers.io())
            .subscribe(new Subscriber<YoutubeViews>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: in 2nd obs");
                    e.printStackTrace();
                }

                @Override
                public void onNext(YoutubeViews youtubeViews) {
                    Log.d(TAG, "onNext: views" + youtubeViews.getEtag());
                    viewsList = new ArrayList<String>();
                    for (int i = 0; i < youtubeViews.getItems().size(); i++) {

                        viewsList.add(youtubeViews.getItems().get(i).getStatistics().getViewCount());
                    }
                    tvView.displayList(tvObjArrayList, viewsList);
                }
            });

This is just sample code. I need to pass tvObjArrayList to the 2nd Observable once it has been populated by the 1st Observable. What is the best practice for doing so? Also, I'm using a for-loop inside the 1st Observable; is there a better way to achieve this using RxJava? Thanks

Upvotes: 10

Views: 5684

Answers (4)

MyDogTom

Reputation: 4606

I assume that getTVData and getTVDataViews each emit one item and then call onCompleted. If that is true, then the following example works. No loops, just pure Rx :)

//getTVData should emit one item and then call onCompleted
//otherwise toList() will wait forever
service.getTVData()
        .flatMap(karobarTvVod -> Observable.from(karobarTvVod.getItems()))
        .map(item -> {
            TVObj tvObj = new TVObj();
            //set other fields
            //by the way, I recommend you to use immutable objects
            return tvObj;
        })
        .toList()
        //here we have List<TVObj>
        .flatMap(
                objs -> {
                    //getTVDataViews should emit one item and then call onCompleted
                    //otherwise toList will wait forever
                    return service.getTVDataViews(objs)
                            .flatMap(youtubeViews -> Observable.from(youtubeViews.getItems()))
                            //getViewCount() is already a String in the question's code
                            .map(item -> item.getStatistics().getViewCount())
                            //after that we will have List<String>
                            .toList();
                },
                //a function that combines one item emitted by each of the source and collection Observables 
                // and returns an item to be emitted by the resulting Observable
                new Func2<List<TVObj>,List<String>,Pair<List<TVObj>,List<String>>>() {
                    @Override
                    public Pair<List<TVObj>, List<String>> call(List<TVObj> objs, List<String> strings) {
                        return new Pair<>(objs, strings);
                    }
                })
        .subscribe(pair -> tvView.displayList(pair.first, pair.second));

PS. While this approach is more concise, I believe that a plain loop is a more efficient way to build the list of items.
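To show just the "replace the for-loop with map" part in isolation, here is a minimal JDK-only sketch. It is not RxJava: java.util.stream stands in for the flatMap/map/toList steps above, and Item, TVObj, toTvObj and toTvObjs are hypothetical stand-ins for the real YouTube models, not anything from the question's code base.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the YouTube response item and the TVObj model.
class LoopToMapSketch {
    static final class Item {
        final String id;
        final String title;
        Item(String id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    // Immutable on purpose: all fields are set once in the constructor.
    static final class TVObj {
        final String videoId;
        final String videoTitle;
        TVObj(String videoId, String videoTitle) {
            this.videoId = videoId;
            this.videoTitle = videoTitle;
        }
    }

    // One item in, one TVObj out: this is the body of the original for-loop.
    static TVObj toTvObj(Item item) {
        return new TVObj(item.id, item.title);
    }

    // map + collect replaces the index-based loop and the mutable list field.
    static List<TVObj> toTvObjs(List<Item> items) {
        return items.stream()
                .map(LoopToMapSketch::toTvObj)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TVObj> objs = toTvObjs(Arrays.asList(
                new Item("a1", "First"),
                new Item("b2", "Second")));
        for (TVObj obj : objs) {
            System.out.println(obj.videoId + " " + obj.videoTitle);
        }
    }
}
```

The per-item conversion lives in one small function, so the same function could be passed to an Observable's map operator instead of a stream's.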

Upvotes: 1

thuongle

Reputation: 537

You can use the operators

toList().flatMap()

on Observable A, and do the work for Observable B inside the flatMap function.

For example:

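observableA
    .toList()
    //flatMap takes a function that returns the next Observable and subscribes to it for you;
    //build or return Observable B here, using listA if needed
    .flatMap(listA -> observableB)
    .subscribe()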
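If the "finish A, then start B with A's result" idea is easier to see without the Rx types, here is a rough JDK-only analogue. It swaps RxJava for CompletableFuture: thenCompose plays the role of flatMap for a source that produces a single value. fetchVideoIds, fetchViewCounts and loadRows are hypothetical names with fake data, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainSketch {
    // Hypothetical first request: resolves to the list of video ids.
    static CompletableFuture<List<String>> fetchVideoIds() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("a1", "b2"));
    }

    // Hypothetical second request: needs the ids produced by the first one.
    static CompletableFuture<List<Integer>> fetchViewCounts(List<String> ids) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> counts = new ArrayList<>();
            for (String id : ids) {
                counts.add(id.length() * 100); // fake view count derived from the id
            }
            return counts;
        });
    }

    // thenCompose starts the second request only after the first has produced its list.
    // The inner thenApply keeps both results together instead of relying on shared fields.
    static CompletableFuture<List<String>> loadRows() {
        return fetchVideoIds().thenCompose(ids ->
                fetchViewCounts(ids).thenApply(counts -> {
                    List<String> rows = new ArrayList<>();
                    for (int i = 0; i < ids.size(); i++) {
                        rows.add(ids.get(i) + ":" + counts.get(i));
                    }
                    return rows;
                }));
    }

    public static void main(String[] args) {
        // join() blocks until both steps have finished.
        System.out.println(loadRows().join());
    }
}
```

The point to carry over is that the second call is created inside the function passed to the chaining operator, so it always sees the finished list from the first call.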

Upvotes: 1

Bartek Lipinski

Reputation: 31468

You should use the flatMap operator. It won't get much easier than that.

Observable<KarobarTvVod> observable1 = youtubeDataHelper.getTVData();
observable1.subscribeOn(Schedulers.io())
        .unsubscribeOn(Schedulers.io())
        .flatMap(new Func1<KarobarTvVod, Observable<YoutubeViews>>() {
            @Override
            public Observable<YoutubeViews> call(KarobarTvVod karobarTvVod) {
                Log.d(TAG, "onNext: size" + karobarTvVod.getEtag());
                tvObjArrayList = new ArrayList<TVObj>();
                for (int i = 0; i < karobarTvVod.getItems().size(); i++) {
                    TVObj tvObj = new TVObj();
                    tvObj.setVideoDate(karobarTvVod.getItems().get(i).getSnippet().getPublishedAt());
                    tvObj.setVideoIcon(karobarTvVod.getItems().get(i).getSnippet().getThumbnails().getHigh().getUrl());
                    tvObj.setVideoTitle(karobarTvVod.getItems().get(i).getSnippet().getTitle());
                    tvObj.setVideoID(karobarTvVod.getItems().get(i).getId().getVideoId());
                    tvObjArrayList.add(tvObj);
                }
                return youtubeDataHelper.getTVDataViews(tvObjArrayList);
            }
        })
        // switch to the main thread only after flatMap, so the second request is not started on the main thread
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<YoutubeViews>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: in 1st or 2nd obs");
                e.printStackTrace();
            }

            @Override
            public void onNext(YoutubeViews youtubeViews) {
                Log.d(TAG, "onNext: views" + youtubeViews.getEtag());
                viewsList = new ArrayList<String>();
                for (int i = 0; i < youtubeViews.getItems().size(); i++) {

                    viewsList.add(youtubeViews.getItems().get(i).getStatistics().getViewCount());
                }
                tvView.displayList(tvObjArrayList, viewsList);
            }
        });

Upvotes: 5

paul

Reputation: 13481

You need to subscribe to the second observable in the onCompleted of the first one.

    Observable<KarobarTvVod> observable1 = youtubeDataHelper.getTVData();

    observable1.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .unsubscribeOn(Schedulers.io())
        .subscribe(new Subscriber<KarobarTvVod>() {
            @Override
            public void onCompleted() {
                 // create the second observable here, once onNext has populated tvObjArrayList
                 youtubeDataHelper.getTVDataViews(tvObjArrayList).subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .unsubscribeOn(Schedulers.io())
           .subscribe(new Subscriber<YoutubeViews>() {
                   @Override
                   public void onCompleted() {

                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: in 2nd obs");
                       e.printStackTrace();
                   }

                   @Override
                   public void onNext(YoutubeViews youtubeViews) {
                       Log.d(TAG, "onNext: views" + youtubeViews.getEtag());
                       viewsList = new ArrayList<String>();
                       for (int i = 0; i < youtubeViews.getItems().size(); i++) {

                           viewsList.add(youtubeViews.getItems().get(i).getStatistics().getViewCount());
                       }
                       tvView.displayList(tvObjArrayList, viewsList);
                   }
               });
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(KarobarTvVod karobarTvVod) {
                Log.d(TAG, "onNext: size" + karobarTvVod.getEtag());
                tvObjArrayList = new ArrayList<TVObj>();
                for (int i = 0; i < karobarTvVod.getItems().size(); i++) {
                    TVObj tvObj = new TVObj();
                    tvObj.setVideoDate(karobarTvVod.getItems().get(i).getSnippet().getPublishedAt());
                    tvObj.setVideoIcon(karobarTvVod.getItems().get(i).getSnippet().getThumbnails().getHigh().getUrl());
                    tvObj.setVideoTitle(karobarTvVod.getItems().get(i).getSnippet().getTitle());
                    tvObj.setVideoID(karobarTvVod.getItems().get(i).getId().getVideoId());
                    tvObjArrayList.add(tvObj);
                }


            }
        });

Of course, to make this code more readable, I would use a Consumer function for the second observable's subscription in the onCompleted method.

Upvotes: 0
