Anup

Reputation: 955

spark dataframe to be written in partitions

I am new to Spark and Scala. I want to read a directory containing JSON files. The files have an attribute called "EVENT_NAME" which can have 20 different values. I need to separate the events depending on that attribute value, i.e. group all EVENT_NAME=event_A records together, and write them in a Hive external table structure like: /apps/hive/warehouse/db/event_A/dt=date/hour=hr

Here I have 20 different tables, one per event type, and the data for each event should go to its respective table. I have managed to write some code but need help writing my data out correctly.

import org.apache.spark.sql._
import sqlContext._

val path = "/source/data/path"
val trafficRep = sc.textFile(path)

val trafficRepDf = sqlContext.read.json(trafficRep)
trafficRepDf.registerTempTable("trafficRepDf")

trafficRepDf.write.partitionBy("EVENT_NAME").save("/apps/hive/warehouse/db/sample")

The last line creates partitioned output, but it is not exactly what I need. Please suggest how I can get it right, or any other piece of code that does it.

Upvotes: 0

Views: 5899

Answers (4)

Jack Leow

Reputation: 22497

I'm assuming you mean you'd like to save the data into separate directories, without using Spark/Hive's {column}={value} format.

You won't be able to use Spark's partitionBy, as Spark partitioning forces you to use that format.

Instead, you have to break your DataFrame into its component partitions, and save them one by one, like so:

import org.apache.spark.sql._
import sqlContext.implicits._

val path = "/source/data/path"
val trafficRep = sc.textFile(path)

val trafficRepDf = sqlContext.read.json(trafficRep)

// Collect the distinct event names as strings.
// (Or, if you already know what all 20 values are, just hardcode them.)
val eventNames = trafficRepDf.select($"EVENT_NAME").distinct().collect().map(_.getString(0))

for (eventName <- eventNames) {
  val trafficRepByEventDf = trafficRepDf.where($"EVENT_NAME" === eventName)
  trafficRepByEventDf.write.save(s"/apps/hive/warehouse/db/sample/${eventName}")
}
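
If you also need the dt=.../hour=... sub-partitions from the question under each event directory, a minimal sketch of the same loop, assuming "dt" and "hour" columns have already been added to the DataFrame (they are not in the JSON as shown):

for (eventName <- eventNames) {
  val trafficRepByEventDf = trafficRepDf.where($"EVENT_NAME" === eventName)
  // Writes /apps/hive/warehouse/db/<eventName>/dt=.../hour=... directories,
  // matching the layout described in the question. Assumes the "dt" and
  // "hour" columns already exist on the DataFrame.
  trafficRepByEventDf.write
    .partitionBy("dt", "hour")
    .save(s"/apps/hive/warehouse/db/${eventName}")
}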

Upvotes: 3

Sanchit Grover

Reputation: 1008

You can add date and hour columns to your dataframe.

import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit

val path = "/source/data/path"
val trafficRep = sc.textFile(path)

val trafficRepDf = sqlContext.read.json(trafficRep)

// withColumn returns a new DataFrame, so the result has to be assigned.
val trafficRepWithPartitionsDf = trafficRepDf
  .withColumn("dt", lit("dtValue"))
  .withColumn("hour", lit("hourValue"))

trafficRepWithPartitionsDf.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")
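
If the JSON records carry an event timestamp, the hard-coded literals could instead be derived from it. A minimal sketch, assuming a hypothetical event_ts timestamp column in the data:

import org.apache.spark.sql.functions.{col, hour, to_date}

// "event_ts" is a hypothetical column name; substitute whatever timestamp field your JSON actually has.
val withDerivedPartitionsDf = trafficRepDf
  .withColumn("dt", to_date(col("event_ts")))
  .withColumn("hour", hour(col("event_ts")))

withDerivedPartitionsDf.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")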

Upvotes: 1

Raphael Roth

Reputation: 27373

I assume you want a table structure like /apps/hive/warehouse/db/EVENT_NAME=xx/dt=yy/hour=zz. In that case you need to partition by EVENT_NAME, dt and hour, so try this:

trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample")
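
Note that this assumes dt and hour already exist as columns in trafficRepDf (see the answer above for one way to add them). After writing, Hive also has to be told about the new partitions; a sketch, assuming sqlContext is a HiveContext and an external table named db.sample (hypothetical) has already been created over that path with matching partition columns:

// Hypothetical table name; assumes an external table exists over
// /apps/hive/warehouse/db/sample partitioned by (EVENT_NAME, dt, hour).
sqlContext.sql("MSCK REPAIR TABLE db.sample")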

Upvotes: 0
