Reputation: 799
I have a Spark Streaming application which receives several JSON messages per second, each of which has an ID identifying its source.
Using this ID as a key, I can perform a mapPartitionsToPair, thus creating a JavaPairDStream with an RDD of key/value pairs, one pair per partition (so if I receive 5 JSON messages, for example, I get an RDD with 5 partitions, each with the ID of the message as the key and the JSON message itself as the value).
What I want to do now is group all values that have the same key into the same partition. So, for example, if I have 3 partitions with key 'a' and 2 partitions with key 'b', I want to create a new RDD with 2 partitions instead of 5, each containing all the values that one key has: one partition for 'a' and one for 'b'.
How can I accomplish this? This is my code so far:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0],
        Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String, String> streamGiveKey = streamData2.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<String>, String, String>() {
            @Override
            public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
                ArrayList<Tuple2<String, String>> a = new ArrayList<Tuple2<String, String>>();
                while (stringIterator.hasNext()) {
                    String c = stringIterator.next();
                    if (c == null) {
                        continue; // skip null lines instead of aborting the whole partition
                    }
                    // parse the JSON message and use its source ID as the key
                    JsonMessage retMap = new Gson().fromJson(c, JsonMessage.class);
                    String key = retMap.getSid();
                    Tuple2<String, String> b = new Tuple2<String, String>(key, c);
                    a.add(b);
                    System.out.print(b._1 + "_" + b._2);
                }
                return a;
            }
        });
// I create a JavaPairDStream in which each partition contains one key/value pair.
I tried to use groupByKey(), but no matter how many messages there were, I always got 2 partitions.
How should I do this? Thank you so much.
Upvotes: 4
Views: 6176
Reputation: 63022
You can use groupByKey(Integer numPartitions) and set numPartitions equal to the number of distinct keys you have.
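For example, a minimal sketch assuming the streamGiveKey DStream from the question and, say, 2 known distinct keys (the variable name grouped is made up here):

JavaPairDStream<String, Iterable<String>> grouped = streamGiveKey.groupByKey(2); // one partition per distinct key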
But .. you will need to know how many distinct keys you have up front. Do you have that information? Probably not. So then .. you would need to do some extra (/redundant) work, e.g. use countByKey as the first step. That is faster than groupByKey - so at least you would not be doubling the total processing time.
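A rough sketch of that two-step approach, assuming the streamGiveKey DStream from the question, Java 8 lambdas, and that counting the distinct keys of every micro-batch is acceptable:

JavaPairDStream<String, Iterable<String>> groupedByKey = streamGiveKey.transformToPair(rdd -> {
    // countByKey() is an action whose result is keyed by the distinct keys in this batch,
    // so its size is the number of distinct keys
    int distinctKeys = rdd.countByKey().size();
    // groupByKey needs at least 1 partition, so guard against an empty batch
    return rdd.groupByKey(Math.max(distinctKeys, 1));
});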
Update: the OP asked why they are getting 2 partitions by default.
The default groupByKey uses a defaultPartitioner() method:
groupByKey(defaultPartitioner(self))
which picks the Partitioner from the parent RDD with the largest cardinality -- or it will use spark.default.parallelism.
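In local mode spark.default.parallelism usually ends up being the number of cores, which might explain the 2 partitions you are seeing. A hedged sketch of overriding it when building the context (the app name and the 1-second batch interval are just illustrative):

SparkConf conf = new SparkConf()
        .setAppName("JsonKeyGrouping") // illustrative name only
        .set("spark.default.parallelism", "8"); // fallback partition count used by defaultPartitioner
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));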
Upvotes: 5