Reputation: 111
I'm trying to build a grouped list based on certain conditions in RxJava.
Below is my response:
{
  "dates": [
    {
      "date": 18,
      "value": "1"
    },
    {
      "date": 18,
      "value": "2"
    },
    {
      "date": 18,
      "value": "3"
    },
    {
      "date": 19,
      "value": "1"
    },
    {
      "date": 19,
      "value": "2"
    },
    {
      "date": 19,
      "value": "3"
    },
    {
      "date": 19,
      "value": "4"
    }
  ]
}
How can I group the entries by date using RxJava, so that I get 18 [value 1, value 2, value 3, highestValue = 3, lowestValue = 1] and 19 [value 1, value 2, value 3, value 4, highestValue = 4, lowestValue = 1]?
Note: I could do this with a for loop, but the response is fetched from the server and returned as an Observable, so I thought of using RxJava functionality.
Any help will be really appreciated.
Thank you, Shanthi
Upvotes: 5
Views: 12288
Reputation: 163
Here is my solution. In the .subscribe() call you receive each group as a completed list:
List<DateModel> dateList = Arrays.asList(
        new DateModel(18, "1"), // DateModel(int date, String value)
        new DateModel(18, "2"),
        new DateModel(18, "3"),
        new DateModel(19, "1"),
        new DateModel(19, "2"),
        new DateModel(19, "3"),
        new DateModel(19, "4")
);
Observable.fromIterable(dateList)
        .groupBy(r -> r.getDate())
        .flatMapSingle(Observable::toList) // same as .flatMapSingle(g -> g.toList())
        .subscribe(group -> System.out.println("dateList size: " + group.size()));
System.out: dateList size: 3
dateList size: 4
Upvotes: 2
Reputation: 3830
Inspired by @Jon's answer, which works, here is a full demo for RxJava 2 together with its output. It shows two approaches:
Observable#collect() for Observable
Flowable#parallel() + Single#blockingGet() for Flowable
The output :
----------------------byCollect
[2017/11/16 20:42:43.548 CST][ 1 - main] - flatMapSingle : : 1
[2017/11/16 20:42:43.590 CST][ 1 - main] - flatMapSingle : : 2
[2017/11/16 20:42:43.591 CST][ 1 - main] - flatMapSingle : : 0
[2017/11/16 20:42:43.592 CST][ 1 - main] - subscribe : onNext : {0=[3, 6, 9]}
[2017/11/16 20:42:43.593 CST][ 1 - main] - subscribe : onNext : {1=[1, 4, 7]}
[2017/11/16 20:42:43.593 CST][ 1 - main] - subscribe : onNext : {2=[2, 5, 8]}
[2017/11/16 20:42:43.597 CST][ 1 - main] - subscribe : onComplete :
----------------------byParallelAndBlockingGet
[2017/11/16 20:42:43.629 CST][ 13 - RxComputationThreadPool-1] - flatMap : : 1
[2017/11/16 20:42:43.629 CST][ 15 - RxComputationThreadPool-3] - flatMap : : 0
[2017/11/16 20:42:43.629 CST][ 14 - RxComputationThreadPool-2] - flatMap : : 2
[2017/11/16 20:42:43.632 CST][ 15 - RxComputationThreadPool-3] - subscribe : onNext : {0=[3, 6, 9]}
[2017/11/16 20:42:43.632 CST][ 15 - RxComputationThreadPool-3] - subscribe : onNext : {1=[1, 4, 7]}
[2017/11/16 20:42:43.633 CST][ 15 - RxComputationThreadPool-3] - subscribe : onNext : {2=[2, 5, 8]}
[2017/11/16 20:42:43.633 CST][ 15 - RxComputationThreadPool-3] - subscribe : onComplete :
The source : Demo.java
import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.schedulers.*;
import java.time.*;
import java.time.format.*;
import java.util.*;

/**
 * List<Integer>                // [1..9]
 * ->
 * Map<Integer, List<Integer>>  // {0: [3,6,9], 1: [1,4,7], 2: [2,5,8]}
 */
public class Demo {
    public static void main(String[] args) throws InterruptedException {
        byCollect();
        byParallelAndBlockingGet();
    }

    public static void byCollect() throws InterruptedException {
        System.out.println("----------------------byCollect");
        Observable.range(1, 9)
                .groupBy(i -> i % 3)
                .flatMapSingle(f -> { // f is a GroupedObservable<Integer, Integer>
                    // See the output: everything runs on the same thread.
                    print("flatMapSingle : ", f.getKey());
                    // The group's onComplete has not been triggered yet, so
                    // blockingGet would block the current thread here:
                    //return Observable.just(Collections.singletonMap(f.getKey(), f.toList().blockingGet()))
                    return f.collect(
                            // (Callable<Map<Integer, List<Integer>>>)
                            () -> Collections.singletonMap(f.getKey(), new ArrayList<Integer>()),
                            // (BiConsumer<Map<Integer, List<Integer>>, Integer>)
                            (m, i) -> m.get(f.getKey()).add(i)
                    );
                })
                .subscribe(
                        i -> print("subscribe : onNext", i),
                        err -> print("subscribe : onError", err),
                        () -> print("subscribe : onComplete", "")
                );
    }

    public static void byParallelAndBlockingGet() throws InterruptedException {
        System.out.println("----------------------byParallelAndBlockingGet");
        Flowable.range(1, 9)
                .groupBy(i -> i % 3)
                .parallel() // There's no `parallel` method on the `Observable` class
                .runOn(Schedulers.computation()) // Important!!!
                .flatMap(f -> { // f is a GroupedFlowable<Integer, Integer>
                    // See the output: each group runs on a different thread.
                    print("flatMap : ", f.getKey());
                    return Flowable.just(Collections.singletonMap(f.getKey(), f.toList().blockingGet()));
                })
                .sequential()
                .subscribe(
                        i -> print("subscribe : onNext", i),
                        err -> print("subscribe : onError", err),
                        () -> print("subscribe : onComplete", "")
                );
        // Give the computation threads time to finish before main exits.
        Thread.sleep(500);
    }

    public static void print(String step, Object data) {
        ZonedDateTime zdt = ZonedDateTime.now();
        String now = zdt.format(DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS z"));
        System.out.printf("[%s][%4d - %30s] - %10s : %s%n",
                now,
                Thread.currentThread().getId(),
                Thread.currentThread().getName(),
                step,
                data
        );
    }
}
Upvotes: 4
Reputation: 1763
Look into the groupBy operator.
Here's the example for anyone who's curious:
class DateModel implements Comparable<DateModel> {
    Integer date;
    Integer value;

    public DateModel(int date, int value) {
        this.date = date;
        this.value = value;
    }

    @Override
    public int compareTo(DateModel o) {
        return value.compareTo(o.value);
    }
}
And then if we have to aggregate a list of these model objects:
// example list
List<DateModel> dateList = Arrays.asList(
        new DateModel(18, 1),
        new DateModel(18, 2),
        new DateModel(18, 3),
        new DateModel(19, 1),
        new DateModel(19, 2),
        new DateModel(19, 3),
        new DateModel(19, 4)
);
// the following observable will give you an emission for every grouping
// for the example data above, you should get two emissions (group 18 and 19)
// note: Observable.from(...) is the RxJava 1.x API; RxJava 2 names it fromIterable(...)
Observable<PriorityQueue<DateModel>> observable =
        Observable.from(dateList)
                .groupBy(dateModel -> dateModel.date)
                .flatMap(groups -> groups.collect(PriorityQueue::new, PriorityQueue::add));
PriorityQueue was just an example of the structure used for collecting. If you poll the queue, you'll get 18-1, 18-2, 18-3, etc. (in the order you asked for). If you only need the max and min, you can collect into a different structure.
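As one possible "different structure", here is a minimal sketch of an accumulator that keeps the values of a group and tracks the highest and lowest as they arrive. GroupSummary is a hypothetical name, not part of RxJava or of the answer above; the sketch uses only the JDK so it runs on its own. The assumption is that it could replace PriorityQueue in the collect call, for example with something like groups.collect(GroupSummary::new, (s, m) -> s.add(m.value)), which is untested here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator for one group: stores the values and
// updates the lowest and highest value on every add.
class GroupSummary {
    final List<Integer> values = new ArrayList<>();
    int lowest = Integer.MAX_VALUE;
    int highest = Integer.MIN_VALUE;

    void add(int value) {
        values.add(value);
        lowest = Math.min(lowest, value);
        highest = Math.max(highest, value);
    }

    @Override
    public String toString() {
        return values + ", highest=" + highest + ", lowest=" + lowest;
    }

    public static void main(String[] args) {
        GroupSummary summary = new GroupSummary();
        for (int value : new int[] {1, 2, 3}) {
            summary.add(value);
        }
        System.out.println(summary); // [1, 2, 3], highest=3, lowest=1
    }
}
```

Tracking the extremes while adding avoids a second pass over the group once it completes.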
Upvotes: 9
Reputation: 8190
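This can be done easily as shown below. The list is grouped by date in a TreeMap and then flattened back into a single list ordered by date: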
List<Date> list = Arrays.asList(new Date[]{
        new Date(18, 1), new Date(18, 2), new Date(18, 3), new Date(19, 1), new Date(19, 2)
});
Observable
        .fromArray(list) // emits the whole list as a single item
        .map(new Function<List<Date>, List<Date>>() {
            @Override
            public List<Date> apply(@NonNull List<Date> dates) throws Exception {
                // group the entries by date; TreeMap keeps the keys sorted
                TreeMap<Integer, List<Date>> treeMap = new TreeMap<Integer, List<Date>>();
                for (Date date : dates) {
                    List<Date> storedDates = treeMap.get(date.date);
                    if (storedDates == null) {
                        storedDates = new ArrayList<Date>();
                        treeMap.put(date.date, storedDates);
                    }
                    storedDates.add(date);
                }
                // flatten the groups back into one list, ordered by date
                List<Date> result = new ArrayList<Date>();
                for (Integer integer : treeMap.keySet()) {
                    result.addAll(treeMap.get(integer));
                }
                return result;
            }
        });
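The grouping loop above can probably be shortened with Map.computeIfAbsent. The following is a minimal JDK-only sketch, not the answerer's code: TreeMapGrouping and groupByDate are hypothetical names, and the entries are plain {date, value} int pairs instead of the Date class. Unlike the snippet above, it returns the map itself so the groups stay separate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TreeMapGrouping {
    // Groups {date, value} pairs by date; TreeMap keeps the dates sorted.
    static Map<Integer, List<Integer>> groupByDate(int[][] pairs) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (int[] pair : pairs) {
            // Create the list for a date on first sight, then append the value.
            grouped.computeIfAbsent(pair[0], date -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        int[][] pairs = {{19, 1}, {18, 1}, {18, 2}, {19, 2}, {18, 3}};
        System.out.println(groupByDate(pairs)); // {18=[1, 2, 3], 19=[1, 2]}
    }
}
```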
Upvotes: 1
Reputation: 111
Thank you for your replies, but I was able to solve it using the code below.
Map<String, List<Date>> grouped = dates.body().getDate()
        .stream()
        .collect(Collectors.groupingBy(x -> {
            return x.getTime().getMday(); // we can use any logic here
        }));
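To also get the highest and lowest value per group, as asked in the question, the same groupingBy collector can take a downstream collector. This is a minimal sketch under assumptions: StreamGrouping and Entry are hypothetical stand-ins for the server model (whose real classes are not shown here), and the values are taken as ints rather than strings.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class StreamGrouping {
    // Hypothetical stand-in for one element of the "dates" array.
    static final class Entry {
        final int date;
        final int value;

        Entry(int date, int value) {
            this.date = date;
            this.value = value;
        }

        int getDate() { return date; }
        int getValue() { return value; }
    }

    // Groups by date (sorted via TreeMap) and summarizes the values of each group.
    static Map<Integer, IntSummaryStatistics> summarize(List<Entry> entries) {
        return entries.stream().collect(Collectors.groupingBy(
                Entry::getDate,
                TreeMap::new,
                Collectors.summarizingInt(Entry::getValue)));
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
                new Entry(18, 1), new Entry(18, 2), new Entry(18, 3),
                new Entry(19, 1), new Entry(19, 2), new Entry(19, 3), new Entry(19, 4));
        summarize(entries).forEach((date, stats) -> System.out.println(
                date + ": highest=" + stats.getMax() + ", lowest=" + stats.getMin()));
        // 18: highest=3, lowest=1
        // 19: highest=4, lowest=1
    }
}
```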
Upvotes: -1