Reputation: 955
I am new to Spark and Scala. I want to read a directory containing JSON files. Each record has an attribute called "EVENT_NAME" which can have 20 different values. I need to separate the records by that attribute value, i.e. group all EVENT_NAME=event_A records together, and write them out in a Hive external table structure like: /apps/hive/warehouse/db/event_A/dt=date/hour=hr
Here I have 20 different tables, one per event type, and the data for each event should go to its respective table. I have managed to write some code but need help writing my data out correctly.
import org.apache.spark.sql._
import sqlContext._
val path = "/source/data/path"
val trafficRep = sc.textFile(path)
val trafficRepDf = sqlContext.read.json(trafficRep)
trafficRepDf.registerTempTable("trafficRepDf")
trafficRepDf.write.partitionBy("EVENT_NAME").save("/apps/hive/warehouse/db/sample")
The last line creates partitioned output, but it is not exactly the layout I need. Please suggest how I can get it right, or any other piece of code that does this.
Upvotes: 0
Views: 5899
Reputation: 22497
I'm assuming you mean you'd like to save the data into separate directories, without using Spark/Hive's {column}={value} format.
You won't be able to use Spark's partitionBy, as Spark partitioning forces you to use that format.
Instead, you have to break your DataFrame into its component partitions and save them one by one, like so:
import org.apache.spark.sql._
import sqlContext.implicits._ // for the $"col" syntax

val path = "/source/data/path"
val trafficRep = sc.textFile(path)
val trafficRepDf = sqlContext.read.json(trafficRep)

// Collect the distinct event names as strings.
// Or, if you already know what all 20 values are, just hardcode them.
val eventNames = trafficRepDf.select($"EVENT_NAME").distinct().collect().map(_.getString(0))

for (eventName <- eventNames) {
  val trafficRepByEventDf = trafficRepDf.where($"EVENT_NAME" === eventName)
  trafficRepByEventDf.write.save(s"/apps/hive/warehouse/db/sample/${eventName}")
}
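If you also need the dt=.../hour=... subdirectories from the question, that part of the layout does use the {column}={value} format, so you can still hand it to partitionBy inside each per-event write. A minimal sketch, assuming your DataFrame already carries (or you have added) dt and hour columns:

for (eventName <- eventNames) {
  trafficRepDf
    .where($"EVENT_NAME" === eventName)
    .drop("EVENT_NAME")        // the event is already encoded in the directory name
    .write
    .partitionBy("dt", "hour") // produces .../dt=.../hour=... subdirectories
    .save(s"/apps/hive/warehouse/db/sample/${eventName}")
}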
Upvotes: 3
Reputation: 11
See https://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-16-to-20 ("Dataset and DataFrame API"): registerTempTable has been deprecated and replaced by createOrReplaceTempView.
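For example, the temp-table registration from the question would look like this on Spark 2.x (a minimal sketch, assuming a SparkSession named spark):

// Spark 2.x equivalent of the question's registerTempTable call
val trafficRepDf = spark.read.json("/source/data/path")
trafficRepDf.createOrReplaceTempView("trafficRepDf")
val eventA = spark.sql("SELECT * FROM trafficRepDf WHERE EVENT_NAME = 'event_A'")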
Upvotes: 0
Reputation: 1008
You can add date and hour columns to your DataFrame, then include them in partitionBy:
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit // needed for lit()
val path = "/source/data/path"
val trafficRep = sc.textFile(path)
val trafficRepDf = sqlContext.read.json(trafficRep)
// withColumn returns a new DataFrame, so keep the result
val trafficRepWithPartitionsDf = trafficRepDf.withColumn("dt", lit("dtValue")).withColumn("hour", lit("hourValue"))
trafficRepWithPartitionsDf.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")
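If the date and hour should come from the data rather than from constants, you can derive them from a timestamp field instead of using lit. A minimal sketch, assuming a hypothetical event_time column holding a parseable timestamp string:

import org.apache.spark.sql.functions.{col, hour, to_date}
// event_time is an assumed column name; substitute your real timestamp field
val partitionedDf = trafficRepDf
  .withColumn("dt", to_date(col("event_time")))
  .withColumn("hour", hour(col("event_time")))
partitionedDf.write.partitionBy("EVENT_NAME", "dt", "hour").save("/apps/hive/warehouse/db/sample")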
Upvotes: 1
Reputation: 27373
I assume you want a table structure like /apps/hive/warehouse/db/EVENT_NAME=xx/dt=yy/hour=zz, in which case you need to partition by EVENT_NAME, dt and hour, so try this:
trafficRepDf.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample")
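To expose that directory as a Hive external table, you would still need to create the table over the output and register its partitions. A rough sketch, assuming the default Parquet output format and a Hive-enabled sqlContext; the column list is a placeholder and should match your actual JSON schema:

sqlContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS db.sample (payload STRING)
  PARTITIONED BY (EVENT_NAME STRING, dt STRING, hour STRING)
  STORED AS PARQUET
  LOCATION '/apps/hive/warehouse/db/sample'
""")
// Pick up the partition directories that Spark wrote out
sqlContext.sql("MSCK REPAIR TABLE db.sample")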
Upvotes: 0