Reputation: 134
I'm looking into converting some logic in my (Android) app to use RxJava, but I'm struggling to come up with a way to express some of the more advanced logic.
The use case is as follows: I want to show a kind of Feed to the user. The Feed consists of items from different sources, e.g. messages, articles, etc. Due to API restrictions, the app itself must gather the individual sources and show them in one list.
For example, assume my items in the Feed are as follows:
class FeedItem {
Type feedItem; // Type of the item, e.g. article, message, etc.
...
}
Currently, the feed is built on a separate thread, and the UI is notified using a listener when the feed has been updated. To give you an idea of how it's done, below is some (pseudo) Java code (threading and other administrative code is omitted for clarity).
class FeedProducer {
List<FeedItem> currentData = new ArrayList<>();
public void refreshData() {
for (FeedSource source: getFeedSources()) {
Type sourceType = source.getType();
// Remove existing items
currentData.removeIf(item -> item.feedItem.equals(sourceType));
List<FeedItem> newItems = source.produceItems();
// Add the new items
currentData.addAll(newItems);
// Notify the UI things have changed
notifyDataChanged(currentData);
}
// Notify the UI we are done loading
notifyLoadingComplete();
}
}
This method, refreshData(), is called each time the user wants to refresh the data. This way, it's possible to update only some sources while others stay the same (e.g. by changing the return value of getFeedSources()).
These sources are also used individually in other parts of the app; I have converted them to Observables there. This made things a lot easier, e.g. if the database changes, the changes are simply pushed to the UI by the Observable.
My question is thus: how can I (elegantly) merge these Observable sources into one Observable while keeping a "global" state of previous results? I have looked into the various combining operators, but have not found what I need. My apologies if I am overlooking something obvious, as I am fairly new to RxJava.
Upvotes: 1
Views: 1755
Reputation: 10038
A naive option is for the caller to store the last list and pass it as a parameter whenever it requests new data:
public class ReactiveMultipleSources {
// region Classes
public enum SourceType {
TYPE_ARTICLE,
TYPE_MESSAGE,
TYPE_VIDEO
}
public static class Feed {
private SourceType sourceType;
private String content;
Feed(SourceType sourceType, String content) {
this.sourceType = sourceType;
this.content = content;
}
SourceType getSourceType() {
return sourceType;
}
}
// endregion
public static void main(String[] args) throws InterruptedException {
final List<Feed>[] currentList = new List[]{new ArrayList()};
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
System.out.println();
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
}
private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) {
return Observable.fromIterable(getSourceTypes())
.observeOn(Schedulers.io())
// Get List<Feed> forEach sourceType
.concatMap(ReactiveMultipleSources::getFeedItemsBySourceType)
.observeOn(Schedulers.computation())
// Get list of "List of Feed for sourceType", = List<List<Feed>>
.toList()
.map(lists -> {
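for (List<Feed> list : lists) {
// list.get(0) throws on an empty list, so skip sources that returned nothing
// (note: the old items of such a source then stay in currentFeed)
if (list.isEmpty()) continue;
SourceType sourceType = list.get(0).getSourceType();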
// Remove items of currentFeed whose sourceType has new List<Feed>
currentFeed.removeIf(temp -> temp.getSourceType() == sourceType);
// Add new items
currentFeed.addAll(list);
}
return currentFeed;
})
.toObservable();
}
// region Helper
private static List<SourceType> getSourceTypes() {
return new ArrayList<>(Arrays.asList(SourceType.values()));
}
private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) {
String content;
if (sourceType == SourceType.TYPE_ARTICLE)
content = "article ";
else if (sourceType == SourceType.TYPE_MESSAGE)
content = "message ";
else if (sourceType == SourceType.TYPE_VIDEO)
content = "video ";
else
content = "article ";
Feed feed1 = new Feed(sourceType, content + createRandomInt());
Feed feed2 = new Feed(sourceType, content + createRandomInt());
Feed feed3 = new Feed(sourceType, content + createRandomInt());
Feed feed4 = new Feed(sourceType, content + createRandomInt());
return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4));
}
// For simulating different items each time List<Feed> is required
private static int createRandomInt() {
return ThreadLocalRandom.current().nextInt(0, 21);
}
// endregion
}
Example output:
article 19
article 15
article 18
article 18
message 3
message 2
message 9
message 1
video 19
video 17
video 18
video 11
article 0
article 4
article 18
article 15
message 11
message 16
message 16
message 4
video 1
video 7
video 20
video 2
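The merge step inside map can also be pulled out into a small pure function that returns a new list instead of mutating the one passed in. Below is a minimal sketch of that idea in plain Java, without RxJava; FeedMergeSketch, Item, replaceSource and contents are hypothetical names made up for this illustration. Passing the source type explicitly means an empty result can still clear the old items of that source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: all names here are assumptions, not part of the code above.
class FeedMergeSketch {
    enum SourceType { ARTICLE, MESSAGE, VIDEO }

    static final class Item {
        final SourceType type;
        final String content;

        Item(SourceType type, String content) {
            this.type = type;
            this.content = content;
        }
    }

    // Returns a NEW list: everything in current except items of the given type,
    // followed by the fresh items. The input lists are left untouched.
    static List<Item> replaceSource(List<Item> current, SourceType type, List<Item> fresh) {
        List<Item> result = new ArrayList<>(current);
        result.removeIf(item -> item.type == type);
        result.addAll(fresh);
        return result;
    }

    // Helper for printing and comparing: extracts the content strings in order.
    static List<String> contents(List<Item> items) {
        List<String> out = new ArrayList<>();
        for (Item item : items) {
            out.add(item.content);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Item> feed = Collections.emptyList();
        feed = replaceSource(feed, SourceType.ARTICLE,
                Arrays.asList(new Item(SourceType.ARTICLE, "article 1")));
        feed = replaceSource(feed, SourceType.MESSAGE,
                Arrays.asList(new Item(SourceType.MESSAGE, "message 1")));
        // Refreshing only the articles keeps the message untouched.
        feed = replaceSource(feed, SourceType.ARTICLE,
                Arrays.asList(new Item(SourceType.ARTICLE, "article 2")));
        System.out.println(contents(feed)); // prints [message 1, article 2]
    }
}
```

A function of this shape (previous state plus one update gives the next state) looks like a natural fit for the accumulator of RxJava's scan operator, provided each emission carries its source type along with its items. I have not tested that wiring here, so treat it as a direction rather than a finished solution.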
Upvotes: 1
Reputation: 101
If you have 3 individual tasks which return [0, 1], [10, 11] and [20, 21], and you want to merge them into a single list, you can use the zip operation.
public class TestRx {
public static void main(String[] args) {
// some individual observables.
Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1));
Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11));
Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21));
Observable.zip(observable1, observable2, observable3,
new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
// TODO: Remove existing items
// merge all lists
List<Integer> mergedList = new ArrayList<>();
mergedList.addAll(list1);
mergedList.addAll(list2);
mergedList.addAll(list3);
return mergedList;
}
})
.subscribe(new Observer<List<Integer>>() {
@Override
public void onNext(List<Integer> mergedList) {
System.out.println(mergedList);
// TODO: notifyDataChanged(mergedList)
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.toString());
// TODO: handle exceptions
}
@Override
public void onCompleted() {
// TODO: notifyLoadingComplete()
}
});
}
}
As a result, it prints [0, 1, 10, 11, 20, 21].
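Keep in mind that zip only emits once every source has produced a value, so the UI gets one merged list per round rather than an update per source, and the "remove existing items" TODO still needs some state. One way to fill that in is to remember the latest list per source and flatten on every update. The following is a plain-Java sketch of that bookkeeping only; LatestPerSource, update and the string keys are hypothetical names chosen for this example, and the class is not thread-safe as written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keeps the most recent result of each source.
class LatestPerSource {
    // LinkedHashMap keeps first-insertion order, so re-putting an existing key
    // replaces its items without moving that source to the end.
    private final Map<String, List<Integer>> latest = new LinkedHashMap<>();

    // Stores the new items for one source and returns the merged list of all sources.
    List<Integer> update(String sourceKey, List<Integer> items) {
        latest.put(sourceKey, new ArrayList<>(items));
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> part : latest.values()) {
            merged.addAll(part);
        }
        return merged;
    }

    public static void main(String[] args) {
        LatestPerSource state = new LatestPerSource();
        state.update("a", Arrays.asList(0, 1));
        state.update("b", Arrays.asList(10, 11));
        System.out.println(state.update("c", Arrays.asList(20, 21))); // prints [0, 1, 10, 11, 20, 21]
        // Refreshing only source "b" replaces its items in place.
        System.out.println(state.update("b", Arrays.asList(12)));     // prints [0, 1, 12, 20, 21]
    }
}
```

Compared with removing and re-appending items in a single list, keying by source keeps the order of the sources stable across refreshes. Whether that is what you want depends on how the feed should be sorted.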
Upvotes: 1