scoder

Reputation: 2611

spark partition data writing by timestamp

I have some data with a timestamp column that is a long holding a standard epoch timestamp. I need to save that data in a partitioned layout like yyyy/mm/dd/hh using Spark with Scala.

data.write.partitionBy("timestamp").format("orc").save("mypath") 

This just splits the data by the raw timestamp value, like below:

timestamp=1458444061098
timestamp=1458444061198

but I want it to be like:

└── YYYY
    └── MM
        └── DD
            └── HH

Upvotes: 21

Views: 31608

Answers (2)

Constantine

Reputation: 1416

You can leverage the Spark SQL date/time functions for this. First, add a new date/time column derived from the unix timestamp column (note that from_unixtime expects seconds, while the question's timestamps are in milliseconds).

import org.apache.spark.sql.functions._

// epoch is in millis, so divide by 1000; the default from_unixtime format is parseable by year/month/etc.
val withDateCol = data
  .withColumn("date_col", from_unixtime(col("timestamp") / 1000))

After this, you can add year, month, day and hour columns to the DataFrame and then partition the write by these new columns.

withDateCol
  .withColumn("year", year(col("date_col")))
  .withColumn("month", month(col("date_col")))
  .withColumn("day", dayofmonth(col("date_col")))
  .withColumn("hour", hour(col("date_col")))
  .drop("date_col")
  .write // partitionBy belongs to the DataFrameWriter, so .write comes first
  .partitionBy("year", "month", "day", "hour")
  .format("orc")
  .save("mypath")

The columns included in the partitionBy clause won't be part of the file schema; they are encoded in the directory structure instead.
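For example, when you read the output back, the partition columns are reconstructed from the directory names (year=.../month=.../...) rather than from the ORC files themselves. A minimal sketch, using the path from the snippet above:

val readBack = spark.read.orc("mypath")
readBack.printSchema() // year, month, day and hour reappear, inferred from the directory names
readBack.where("year = 2016 AND month = 3").show()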

Upvotes: 28

Silvio

Reputation: 4207

First, I would caution you against over-partitioning. That is, make sure you have sufficient data to make it worth partitioning by hour, otherwise you could end up with lots of partition folders containing only small files. The second caution is against using a deep partition hierarchy (year/month/day/hour), since it requires recursive partition discovery when reading the data back.

Having said that, if you definitely want to partition by hour segments, I would suggest truncating your timestamp to the hour into a new column and partitioning by that. Spark will then be smart enough to recognize the format as a timestamp when you read it back, and you can still perform full filtering as needed.

import org.apache.spark.sql.functions._
import spark.implicits._

input
  // 'timestamp must already be a timestamp type, e.g. ('timestamp / 1000).cast("timestamp")
  .withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
  .write
  .partitionBy("ts_trunc")
  .save("/mnt/warehouse/part-test")

spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")

The other option would be to partition by date and hour of day, like so:

input
  // again assuming 'timestamp is a timestamp type, not the raw epoch-millis long
  .withColumn("date", to_date('timestamp))
  .withColumn("hour", hour('timestamp))
  .write
  .partitionBy("date", "hour")
  .save("/mnt/warehouse/part-test")
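As with the first option, you can then filter on the partition columns directly when reading back. A minimal sketch (the filter values are just illustrative):

// Only the matching date=/hour= directories are scanned, since both are partition columns
spark.read
  .load("/mnt/warehouse/part-test")
  .where("date = '2016-03-20' AND hour = 3")
  .show()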

Upvotes: 17
