Liu Yu

Reputation: 391

How to insert into a Hive table with the PySpark API in Spark 2.4.0

I need to insert into a Hive table; the table already exists in Hive. Here is my code:

from pyspark.sql import SparkSession as sc, HiveContext as HC
spark = sc.builder.appName('eap').enableHiveSupport().getOrCreate()
sqlContext = HC(spark)
sqlContext.sql("""
    INSERT INTO mydb.my_job_status_table
    SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime
    FROM batches b
    inner join sourcetables st on b.rungroup = st.rungroup
    inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid
    inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid
    WHERE b.rungroup like 'sgb_%'
""")

When I ran the code in Spark, I received this error:

raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input '01' expecting <EOF>(line 1, pos 195)\n\n== SQL ==\nINSERT INTO mydb.my_job_status_table...

What did I do wrong? And what's the difference between SQLContext and spark.sql?

Upvotes: 0

Views: 2069

Answers (2)

diogoramos

Reputation: 86

Try this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('eap').enableHiveSupport().getOrCreate()

spark.sql("INSERT INTO mydb.my_job_status_table " +
          "SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime " +
          "FROM batches b " +
          "inner join sourcetables st on b.rungroup = st.rungroup " +
          "inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid " +
          "inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid " +
          "WHERE b.rungroup like 'sgb_%'")

Upvotes: 0

maxime G

Reputation: 1771

Your problem is not PySpark-specific.

Don't use INSERT INTO in Spark SQL.

First, use a SELECT to build your dataset:

dataset = sqlContext.sql("""
    SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime
    FROM batches b
    inner join sourcetables st on b.rungroup = st.rungroup
    inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid
    inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid
    WHERE b.rungroup like 'sgb_%'
""")

Then write it out with insertInto (note that it is a method on the DataFrame's writer, not on the DataFrame itself):

dataset.write.insertInto("mydb.my_job_status_table")
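Note that insertInto matches columns by position rather than by name, so the SELECT must return columns in the same order as the target table's schema. It also takes an overwrite flag; a minimal sketch:

# insertInto resolves columns by position, not by name, so the
# SELECT order above must match mydb.my_job_status_table's schema.
# overwrite=True would replace existing data instead of appending.
dataset.write.insertInto("mydb.my_job_status_table", overwrite=False)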

PySpark documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrameWriter.insertInto

Javadoc: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameWriter.html#insertInto-java.lang.String-

Upvotes: 1
