Reputation: 1511
Given a Spark DataFrame which looks something like this:
==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
| A | 1 | 11 | .. | 21 |
| A | 31 | 41 | .. | 51 |
| B | 2 | 12 | .. | 22 |
| B | 32 | 42 | .. | 52 |
==================================
I would like to run logic which carries out an aggregation/computation for a partition of the table which corresponds to a particular Name value. Said logic requires that the full contents of the partition -- and only that partition -- be materialized in memory on the node executing the logic; it would look something like the processSegment function below:
def processDataMatrix(dataMatrix):
    # do some number crunching on a 2-D matrix

def processSegment(dataIter):
    # "running" value of the Name column in the iterator
    dataName = None
    # as the iterator is processed, put the data in a matrix
    dataMatrix = []

    for dataTuple in dataIter:
        # separate the name column from the other columns
        (name, *values) = dataTuple
        # SANITY CHECK: ensure that all rows have same name
        if (dataName is None):
            dataName = name
        else:
            assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName)
        # put the row in the matrix
        dataMatrix.append(values)

    # if any rows were processed, number-crunch the matrix
    if (dataName is not None):
        return processDataMatrix(dataMatrix)
    else:
        return []
I have tried to make this work by repartitioning based on the Name column, then running processSegment on each partition via mapPartitions on the underlying RDD:
result = \
    stacksDF \
        .repartition('Name') \
        .rdd \
        .mapPartitions(processSegment) \
        .collect()
However, the process routinely fails the SANITY CHECK assertion in processSegment:
AssertionError: row name Q7 does not match expected A9
Why is the partitioning ostensibly executed on the DataFrame not being preserved when I attempt to run mapPartitions on the underlying RDD? If the approach above is not valid, is there some approach (using either the DataFrame API or the RDD API) which will enable me to carry out aggregation logic on the in-memory rendition of a DataFrame partition?
(As I am using PySpark, and the particular number-crunching logic I wish to execute is Python, user-defined aggregation functions (UDAFs) would not appear to be an option.)
Upvotes: 2
Views: 2189
Reputation: 330413
I believe that you misunderstood how partitioning works. In general a partitioner is a surjective function, not a bijective one. While all records for a specific value will be moved to a single partition, a partition may contain records with multiple different values.
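For instance, one quick way to see this (a minimal sketch; df here stands for the question's stacksDF) is to look at which partition id each Name ends up in after repartition -- distinct names will typically share partition ids:

from pyspark.sql.functions import spark_partition_id

# after hash-based repartition('Name'), several distinct names
# can land in the same partition
(df
    .repartition("Name")
    .select("Name", spark_partition_id().alias("pid"))
    .distinct()
    .show())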
The DataFrame API doesn't give you any control over the partitioner, but it is possible to define a custom partitionFunc when using the RDD API. This means you can use one which is bijective, for example:
mapping = (df
    .select("Name")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .zipWithIndex()
    .collectAsMap())

def partitioner(x):
    return mapping[x]
and use it as follows:
df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner)
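From there you could plug in the processSegment function from the question; a minimal, untested sketch (assuming the mapping and partitioner defined above, and that Name is the first column of each Row, as in the question's table) might look like:

result = (df.rdd
    .map(lambda row: (row.Name, row))
    .partitionBy(len(mapping), partitioner)
    .values()                          # drop the key, keep the original Row
    .mapPartitions(processSegment)
    .collect())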
Although this is possible, you have to remember that partitions are not free, and if the number of unique values is large it can become a serious performance issue.
Upvotes: 2