I am quite new to Spark Streaming and I am stuck trying to figure out how to handle this problem: I found a lot of examples for single (K,V) pairs, but nothing beyond that. I would appreciate some help finding the best approach using Spark's transformations with Java.
Let me briefly describe the scenario.
The goal is to obtain the error ratio of a set of elements within a time window.
Given the following input:
(A, Error)
(B, Success)
(B, Error)
(B, Success)
(C, Success)
(C, Error)
The data is aggregated by element and then by status, as (Element, (Number of Successes, Number of Errors)). In this case the result of the transformation would be:
(A, (0,1))
(B, (2,1))
(C, (1,1))
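A minimal plain-Java sketch of this aggregation step (no Spark involved), just to pin down the semantics. It assumes each event is a two-element String array {element, status}; the class and method names are hypothetical, and statuses other than "Success" and "Error" are simply ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Spark) of the per-element aggregation:
// (element, status) events -> element -> {successCount, errorCount}.
class StatusAggregation {

    // Each event is a two-element array: {element, status}.
    static Map<String, int[]> aggregate(List<String[]> events) {
        Map<String, int[]> counts = new TreeMap<>(); // sorted keys for readable output
        for (String[] event : events) {
            int[] c = counts.computeIfAbsent(event[0], k -> new int[2]);
            if ("Success".equals(event[1])) {
                c[0]++; // index 0: number of successes
            } else if ("Error".equals(event[1])) {
                c[1]++; // index 1: number of errors
            } // any other status is ignored
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
            new String[] {"A", "Error"},
            new String[] {"B", "Success"},
            new String[] {"B", "Error"},
            new String[] {"B", "Success"},
            new String[] {"C", "Success"},
            new String[] {"C", "Error"});
        for (Map.Entry<String, int[]> e : aggregate(input).entrySet()) {
            System.out.println("(" + e.getKey() + ", (" + e.getValue()[0] + "," + e.getValue()[1] + "))");
        }
    }
}
```

Run on the sample input, it prints (A, (0,1)), (B, (2,1)) and (C, (1,1)), the same intermediate result listed above.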
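And finally, the ratio is computed with a function such as (i1, i2) -> i2 / (i1 + i2), where i1 is the number of successes and i2 the number of errors (using floating-point division):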
(A, 100%)
(B, 33.3%)
(C, 50%)
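A small sketch of the ratio function in plain Java. The cast to double matters: with two ints, Java performs integer division, so 1 / 3 would be 0. The method name and the choice to return 0.0 for an element with no events at all are assumptions, not something the question specifies.

```java
// Sketch of the ratio step: error ratio = errors / (successes + errors).
class ErrorRatio {

    static double errorRatio(int successes, int errors) {
        int total = successes + errors;
        if (total == 0) {
            return 0.0; // assumption: an element with no events has a 0% error ratio
        }
        // Cast before dividing to avoid integer division
        return (double) errors / total;
    }

    public static void main(String[] args) {
        System.out.printf("A %.1f%%%n", 100 * errorRatio(0, 1));
        System.out.printf("B %.1f%%%n", 100 * errorRatio(2, 1));
        System.out.printf("C %.1f%%%n", 100 * errorRatio(1, 1));
    }
}
```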
As far as I understand, the result would be produced by the reduceByKeyAndWindow() function, for example:
JavaPairDStream<String, Double> res =
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1));
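To make the windowing concrete, here is a plain-Java simulation (not Spark) of what a windowed reduce by key computes: after each slide, the per-key values of the batches inside the window are combined with the reduce function. It assumes a 1-second batch interval, which the question does not state; under that assumption, a 30-second window sliding every second would combine the last 30 batches once per second. The demo uses a hypothetical 2-batch window to stay short, and all names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a sliding-window reduce by key (window and slide in batches).
class WindowedReduceSketch {

    // Reduce function from the example: sum the counts.
    static int reduce(int a, int b) {
        return a + b;
    }

    // Slides one batch at a time and reduces all batches currently in the window.
    static List<Map<String, Integer>> slidingReduce(List<Map<String, Integer>> batches, int windowBatches) {
        Deque<Map<String, Integer>> window = new ArrayDeque<>();
        List<Map<String, Integer>> results = new ArrayList<>();
        for (Map<String, Integer> batch : batches) {
            window.addLast(batch);
            if (window.size() > windowBatches) {
                window.removeFirst(); // the oldest batch leaves the window
            }
            Map<String, Integer> combined = new HashMap<>();
            for (Map<String, Integer> b : window) {
                for (Map.Entry<String, Integer> e : b.entrySet()) {
                    combined.merge(e.getKey(), e.getValue(), WindowedReduceSketch::reduce);
                }
            }
            results.add(combined);
        }
        return results;
    }

    // Builds a batch from alternating key/value arguments, e.g. batch("A", 1, "B", 2).
    static Map<String, Integer> batch(Object... keysAndValues) {
        Map<String, Integer> m = new HashMap<>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            m.put((String) keysAndValues[i], (Integer) keysAndValues[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> batches = Arrays.asList(
            batch("A", 1), batch("A", 2, "B", 1), batch("B", 4), batch("A", 5));
        List<Map<String, Integer>> results = slidingReduce(batches, 2);
        for (int i = 0; i < results.size(); i++) {
            System.out.println("after batch " + (i + 1) + ": " + results.get(i));
        }
    }
}
```

Note that the reduce function's value type is the same on input and output, which is why the example above can only sum a single Integer per key.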
Working backwards through the application, my questions are:
How do I define a pair in a JavaPairDStream with more than one value or key (maybe something like JavaPairDStream<String, Tuple2<Integer,Integer>>)?
What is the best approach for the reduceFunc, given a pair with multiple keys?
What is the best way to map the initial DStream (maybe something like JavaDStream<Tuple2<String, String>> line = input.map(func))?
Thank you in advance for your help.
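One possible single-pass approach to these questions, sketched in plain Java rather than Spark: map each event to (element, (1, 0)) for a success or (element, (0, 1)) for an error, then reduce with component-wise addition. In Spark's Java API this would presumably correspond to a JavaPairDStream<String, Tuple2<Integer, Integer>> and a reduceFunc of type Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, but that mapping is untested here. The Counts class is a hypothetical stand-in for Tuple2<Integer, Integer>.

```java
import java.util.Arrays;
import java.util.List;

// Single-pass sketch: status -> (1, 0) or (0, 1), then component-wise sum.
class PairReduceSketch {

    // Hypothetical stand-in for scala.Tuple2<Integer, Integer>.
    static final class Counts {
        final int successes;
        final int errors;

        Counts(int successes, int errors) {
            this.successes = successes;
            this.errors = errors;
        }
    }

    // Map step: unknown statuses count as neither success nor error.
    static Counts fromStatus(String status) {
        if ("Success".equals(status)) return new Counts(1, 0);
        if ("Error".equals(status)) return new Counts(0, 1);
        return new Counts(0, 0);
    }

    // Reduce step: component-wise sum, which is associative and commutative,
    // so the grouping and order in which partial results are combined do not matter.
    static Counts reduce(Counts a, Counts b) {
        return new Counts(a.successes + b.successes, a.errors + b.errors);
    }

    public static void main(String[] args) {
        // Statuses observed for element B in the sample input.
        List<String> statusesOfB = Arrays.asList("Success", "Error", "Success");
        Counts total = new Counts(0, 0);
        for (String s : statusesOfB) {
            total = reduce(total, fromStatus(s));
        }
        System.out.println("(B, (" + total.successes + "," + total.errors + "))");
    }
}
```

Compared with counting each status separately and joining afterwards, this keeps both counts in one value, so a single windowed reduce and a final map over the values would be enough.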
I have found the solution. By working with function classes and tuples, it is possible to build any combination you could build with Scala; the problem is that I could not find any documentation or examples of this in Java. Below is my solution in case it helps anyone in the future.
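// Spark 2.x Java API (PairFlatMapFunction returns an Iterator)
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// lines is the input JavaDStream<String>; each line becomes an (element, status) pair
JavaPairDStream<String, String> samples = lines.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
    public Iterator<Tuple2<String, String>> call(String s) throws Exception {
        return Arrays.asList(new Tuple2<String, String>(/* some logic on my data */)).iterator();
    }
});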
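The parsing logic above is specific to my data, so the following is only a hypothetical illustration of what such a step might look like, assuming lines in an "element,status" format such as "B,Error". SimpleImmutableEntry stands in for Tuple2 so the sketch runs without Spark. Returning an Iterator matches what the flatMapToPair function returns, and an empty iterator lets malformed lines be dropped, which is one reason to prefer a flat map over a plain map here.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

// Hypothetical parser for "element,status" lines; SimpleImmutableEntry stands in for Tuple2.
class LineParser {

    static Iterator<Map.Entry<String, String>> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Collections.emptyIterator(); // skip malformed input instead of failing
        }
        Map.Entry<String, String> pair =
            new SimpleImmutableEntry<>(parts[0].trim(), parts[1].trim());
        return Collections.singletonList(pair).iterator();
    }

    public static void main(String[] args) {
        for (String line : new String[] {"A,Error", " B , Success ", "garbage"}) {
            Iterator<Map.Entry<String, String>> it = parse(line);
            while (it.hasNext()) {
                Map.Entry<String, String> e = it.next();
                System.out.println("(" + e.getKey() + ", " + e.getValue() + ")");
            }
        }
    }
}
```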
// Key each sample by the whole (element, status) tuple, with a count of 1
JavaPairDStream<Tuple2<String, String>, Integer> samplePairs = samples.mapToPair(
    new PairFunction<Tuple2<String, String>, Tuple2<String, String>, Integer>() {
        public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<String, String> t) {
            return new Tuple2<Tuple2<String, String>, Integer>(t, 1);
        }
    });
// Errors per element over a 30-second window, sliding every second
JavaPairDStream<String, Integer> countErrors = samplePairs.filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {
    public Boolean call(Tuple2<Tuple2<String, String>, Integer> t) {
        return "Error".equals(t._1._2);
    }
}).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> t) {
        // Drop the status, keeping (element, count)
        return new Tuple2<String, Integer>(t._1._1, t._2);
    }
}).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
}, Durations.seconds(30), Durations.seconds(1));
// Successes per element over the same window
JavaPairDStream<String, Integer> countSuccess = samplePairs.filter(new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {
    public Boolean call(Tuple2<Tuple2<String, String>, Integer> t) {
        return "Success".equals(t._1._2);
    }
}).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> t) {
        // Drop the status, keeping (element, count)
        return new Tuple2<String, Integer>(t._1._1, t._2);
    }
}).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
}, Durations.seconds(30), Durations.seconds(1));
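For intuition, here is a plain-Java sketch (not Spark) of what each of the two branches above computes for a single window: keep only the events with one status, then sum a count of 1 per element. The names are hypothetical. The important detail is that an element with no event of that status does not appear in the result at all, which is why the two counts have to be combined with an outer join rather than an inner one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of one branch: filter by status, then count per element.
class StatusCountSketch {

    // Each event is {element, status}.
    static Map<String, Integer> countWithStatus(List<String[]> events, String status) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] event : events) {
            if (status.equals(event[1])) {                 // the filter(...) step
                counts.merge(event[0], 1, Integer::sum);   // mapToPair + sum per element
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
            new String[] {"A", "Error"},
            new String[] {"B", "Success"},
            new String[] {"B", "Error"},
            new String[] {"B", "Success"},
            new String[] {"C", "Success"},
            new String[] {"C", "Error"});
        System.out.println("errors:    " + countWithStatus(input, "Error"));
        System.out.println("successes: " + countWithStatus(input, "Success"));
    }
}
```

On the sample input the success counts contain no entry for A, since A only ever fails.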
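// Full outer join keeps elements seen with only one status; the missing side is an empty Optional
JavaPairDStream<String, Tuple2<Optional<Integer>, Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors);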
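A plain-Java sketch of what the full outer join yields for one window: every key present on either side, paired with an Optional that is empty when that side has no entry for the key. java.util.Optional and SimpleImmutableEntry are used here as stand-ins for Spark's Optional and Tuple2, and the method name is hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch of a full outer join of two per-element count maps.
class FullOuterJoinSketch {

    static Map<String, Map.Entry<Optional<Integer>, Optional<Integer>>> fullOuterJoin(
            Map<String, Integer> left, Map<String, Integer> right) {
        Set<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet()); // union of keys from both sides
        Map<String, Map.Entry<Optional<Integer>, Optional<Integer>>> joined = new TreeMap<>();
        for (String k : keys) {
            // ofNullable gives Optional.empty() when the side has no entry for k
            joined.put(k, new SimpleImmutableEntry<>(
                Optional.ofNullable(left.get(k)), Optional.ofNullable(right.get(k))));
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, Integer> successes = new TreeMap<>();
        successes.put("B", 2);
        successes.put("C", 1);
        Map<String, Integer> errors = new TreeMap<>();
        errors.put("A", 1);
        errors.put("B", 1);
        errors.put("C", 1);
        // Same argument order as countSuccess.fullOuterJoin(countErrors)
        System.out.println(fullOuterJoin(successes, errors));
    }
}
```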
// Error ratio per element: errors / (successes + errors)
JavaPairDStream<String, Double> mappedRDD = countPairs.mapToPair(
    new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() {
        public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> t) throws Exception {
            String element = t._1();
            Optional<Integer> successes = t._2()._1(); // left side: countSuccess
            Optional<Integer> errors = t._2()._2();    // right side: countErrors
            if (successes.isPresent() && errors.isPresent()) {
                double e = errors.get();
                return new Tuple2<String, Double>(element, e / (e + successes.get()));
            } else if (errors.isPresent()) {
                // Only errors in the window: 100% error ratio
                return new Tuple2<String, Double>(element, 1.0);
            } else {
                // Only successes in the window: 0% error ratio
                return new Tuple2<String, Double>(element, 0.0);
            }
        }
    });
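The three branches above can also be collapsed by treating a missing side as zero events. A plain-Java sketch, with java.util.Optional standing in for Spark's Optional; whether the Optional type in a given Spark version offers an equivalent of orElse is an assumption to check against its documentation.

```java
import java.util.Optional;

// Sketch: missing side of the outer join = zero events of that status.
class OptionalRatioSketch {

    static double errorRatio(Optional<Integer> successes, Optional<Integer> errors) {
        int s = successes.orElse(0);
        int e = errors.orElse(0);
        // After a full outer join at least one side should be present; guard anyway
        return (s + e) == 0 ? 0.0 : (double) e / (s + e);
    }

    public static void main(String[] args) {
        System.out.println(errorRatio(Optional.empty(), Optional.of(1))); // element A
        System.out.println(errorRatio(Optional.of(2), Optional.of(1)));   // element B
        System.out.println(errorRatio(Optional.of(1), Optional.of(1)));   // element C
    }
}
```

This gives 1.0, one third and 0.5 for A, B and C, matching the 100%, 33.3% and 50% expected in the question.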