Reputation: 49
So far I have a JavaDStream which first looked like this:
Value
---------------------
a,apple,spain
b,orange,italy
c,apple,italy
a,apple,italy
a,orange,greece
First I split the rows and mapped them to key-value pairs in a JavaPairDStream:
JavaPairDStream<String, String> pairDStream = inputStream.mapToPair(row -> {
    String[] cols = row.split(",");
    String key = cols[0];
    String value = cols[1] + "," + cols[2];
    return new Tuple2<String, String>(key, value);
});
That gave me this:
Key | Value
---------------------
a | apple,spain
b | orange,italy
c | apple,italy
a | apple,italy
a | orange,greece
In the end, the output should look like this:
Key | Fruit | Country
-------------------------------
a | 2 | 3
b | 1 | 1
c | 1 | 1
which counts the number of unique fruits and countries of each key.
What is the best practice here? Should I first groupByKey/reduceByKey and then split again? Or is it possible to have two values for each key in a key-value pair, like this?
Key | Value1 | Value2
----------------------
a | apple | spain
b | orange | italy
c | apple | italy
a | apple | italy
a | orange | greece
Upvotes: 0
Views: 216
Reputation: 3554
JavaPairDStream has no distinct() method, so you need to use its .transformToPair(...) method, which hands you the JavaPairRDD of each batch. Inside it you take the distinct rows, count them per key, and the result comes back as a JavaPairDStream. Note that the counts are therefore computed per batch.
- use mapToPair to make a JavaPairDStream for fruits: <key, fruit>, then inside .transformToPair(...) apply .distinct(), map every remaining pair to <key, 1> and apply .reduceByKey(Integer::sum) to get a JavaPairDStream with <key, distinct fruit count> (let's call it prds1)
- use mapToPair to make a JavaPairDStream for countries: <key, country>, then inside .transformToPair(...) apply .distinct(), map every remaining pair to <key, 1> and apply .reduceByKey(Integer::sum) to get a JavaPairDStream with <key, distinct country count> (let's call it prds2)
- join both by key with prds1.join(prds2), which gives <key, (distinct fruit count, distinct country count)>
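The logic of these steps can be checked against the sample data from the question without a running Spark job. The following is only a minimal sketch in plain Java collections that models a single batch; it does not use the Spark API, and the class and method names (DistinctCountsPerKey, distinctCountPerKey, joinByKey) are made up for illustration. Each part mirrors one Spark step: the set plays the role of .distinct(), the merge call plays the role of mapping to <key, 1> followed by .reduceByKey(Integer::sum), and joinByKey plays the role of prds1.join(prds2).

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class DistinctCountsPerKey {

    // Builds "key,value" pairs from column `col`, drops duplicate pairs,
    // then counts how many distinct pairs remain for each key.
    static Map<String, Integer> distinctCountPerKey(List<String> rows, int col) {
        Set<String> distinctPairs = new LinkedHashSet<>();      // role of .distinct()
        for (String row : rows) {
            String[] cols = row.split(",");
            distinctPairs.add(cols[0] + "," + cols[col]);
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (String pair : distinctPairs) {
            String key = pair.split(",")[0];
            counts.merge(key, 1, Integer::sum);                 // role of <key, 1> + reduceByKey
        }
        return counts;
    }

    // Inner join of two per-key count maps: key -> {left count, right count}.
    static Map<String, int[]> joinByKey(Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, int[]> joined = new TreeMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Integer other = right.get(e.getKey());
            if (other != null) {
                joined.put(e.getKey(), new int[] {e.getValue(), other});
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Sample rows copied from the question.
        List<String> rows = Arrays.asList(
                "a,apple,spain", "b,orange,italy", "c,apple,italy",
                "a,apple,italy", "a,orange,greece");
        Map<String, Integer> fruits = distinctCountPerKey(rows, 1);
        Map<String, Integer> countries = distinctCountPerKey(rows, 2);
        for (Map.Entry<String, int[]> e : joinByKey(fruits, countries).entrySet()) {
            System.out.println(e.getKey() + " | " + e.getValue()[0] + " | " + e.getValue()[1]);
        }
    }
}
```

For the sample rows this prints a | 2 | 3, b | 1 | 1 and c | 1 | 1, which matches the table asked for in the question.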
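For future reference, in case you wish to do the same using Spark's DataFrame API (Dataset<Row> in Java):
- make a single DataFrame out of the given input data, assuming it has 3 columns named key, fruit and country (call it df)
- select keys and fruits, apply distinct, then group by key and count the rows (sum does not work on string columns): df.select("key", "fruit").distinct().groupBy("key").count().withColumnRenamed("count", "fruits") (call the resulting DataFrame df1)
- select keys and countries, apply distinct, then group by key and count the rows: df.select("key", "country").distinct().groupBy("key").count().withColumnRenamed("count", "countries") (call the resulting DataFrame df2)
- join df1 and df2 by key: df1.join(df2, "key") (joining on the column name avoids the ambiguous col("key").equalTo(col("key")) condition)
The same result can also be obtained in one step with df.groupBy("key").agg(countDistinct("fruit"), countDistinct("country")), where countDistinct comes from org.apache.spark.sql.functions.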
Upvotes: 0