Project707

Reputation: 502

Spark: Manipulate all of a specific RDD or DataFrame partition's data

I've found several posts, articles, references in the documentation, etc. that hint at the idea that you can access a specific partition using foreachPartition. However, I have yet to figure out how to do something with all of the data in a given partition.

My goal is to select some data from a database, manipulate it, partition by the unique values in a column, and then write each of these partitions as a single specifically-named jsonl file to s3 to be accessed by another system.

repartitioned = myDataframe.repartition("processed_date")
repartitioned.foreachPartition(writePartitionsToS3)

I have tried many ways to parse that data, but it seems that I can only get individual tuples in foreachPartition, and no bounds for the partitions themselves in order to separate this data out efficiently.

from pprint import pprint

def writePartitionsToS3(partition):
    for row in partition:
        pprint(row)

produces (with several columns removed for brevity):

Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25))
Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25))
Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25))
Row(entity_id=u'2315183', ... processed_date=datetime.date(2015, 3, 25))

It's also possible that partitions are not defined the way I think they are. I know there is a built-in DataFrameWriter that can write by partition, but I can't use it, because I really need to be able to produce named files like this rather than the part-xxx format:

s3a://<bucket>/<prefix>/<date processed>.jsonl

I am chunking out the data in such a way that the partitions will be relatively small (one per processed_date, per entity, selected as its own DataFrame), so size isn't an issue, but I also don't really want to collect() everything on one node just to parse a list of partitions, because I want to write the files to s3 in parallel.


Update:

I ended up solving the problem for my case by getting the unique values and then filtering the original dataset on each of them. Keep in mind that you would never want to do this if the dataset were very large, but I have the option because I create small-ish dataframes in a loop (selecting from a database) and then work on those chunks.

# Get a list of the unique values present
# in the processed_date column
uniqueProcessedDates = myDataframe.select('processed_date') \
    .distinct().rdd.map(lambda r: r[0]).collect()

# For each unique processed date we want to
# filter records and then write them
for value in uniqueProcessedDates:
    sortedRowsThisProcessedDate = myDataframe.filter(myDataframe.processed_date == value)

    # some custom function to write the data
    writeProcessedDatesToS3(sortedRowsThisProcessedDate.collect())
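
For illustration, a minimal version of such a writer function could look like the following. This is only a sketch: it assumes boto3 is available where the driver runs, and the bucket and prefix names are placeholders rather than real configuration.

import json
import boto3

def writeProcessedDatesToS3(rows):
    # rows is the collected list of Row objects for a single processed_date
    if not rows:
        return
    processed_date = rows[0]["processed_date"]
    body = "\n".join(json.dumps(row.asDict(), default=str) for row in rows)
    key = "some-prefix/{}.jsonl".format(processed_date.isoformat())  # placeholder prefix
    boto3.client("s3").put_object(
        Bucket="some-bucket",  # placeholder bucket
        Key=key,
        Body=body.encode("utf-8"),
    )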

That all said, I'm sure there are many ways in which this is terribly inefficient. One thing I'm considering is repartitioning each RDD by the exact set of values that need to be written into each file, since each write to s3 has to be done atomically. I think that could also help avoid collecting from multiple nodes before writing the data.

Upvotes: 2

Views: 1374

Answers (1)

zero323

Reputation: 330063

There are no bounds to be accessed. DataFrame.repartition uses a hash partitioner to shuffle the data, so the co-occurrence of rows has no wider meaning. All you can assume here is that all records for a particular processed_date are located in the same partition.

You can improve the situation a bit by adding sortWithinPartitions:

(myDataframe
    .repartition("processed_date")
    .sortWithinPartitions("processed_date"))

to be able to access all records for a single date one by one.

Another possible improvement is to use orderBy method:

myDataframe.orderBy("processed_date")

This will result in consecutive dates, but it still gives you no access to the boundaries.

In both cases you'll have to detect the boundaries manually when iterating over a partition.
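
For example, a minimal sketch of that boundary detection (handleDate here is just a placeholder for whatever you do with each group):

from itertools import groupby

def handlePartition(partition):
    # After sortWithinPartitions("processed_date") records with the same
    # date are adjacent within a partition, so groupby yields one group
    # per processed_date.
    for processed_date, rows in groupby(partition, key=lambda r: r["processed_date"]):
        handleDate(processed_date, list(rows))  # placeholder per-date handler

(myDataframe
    .repartition("processed_date")
    .sortWithinPartitions("processed_date")
    .foreachPartition(handlePartition))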

Finally, if you want real control, use RDDs and the repartitionAndSortWithinPartitions method. This gives you fine-grained control over the data distribution: you can define a partitionFunc to distribute the data in a specific way and therefore know the partition boundaries up front.
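
A rough sketch of that approach, assuming the set of distinct dates is small enough to collect up front (dateToIndex is just an illustration, not part of your code):

dates = [r[0] for r in myDataframe.select("processed_date").distinct().collect()]
dateToIndex = {d: i for i, d in enumerate(dates)}

partitioned = (myDataframe.rdd
    .map(lambda row: (row["processed_date"], row))
    .repartitionAndSortWithinPartitions(
        numPartitions=len(dates),
        partitionFunc=lambda d: dateToIndex[d]))

# Each partition now contains exactly one processed_date, so a
# foreachPartition writer can emit exactly one file per partition.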

DataFrameWriter.partitionBy uses a different mechanism, which won't be useful for you here.
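
For completeness, this is roughly the layout partitionBy would give you, which is why it doesn't match the required naming (the path is a placeholder):

# Produces key=value directories containing Spark-named part files, e.g.
#   .../processed_date=2015-03-25/part-xxxxx
# not a single <date>.jsonl object per date.
(myDataframe
    .write
    .partitionBy("processed_date")
    .json("s3a://some-bucket/some-prefix"))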

Upvotes: 1
