manuel mourato

Reputation: 799

How to group key/values by partition in Spark?

I have a Spark Streaming application which receives several JSON messages per second, each of which carries an ID identifying its source.

Using this ID as a key, I am able to perform a mapPartitionsToPair, creating a JavaPairDStream whose RDDs contain key/value pairs, one pair per partition (so if I receive 5 JSON messages, for example, I get an RDD with 5 partitions, each with the message's ID as the key and the JSON message itself as the value).

What I want to do now is group all values that have the same key into the same partition. So, for example, if I have 3 partitions with key 'a' and 2 partitions with key 'b', I want to create a new RDD with 2 partitions instead of 5, each containing all the values for one key: one for 'a' and one for 'b'.

How can I accomplish this? This is my code so far:

import java.util.ArrayList;
import java.util.Iterator;

import com.google.gson.Gson;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import scala.Tuple2;

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
        StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String, String> streamGiveKey = streamData2.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<String>, String, String>() {
            @Override
            public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {

                ArrayList<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();

                while (stringIterator.hasNext()) {
                    String json = stringIterator.next();
                    if (json == null) {
                        // Skip null entries; returning null from a flatMap
                        // function would fail the whole partition with an NPE.
                        continue;
                    }

                    // Parse the message and key it by its source ID.
                    JsonMessage retMap = new Gson().fromJson(json, JsonMessage.class);
                    String key = retMap.getSid();
                    pairs.add(new Tuple2<String, String>(key, json));

                    System.out.print(key + "_" + json); // debug output
                }

                return pairs;
            }
        });

//I create a JavaPairDStream in which each partition contains one key/value pair.

I tried to use groupByKey(), but no matter how many messages I received, I always got 2 partitions.

How should I do this? Thank you so much.

Upvotes: 4

Views: 6176

Answers (1)

WestCoastProjects

Reputation: 63022

You can use the

groupByKey(Integer numPartitions)

and set numPartitions to the number of distinct keys you have.
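For instance, a minimal sketch against the streamGiveKey stream from the question, assuming you already know the batch holds 2 distinct keys (the hard-coded count is the assumption here):

// Group all values sharing a key into the same partition, asking for
// exactly as many partitions as there are distinct keys.
// The literal 2 is an assumption -- substitute your real key count.
JavaPairDStream<String, Iterable<String>> grouped = streamGiveKey.groupByKey(2);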

But.. you will need to know how many distinct keys you have up front. Do you have that information? Probably not. So then.. you would need to do some extra (/redundant) work. E.g. use

countByKey

as the first step. That is faster than groupByKey, so at least you would not be doubling the total processing time.
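Because a DStream produces a new RDD per batch, the counting has to happen per batch. One way to wire the two steps together is transformToPair; this is a sketch under that assumption, not the only possible structure:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

// Per batch: count the distinct keys first (countByKey is an action that
// returns a Map<key, count> on the driver), then group with exactly that
// many partitions. Math.max guards against an empty batch.
JavaPairDStream<String, Iterable<String>> grouped = streamGiveKey.transformToPair(
        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, Iterable<String>>>() {
            @Override
            public JavaPairRDD<String, Iterable<String>> call(JavaPairRDD<String, String> rdd) {
                int distinctKeys = rdd.countByKey().size();
                return rdd.groupByKey(Math.max(distinctKeys, 1));
            }
        });

Note that countByKey still makes a full pass over the batch before the groupByKey does; that is the extra (/redundant) work mentioned above.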

Update: The OP asked why they are getting 2 partitions by default.

The default groupByKey uses the defaultPartitioner() method:

groupByKey(defaultPartitioner(self))
  • which selects the Partitioner from the parent RDD with the largest number of partitions,
  • or, if no parent has a partitioner, falls back to spark.default.parallelism.
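If that fallback (here apparently 2) is what you are seeing, you can pin it yourself when building the context. A sketch, where the app name "json-grouping" and the value "4" are illustrative placeholders:

import org.apache.spark.SparkConf;

// Set the fallback parallelism used when no parent partitioner exists.
// "json-grouping" and "4" are placeholders -- pick what fits your cluster.
SparkConf conf = new SparkConf()
        .setAppName("json-grouping")
        .set("spark.default.parallelism", "4");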

Upvotes: 5
