littlely

Reputation: 1418

What's the difference between partitionBy and groupBy in Spark

I have a PySpark RDD which can be collected as a list of tuples like below:

rdds = self.sc.parallelize([(("good", "spark"), 1), (("sood", "hpark"), 1), (("god", "spak"), 1),
                                (("food", "spark"), 1), (("fggood", "ssspark"), 1), (("xd", "hk"), 1),
                                (("good", "spark"), 7), (("good", "spark"), 3), (("good", "spark"), 4),
                                (("sood", "hpark"), 5), (("sood", "hpark"), 7), (("xd", "hk"), 2),
                                (("xd", "hk"), 1), (("fggood", "ssspark"), 2), (("fggood", "ssspark"), 1)], 6)
rdds.glom().collect()

from collections import defaultdict

def inner_map_1(p):
    # Sum the values for each key within a single partition.
    d = defaultdict(int)
    for row in p:
        d[row[0]] += row[1]
    for item in d.items():
        yield item

rdd2 = rdds.partitionBy(4, partitionFunc=lambda x: hash(x)).mapPartitions(inner_map_1)
print(rdd2.glom().collect())

def inner_map_2(p):
    # After groupBy, each row is (key, iterable of the original (key, value) tuples).
    for row in p:
        item = row[0]
        sums = sum(num for _, num in row[1])
        yield item, sums
rdd3 = rdds.groupBy(lambda x: x[0]).mapPartitions(inner_map_2)
print(rdd3.glom().collect())

rdd2 and rdd3 are computed in different ways, and I get the same result, but I'm not sure whether it is guaranteed that rdd2 and rdd3 produce the same result and that matching elements end up in the same partition.
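
For reference, one quick way to check that the aggregated values match, regardless of ordering and partitioning (a sketch):

# Sketch: compare the aggregated key/value pairs, ignoring order and partitioning.
# Matching results here do not by themselves prove that equal keys end up in
# the same physical partitions in rdd2 and rdd3.
print(sorted(rdd2.collect()) == sorted(rdd3.collect()))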

Upvotes: 3

Views: 10673

Answers (2)

Andrew Long

Reputation: 933

partitionBy generally means you are going to hash the partition keys and send records with the same key to a particular partition of an RDD. This colocates anything with a matching key into the same partition, which is useful when doing joins, where you need all matching keys in the same place. partitionBy does NOT discard any records; it only colocates matching keys.

df.partitionBy("vendorId") // all rows kept they are now colocated in the same rdd partition

groupBy is a SQL concept. It finds all unique combinations of the key. You can also apply aggregate functions to all records with the same key. For example, if you wanted to count all records with the same key you could do...

df.groupBy("vendorId").count.show

This counts all records with the same vendor ID. Unlike partitionBy, groupBy tends to greatly reduce the number of records (see cardinality).
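
In PySpark the same thing looks roughly like this (a sketch, assuming a SparkSession named spark and a made-up vendorId/amount schema):

# Sketch: groupBy + count collapses all rows sharing a vendorId into one output row.
df = spark.createDataFrame([("v1", 10), ("v1", 20), ("v2", 5)], ["vendorId", "amount"])
df.groupBy("vendorId").count().show()  # 2 output rows from 3 input rows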

I'd suggest running df.groupBy("vendorId").count.explain(true) (explain is available on the aggregated result, not on the grouped data itself). This will print out the logical plan (think of it as the SQL equivalent) and the physical plan (the exact set of operations that Spark will do). In general, Spark translates a groupBy into partial hash aggregate -> shuffle (partition by key) -> final hash aggregate -> results.
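
In PySpark that check would look something like the following, reusing the df from the sketch above (the exact plan text depends on the Spark version):

# Sketch: print the extended (logical + physical) plans for a groupBy aggregation.
# Expect a partial hash aggregate, an exchange (shuffle by the grouping key),
# and a final hash aggregate in the physical plan.
df.groupBy("vendorId").count().explain(True)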

Upvotes: 5

Nastasia

Reputation: 657

I would say that "groupBy" is more of a logical way to group your data. It looks like a GROUP BY in SQL.

"partitionBy" is more physical: you really physically partition your data across the cluster.

Upvotes: 2
