Reputation: 55
I'm writing a dataframe to an external Hive table from pyspark running on EMR. The work involves dropping or truncating the data in an external Hive table, writing the contents of a dataframe into that table, then writing the data from Hive to DynamoDB. Eventually I want to write to an internal table on the EMR cluster, but for now I would like the Hive data to be available to subsequent clusters. I could write to the Glue catalog directly and force the table to be registered, but that is a step further than I need to go.
All components work fine individually on a given EMR cluster: I can create an external Hive table on EMR, either using a script or via ssh and the Hive shell. This table can be queried by Athena and read by pyspark. I can create a dataframe and INSERT OVERWRITE its data into that table in pyspark. I can then use the Hive shell to copy the data from the Hive table into a DynamoDB table.
I'd like to wrap all of the work into a single pyspark script instead of having to submit multiple distinct steps.
I am able to drop tables using
sqlContext.sql("drop table if exists default.my_table")
When I try to create a table using
sqlContext.sql("create table default.mytable(id string,val string) STORED AS ORC")
I get the following error:
org.apache.hadoop.net.ConnectTimeoutException: Call From ip-xx-xxx-xx-xxx/xx.xxx.xx.xx to ip-xxx-xx-xx-xx:8020 failed on socket timeout exception: org.apache.hadoop.net.ConnectTimeoutException: 20000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=ip-xxx-xx-xx-xx:8020]; For more details see: http://wiki.apache.org/hadoop/SocketTimeout
I can't figure out why I can create an external Hive table in Glue using the Hive shell on the cluster, and drop the table using either the Hive shell or the pyspark sqlContext, but can't create a table using sqlContext. I have looked around, and the solutions offered (copying hive-site.xml) don't make sense in this context, as I can clearly write to the required addresses with no hassle, just not from pyspark. It is doubly strange that I can drop tables, and they are definitely gone when I check in Athena.
Running on: emr-5.28.0, Hadoop distribution Amazon 2.8.5, Spark 2.4.4, Hive 2.3.6, Livy 0.6.0 (for notebooks, but my experimentation is via ssh and the pyspark shell)
Upvotes: 0
Views: 1723
Reputation: 673
Complementing @Zeathor's answer: after configuring the EMR and Glue connection and permissions (more details here: https://www.youtube.com/watch?v=w20tapeW1ME), you just need to run Spark SQL commands:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('TestSession').getOrCreate()
spark.sql("create database if not exists test")
You can then create your tables from dataframes:
df.createOrReplaceTempView("first_table")
spark.sql("create table test.table_name as select * from first_table")
All database and table metadata will then be stored in the AWS Glue Data Catalog.
Upvotes: 0
Reputation: 55
Turns out I could create tables via a spark.sql() call as long as I provided a location for the tables. It seems the Hive shell doesn't require it, yet spark.sql() does. Not expected, but not entirely surprising.
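For anyone landing here, a minimal sketch of what "providing a location" can look like. The bucket and path are hypothetical placeholders, and the helper names `build_create_table_ddl` and `create_table` are made up for illustration; only `spark.sql()` is an actual Spark call. Port 8020 in the error is the default HDFS NameNode port, so one plausible reading (an assumption, not something I confirmed) is that without an explicit location Spark falls back to a default warehouse path on an HDFS host that is not reachable.

```python
def build_create_table_ddl(table, columns, location, stored_as="ORC"):
    """Build a CREATE TABLE statement that carries an explicit LOCATION."""
    cols = ", ".join("{} {}".format(name, dtype) for name, dtype in columns)
    return (
        "CREATE TABLE IF NOT EXISTS {table} ({cols}) "
        "STORED AS {fmt} LOCATION '{loc}'"
    ).format(table=table, cols=cols, fmt=stored_as, loc=location)


def create_table(spark, table, columns, location):
    """Run the DDL through an existing SparkSession (or sqlContext)."""
    spark.sql(build_create_table_ddl(table, columns, location))


ddl = build_create_table_ddl(
    "default.my_table",
    [("id", "string"), ("val", "string")],
    "s3://my-example-bucket/hive/my_table/",  # hypothetical bucket and path
)
print(ddl)
```

In the pyspark shell this would be called as `create_table(spark, "default.my_table", [("id", "string"), ("val", "string")], "s3://my-example-bucket/hive/my_table/")`, with the location swapped for a path the cluster can actually write to.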
Upvotes: 0