Reputation: 3609
HIVE version = 1.1.0
SPARK version = 1.5.1
Distribution =Cloudera CDH 5.5
I am trying to insert a single record into a HIVE table from Spark Code for every run of my spark job
When getupdatedVersion method get invoked then i see that Hive external table gets created but Insertion into that table failed due to below error
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Unsupported language features in query: INSERT INTO Ideal.events_log VALUES ('file_date_2017-04-05','omega_2017-04-05.csv','2017-05-15 17:54:58',1)
TOK_QUERY 0, 0,18, 0
TOK_FROM 0, -1,18, 0
TOK_VIRTUAL_TABLE 0, -1,18, 0
TOK_VIRTUAL_TABREF 0, -1,-1, 0
TOK_ANONYMOUS 0, -1,-1, 0
TOK_VALUES_TABLE 1, 8,18, 42
TOK_VALUE_ROW 1, 10,18, 42
'file_date_2017-04-05' 1, 11,11, 42
'omega_2017-04-05.csv' 1, 13,13, 65
'2017-05-15 17:54:58' 1, 15,15, 98
1 1, 17,17, 120
TOK_INSERT 1, 0,-1, 12
TOK_INSERT_INTO 1, 0,6, 12
TOK_TAB 1, 4,6, 12
TOK_TABNAME 1, 4,6, 12
Ideal 1, 4,4, 12
events_log 1, 6,6, 23
TOK_SELECT 0, -1,-1, 0
TOK_SELEXPR 0, -1,-1, 0
TOK_ALLCOLREF 0, -1,-1, 0
scala.NotImplementedError: No parse rules for:
TOK_VIRTUAL_TABLE 0, -1,18, 0
TOK_VIRTUAL_TABREF 0, -1,-1, 0
TOK_ANONYMOUS 0, -1,-1, 0
TOK_VALUES_TABLE 1, 8,18, 42
TOK_VALUE_ROW 1, 10,18, 42
'file_date_2017-04-05' 1, 11,11, 42
'omega_2017-04-05.csv' 1, 13,13, 65
'2017-05-15 17:54:58' 1, 15,15, 98
1 1, 17,17, 120
Here is the structure of HIVE table
CREATE EXTERNAL TABLE `events_log`(
`file_date` string,
`file_name` string,
`audit_timestamp` string,
`version` int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://nameservice1//data/staged/Ideal/omega/events_log'
My Spark Code :
def getupdatedVersion(sc:SparkContext, sqlContext: HiveContext,omega_file_date: String, omega_source_file_name: String,config:Config):Int = {
sqlContext.sql(s""" CREATE EXTERNAL TABLE IF NOT EXISTS Ideal.events_log(file_date String, file_name String, audit_timestamp String, version int) ROW FORMAT DELIMITED FIELDS TERMINATED BY "," LOCATION "/data/staged/Ideal/omega/events_log" """.stripMargin)
val versionDF = sqlContext.sql(s""" SELECT MAX(version) AS version FROM Ideal.events_log WHERE file_date ='$omega_file_date' """.stripMargin )
var version = versionDF.map(row => row.getInt(0)).first()
if (version >= 0 )
{
version = version+1
sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """.stripMargin)
}
else
{
version =0
sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """.stripMargin)
}
version
}
Could someone help me to fix this issue?
Upvotes: 0
Views: 784
Reputation: 3609
Updated Answer : I tried the below and it worked, But for every insert it is creating a new file ,hence I will go ahead as suggested by Samson
def getupdatedVersion(sc:SparkContext, sqlContext: HiveContext,omega_file_date: String, omega_source_file_name: String,config:Config):Int = {
sqlContext.sql(s""" CREATE EXTERNAL TABLE IF NOT EXISTS Ideal.events_log(file_date String, file_name String, audit_timestamp String, version int) ROW FORMAT DELIMITED FIELDS TERMINATED BY "," LOCATION "/data/staged/Ideal/omega/events_log" """.stripMargin)
val versionDF = sqlContext.sql(s""" SELECT MAX(version) AS version FROM Ideal.events_log WHERE file_date ='$omega_file_date' """.stripMargin )
var version = versionDF.map(row => row.getInt(0)).first()
if (version >= 0 )
{
version = version+1
//sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """.stripMargin)
val insertDF = sqlContext.sql(s"""SELECT "$omega_file_date" AS omega_file_date, "$omega_source_file_name" AS omega_source_file_name , "$now" AS audit_timestamp , "$version" AS version """.stripMargin)
insertDF.write.mode("append").insertInto("<dbname>.events_log")
}
else
{
version =0
//sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """.stripMargin)
val insertDF = sqlContext.sql(s"""SELECT "$omega_file_date" AS omega_file_date, "$omega_source_file_name" AS omega_source_file_name , "$now" AS audit_timestamp , "$version" AS version """.stripMargin)
insertDF.write.mode("append").insertInto("<dbname>.events_log")
}
version
}
Upvotes: 0