Reputation: 187
I am new to Spark. Here is what I want to do.
I have created two data sources. The first one reads data from a text file and registers it as a temp table using HiveContext. The other one continuously receives RDDs from Kafka and, for each RDD, creates a DataFrame and registers its contents as a temp table. Finally, I join these two temp tables on a key to get the final result set, which I want to insert into a Hive table. But I am out of ideas. I tried to follow some examples, but they only create a table with one column in Hive, and even that is not readable. Could you please show me how to insert the results into a particular Hive database and table? Please note that I can see the results of the join using the show function, so the real challenge lies in the insertion into the Hive table.
Below is the code I am using.
imports.....
object MSCCDRFilter {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Flume, Kafka and Spark MSC CDRs Manipulation")
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val cgiDF = sc.textFile("file:///tmp/omer-learning/spark/dim_cells.txt").map(_.split(",")).map(p => CGIList(p(0).trim, p(1).trim, p(2).trim,p(3).trim)).toDF()
cgiDF.registerTempTable("my_cgi_list")
val CGITable=sqlContext.sql("select *"+
" from my_cgi_list")
CGITable.show() // this CGITable is a structure I defined in the project
val streamingContext = new StreamingContext(sc, Seconds(10))
val zkQuorum="hadoopserver:2181"
val topics=Map[String, Int]("FlumeToKafka"->1)
val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext,zkQuorum,"myGroup",topics)
val logLinesDStream = messages.map(_._2) // get the message payload
logLinesDStream.print()
val MSCCDRDStream = logLinesDStream.map(MSC_KPI.parseLogLine) // change MSC_KPI to MSCCDR_GO if you want to change the class
// MSCCDR_GO and MSC_KPI are structures defined in the project
MSCCDRDStream.foreachRDD(MSCCDR => {
println("+++++++++++++++++++++NEW RDD ="+ MSCCDR.count())
if (MSCCDR.count() == 0) {
println("==================No logs received in this time interval=================")
} else {
val dataf=sqlContext.createDataFrame(MSCCDR)
dataf.registerTempTable("hive_msc")
cgiDF.registerTempTable("my_cgi_list")
val sqlquery=sqlContext.sql("select a.cdr_type,a.CGI,a.cdr_time, a.mins_int, b.Lat, b.Long,b.SiteID from hive_msc a left join my_cgi_list b"
+" on a.CGI=b.CGI")
sqlquery.show()
sqlContext.sql("SET hive.exec.dynamic.partition = true;")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;")
sqlquery.write.mode("append").partitionBy("CGI").saveAsTable("omeralvi.msc_data")
val FilteredCDR = sqlContext.sql("select p.*, q.* " +
" from MSCCDRFiltered p left join my_cgi_list q " +
"on p.CGI=q.CGI ")
println("======================print result =================")
FilteredCDR.show()
} // end of else
}) // end of foreachRDD
streamingContext.start()
streamingContext.awaitTermination()
}
}
Upvotes: 0
Views: 790
Reputation: 1518
I have had some success writing to Hive, using the following:
dataFrame
.coalesce(n)
.write
.format("orc")
.options(Map("path" -> savePath))
.mode(SaveMode.Append)
.saveAsTable(fullTableName)
We did not follow through on our attempts to use partitions because, as I recall, there was some issue with our desired partitioning column.
The only limitation is with concurrent writes: if the table does not exist yet, any task that tries to create it (because it did not exist when that task first attempted to write to it) will fail with an exception.
Be aware that writing to Hive in streaming applications is usually bad design, as you will often write many small files, which are very inefficient to read and store. So if you write to Hive more often than every hour or so, make sure you include logic for compaction, or add an intermediate storage layer better suited to transactional data.
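One way to sidestep that table-creation race, as an untested sketch: create the table once up front with CREATE TABLE IF NOT EXISTS, then append with insertInto so that no writer ever needs to create it. The object HiveAppend, its method names, and the (name, Hive type) column list are hypothetical and not from our code. Since insertInto matches columns by position, the sketch selects them in the declared order first.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical helper; names and structure are assumptions for illustration.
object HiveAppend {

  // Builds the DDL from caller-supplied (columnName, hiveType) pairs.
  def createTableDdl(fullTableName: String, columns: Seq[(String, String)]): String = {
    val cols = columns.map { case (name, hiveType) => s"`$name` $hiveType" }.mkString(", ")
    s"CREATE TABLE IF NOT EXISTS $fullTableName ($cols) STORED AS ORC"
  }

  // Run once at startup, before any batch tries to write.
  def ensureTable(sqlContext: HiveContext,
                  fullTableName: String,
                  columns: Seq[(String, String)]): Unit =
    sqlContext.sql(createTableDdl(fullTableName, columns))

  // Appends to the existing table; never attempts to create it.
  def append(dataFrame: DataFrame,
             fullTableName: String,
             columns: Seq[(String, String)]): Unit =
    dataFrame
      .select(columns.map { case (name, _) => dataFrame(name) }: _*) // fix column order
      .write
      .mode(SaveMode.Append)
      .insertInto(fullTableName)
}
```

In a streaming job, ensureTable would be called once before streamingContext.start(), and append inside foreachRDD for each non-empty batch.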
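As a rough illustration of what compaction logic could look like (a sketch under assumptions, not what we ran in production): a periodic batch job reads the table the stream appends to and rewrites it into a separate table with far fewer files. The object HiveCompaction, the table names passed in, and the idea of sizing the file count from a caller-supplied byte total are all hypothetical. The sketch writes to a different table on purpose, because overwriting a table while reading from it is not safe.

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// Hypothetical helper; names and sizing rule are assumptions for illustration.
object HiveCompaction {

  // Number of output files for a given data size (ceiling division), at least one.
  def targetFileCount(totalBytes: Long, bytesPerFile: Long): Int = {
    require(bytesPerFile > 0, "bytesPerFile must be positive")
    math.max(1L, (totalBytes + bytesPerFile - 1) / bytesPerFile).toInt
  }

  // Rewrites sourceTable into compactedTable using a small number of ORC files.
  // totalBytes is supplied by the caller, e.g. from the size of the table directory.
  def compact(sqlContext: HiveContext,
              sourceTable: String,
              compactedTable: String,
              totalBytes: Long,
              bytesPerFile: Long): Unit =
    sqlContext.table(sourceTable)
      .coalesce(targetFileCount(totalBytes, bytesPerFile))
      .write
      .format("orc")
      .mode(SaveMode.Overwrite)
      .saveAsTable(compactedTable)
}
```

Readers would then query the compacted table, while the streaming job keeps appending to the source table between compaction runs.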
Upvotes: 1