Adrian Bridgett

Reputation: 301

What are the best practices to partition Parquet files by timestamp in Spark?

I'm pretty new to Spark (2 days) and I'm pondering the best way to partition Parquet files.

My rough plan at the moment is:

It's been ludicrously easy (kudos to the Spark devs) to get a simple version working, except for partitioning the way I'd like to. This is in Python, BTW:

# read the CSV source with an explicit schema, then append Parquet output partitioned by 'type'
input = sqlContext.read.format('com.databricks.spark.csv').load(source, schema=myschema)
input.write.partitionBy('type').format("parquet").save(dest, mode="append")

Is the best approach to map the RDD, adding new columns for year, month, day and hour, and then use partitionBy? Would we then have to manually add year/month etc. to every query? Given how elegant I've found Spark to be so far, this seems a little odd.
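
Roughly what I have in mind is the sketch below (using withColumn rather than a raw RDD map, and assuming the timestamp column is called inserted_at; the names here are just placeholders and this is untested):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# placeholder UDFs to pull year/month/day/hour out of the timestamp column
year_of  = udf(lambda ts: ts.year,  IntegerType())
month_of = udf(lambda ts: ts.month, IntegerType())
day_of   = udf(lambda ts: ts.day,   IntegerType())
hour_of  = udf(lambda ts: ts.hour,  IntegerType())

with_parts = (input
    .withColumn('year',  year_of(input.inserted_at))
    .withColumn('month', month_of(input.inserted_at))
    .withColumn('day',   day_of(input.inserted_at))
    .withColumn('hour',  hour_of(input.inserted_at)))

# write out, partitioned by the derived columns
with_parts.write.partitionBy('year', 'month', 'day', 'hour') \
    .format("parquet").save(dest, mode="append")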

Thanks

Upvotes: 7

Views: 11395

Answers (1)

Adrian Bridgett

Reputation: 301

I've found a few ways to do this now; I haven't run performance tests over them yet, so caveat emptor:

First we need to create a derived DataFrame (three ways shown below) and then write it out.

1) SQL query (inline lambda UDF)

from pyspark.sql.types import IntegerType

# register a Python UDF called "day", then use it in the SQL that builds the derived DataFrame
sqlContext.registerFunction("day", lambda ts: ts.day, IntegerType())
input.registerTempTable("input")
input_ts = sqlContext.sql(
  "select day(inserted_at) AS inserted_at_day, * from input")

2) SQL query (named function) - very similar

def day(ts):
  return ts.day
sqlContext.registerFunction("day", day, IntegerType())
... rest as before

3) withColumn

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

day = udf(lambda ts: ts.day, IntegerType())
input_ts = input.withColumn('inserted_at_day', day(input.inserted_at))
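
Depending on your Spark version, pyspark.sql.functions may already provide a built-in dayofmonth, which would avoid the Python UDF entirely - an untested sketch:

from pyspark.sql.functions import dayofmonth

# same derived column, but using the built-in instead of a UDF
input_ts = input.withColumn('inserted_at_day', dayofmonth(input.inserted_at))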

To write it out, just:

input_ts.write.partitionBy('inserted_at_day').format("parquet").save(dest, mode="append")
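
On the querying side (my own follow-up above): when you read the partitioned directory back, Spark's partition discovery turns the inserted_at_day=NN directories back into a column, so you can filter on it like any other column and only the matching directories get read. A rough sketch, assuming the same dest path:

events = sqlContext.read.parquet(dest)

# inserted_at_day is reconstructed from the directory names
first_of_month = events.filter(events.inserted_at_day == 1)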

Upvotes: 4
