niknetniko
niknetniko

Reputation: 134

Update and retain existing data with RxJava

I'm looking into converting some logic in my (Android) app to use RxJava, however I'm struggling to come up with a way to do some more advanced logic.

The use case is as follows: I want to show a kind of Feed to the user. The Feed consists of items from different sources, e.g. messages, articles, etc. Due to API restrictions, the app itself must gather the individual sources and show them in one list.

For example, assume my items in the Feed are as follows:

class FeedItem {
     Type feedItem; // Type of the item, e.g. article, message, etc.
     ...
}

Currently, the feed is built on a separate thread, and the UI is notified using a listener when the feed has been updated. To give you an idea of how it's done, below is some (pseudo) Java code (threading and other administrative code is omitted for clarity).

class FeedProducer {
    List<FeedItem> currentData = new ArrayList();

    public void refreshData() {
        for (FeedSource source: getFeedSources()) {
            Type sourceType = source.getType();
            // Remove existing items
            currentData.removeIf(item -> item.feedItem.equals(sourceType));
            List<FeedItem> newItems = source.produceItems();
            // Add the new items
            currentData.addAll(newItems);
            // Notify the UI things have changed
            notifyDataChanged(currentData);
        }
        // Notify the UI we are done loading
        notifyLoadingComplete();
    }
}

This method refreshData() will be called each time the user wants to refresh the data. This way, it's possible to only update some sources, while others stay the same (e.g. by changing the return value of getFeedSources()).

These sources are also used individually in other parts of the app; I have converted them to Observables there. This made things a lot easier, e.g. if the database changes, the changes are simple pushed to the UI by the Observable.

My question is thus how can I (elegantly) merge these Observable sources into one Observable, but where there is a "global" state of previous results. I have looked into the various combining operators, but have not found what I need. My apologies if I am overlooking something obvious, as I am fairly new to RxJava.

Upvotes: 1

Views: 1755

Answers (2)

Jemshit
Jemshit

Reputation: 10038

Naive option would be, caller storing last list and giving it as parameter when you are requesting new data:

public class ReactiveMultipleSources {

    // region Classes
    public enum SourceType {
        TYPE_ARTICLE,
        TYPE_MESSAGE,
        TYPE_VIDEO
    }

    public static class Feed {
        private SourceType sourceType;
        private String content;

        Feed(SourceType sourceType, String content) {
            this.sourceType = sourceType;
            this.content = content;
        }

        SourceType getSourceType() {
            return sourceType;
        }
    }
    // endregion

    public static void main(String[] args) throws InterruptedException {
        final List<Feed>[] currentList = new List[]{new ArrayList()};

        // Simulate refresh
        refreshContent(currentList[0])
                .subscribe(feeds -> {
                    currentList[0] = feeds;

                    for (int i = 0; i < currentList[0].size(); i++) {
                        System.out.println(currentList[0].get(i).content);
                    }
                });
        Thread.sleep(2000);
        System.out.println();

        // Simulate refresh
        refreshContent(currentList[0])
                .subscribe(feeds -> {
                    currentList[0] = feeds;

                    for (int i = 0; i < currentList[0].size(); i++) {
                        System.out.println(currentList[0].get(i).content);
                    }
                });
        Thread.sleep(2000);


    }

    private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) {
        return Observable.fromIterable(getSourceTypes())
                .observeOn(Schedulers.io())
                // Get List<Feed> forEach sourceType
                .concatMap(ReactiveMultipleSources::getFeedItemsBySourceType)
                .observeOn(Schedulers.computation())
                // Get list of "List of Feed for sourceType", =  List<List<Feed>>
                .toList()
                .map(lists -> {
                    for (List<Feed> list : lists) {
                        SourceType sourceType = list.get(0).getSourceType();
                        // Remove items of currentFeed whose sourceType has new List<Feed>
                        currentFeed.removeIf(temp -> temp.getSourceType() == sourceType);
                        // Add new items
                        currentFeed.addAll(list);
                    }
                    return currentFeed;
                })
                .toObservable();
    }

    // region Helper
    private static List<SourceType> getSourceTypes() {
        return new ArrayList<>(Arrays.asList(SourceType.values()));
    }

    private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) {
        String content;
        if (sourceType == SourceType.TYPE_ARTICLE)
            content = "article ";
        else if (sourceType == SourceType.TYPE_MESSAGE)
            content = "message ";
        else if (sourceType == SourceType.TYPE_VIDEO)
            content = "video ";
        else
            content = "article ";

        Feed feed1 = new Feed(sourceType, content + createRandomInt());
        Feed feed2 = new Feed(sourceType, content + createRandomInt());
        Feed feed3 = new Feed(sourceType, content + createRandomInt());
        Feed feed4 = new Feed(sourceType, content + createRandomInt());

        return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4));
    }

    // For simulating different items each time List<Feed> is required
    private static int createRandomInt() {
        return ThreadLocalRandom.current().nextInt(0, 21);
    }
    // endregion
}

Example output:

article 19
article 15
article 18
article 18
message 3
message 2
message 9
message 1
video 19
video 17
video 18
video 11

article 0
article 4
article 18
article 15
message 11
message 16
message 16
message 4
video 1
video 7
video 20
video 2

Upvotes: 1

fashare2015
fashare2015

Reputation: 101

If you have 3 individual tasks which return [0, 1], [10, 11] and [20, 21], you want to merge them into a single list. In this case, you can use the zip operation.

public class TestRx {
    public static void main(String[] args) {
        // some individual observables.
        Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1));
        Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11));
        Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21));

        Observable.zip(observable1, observable2, observable3,
                new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() {
                    @Override
                    public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
                        // TODO: Remove existing items

                        // merge all lists
                        List<Integer> mergedList = new ArrayList<>();
                        mergedList.addAll(list1);
                        mergedList.addAll(list2);
                        mergedList.addAll(list3);
                        return mergedList;
                    }
                })
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onNext(List<Integer> mergedList) {
                        System.out.println(mergedList);
                        // TODO: notifyDataChanged(mergedList)
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.out.println(throwable.toString());
                        // TODO: handle exceptions
                    }

                    @Override
                    public void onCompleted() {
                        // TODO: notifyLoadingComplete()
                    }
                });
    }
}

As a result, it print like this [0, 1, 10, 11, 20, 21].

Upvotes: 1

Related Questions