Reputation: 13
I want to load data into a Hive external table using Spark. Please help me with this: how can I load data into Hive using Scala or Java code?
Thanks in advance
Upvotes: 1
Views: 9969
Reputation: 895
I have tried a similar scenario and had satisfactory results. I worked with Avro data whose schema was in JSON. I streamed a Kafka topic with Spark Streaming and persisted the data into HDFS, which is the location of an external table. So every 2 seconds (the streaming batch interval), the data is stored into HDFS in a separate file, and the Hive external table is appended as well.
Here is a simple code snippet:
import kafka.serializer.StringDecoder
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc, kafkaConf and topicMaps are assumed to be defined elsewhere
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

messages.foreachRDD { rdd =>
  // Reuse the SparkContext that backs the streaming context
  val sqlContext = new SQLContext(rdd.sparkContext)
  // Parse the JSON message values into a DataFrame
  val myEvent = sqlContext.read.json(rdd.map(_._2))
  // Append each micro-batch to the external table's HDFS location
  myEvent.write.format("parquet").mode(SaveMode.Append)
    .save("maprfs:///location/of/hive/external/table")
}
Don't forget to stop the StreamingContext (ssc) at the end of the application; stopping it gracefully is preferable.
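A minimal sketch of one common shutdown pattern, assuming ssc is the StreamingContext from the snippet above:
sys.addShutdownHook {
  // Finish in-flight batches before exiting, and stop the SparkContext too
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
ssc.start()
ssc.awaitTermination()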
P.S.: While creating the external table, make sure you create it with a schema identical to the DataFrame's schema, because when the JSON is converted into a DataFrame (which is essentially a table), the columns are arranged in alphabetical order.
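If your table's columns are not in alphabetical order, one way to sidestep the mismatch is to select the columns explicitly in the table's declared order before writing. A sketch with hypothetical column names:
// c1, c2, c3 are hypothetical; use your table's columns in its declared order
val ordered = myEvent.select("c1", "c2", "c3")
ordered.write.format("parquet").mode(SaveMode.Append)
  .save("maprfs:///location/of/hive/external/table")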
Upvotes: 1
Reputation: 2226
Assuming the Hive external table has already been created with something like:
CREATE EXTERNAL TABLE external_parquet(c1 INT, c2 STRING, c3 TIMESTAMP)
STORED AS PARQUET LOCATION '/user/etl/destination'; -- location is some directory on HDFS
And you have an existing DataFrame / RDD in Spark that you want to write:
import java.sql.Timestamp
import org.apache.spark.sql.SaveMode
import sqlContext.implicits._
// java.sql.Timestamp maps to the table's TIMESTAMP column (java.util.Date is not supported by toDF)
def now = new Timestamp(System.currentTimeMillis)
val rdd = sc.parallelize(List((1, "a", now), (2, "b", now), (3, "c", now)))
val df = rdd.toDF("c1", "c2", "c3") // column names for your DataFrame
df.write.mode(SaveMode.Overwrite).parquet("/user/etl/destination") // overwrite the existing dataset (full reimport from some source)
If you don't want to overwrite the existing data in your dataset:
df.write.mode(SaveMode.Append).parquet("/user/etl/destination") // append to the existing dataset (incremental imports)
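Since the external table just points at that directory, Hive picks up the new files on the next query. A quick sanity check from Spark, assuming a build with Hive support and the external_parquet table from the DDL above:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.refreshTable("external_parquet") // drop any cached metadata for the table
hiveContext.sql("SELECT COUNT(*) FROM external_parquet").show()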
Upvotes: 3