Reputation: 549
I'm trying to make a temporary table I create in PySpark available via Thrift. My final goal is to be able to access it from a database client like DBeaver using JDBC.
I'm testing first using beeline.
This is what I'm doing.
I set the following in spark-defaults.conf:
spark.sql.hive.thriftServer.singleSession true
I started the PySpark shell (for testing's sake) and ran the following code:
from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
# createOrReplaceTempView returns None, so there is nothing to assign
people.createOrReplaceTempView('peebs')
result = sqlContext.sql('select * from peebs')
So far so good, everything works fine.
In a different terminal I start the Spark Thrift server:
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --conf spark.executor.cores=1 --master spark://172.18.0.2:7077
The server appears to start normally and I'm able to see both pyspark and thrift server jobs running on my spark cluster master UI.
I then connect to the cluster using beeline
./bin/beeline
beeline> !connect jdbc:hive2://172.18.0.2:10001
This is what I got
Connecting to jdbc:hive2://172.18.0.2:10001
Enter username for jdbc:hive2://172.18.0.2:10001:
Enter password for jdbc:hive2://172.18.0.2:10001:
2019-06-29 20:14:25 INFO Utils:310 - Supplied authorities: 172.18.0.2:10001
2019-06-29 20:14:25 INFO Utils:397 - Resolved authority: 172.18.0.2:10001
2019-06-29 20:14:25 INFO HiveConnection:203 - Will try to open client transport with JDBC Uri: jdbc:hive2://172.18.0.2:10001
Connected to: Spark SQL (version 2.3.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Seems to be ok.
When I run show tables; I can't see anything.
Two interesting things I'd like to highlight:
When I start pyspark I get these warnings:
WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
When I start the thrift server I get these:
rsync from spark://172.18.0.2:7077
ssh: Could not resolve hostname spark: Name or service not known
rsync: connection unexpectedly closed (0 bytes received so far) [Receiver]
rsync error: unexplained error (code 255) at io.c(235) [Receiver=3.1.2]
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to ...
I've been through several posts and discussions. People say you can't expose temporary tables via Thrift unless you start the server from within the same code. If that's true, how can I do that in Python (PySpark)?
Thanks
Upvotes: 1
Views: 3049
Reputation: 848
In case someone needs to do this in Spark Streaming I got it to work like this.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
spark = SparkSession \
    .builder \
    .appName('restlogs_qlik') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()
sc=spark.sparkContext
sc.setLogLevel('INFO')
#Order matters!
java_import(sc._gateway.jvm, "")
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
#Define schema of json
schema = StructType().add("partnerid", "string").add("sessionid", "string").add("functionname", "string").add("functionreturnstatus", "string")
#load data into spark-structured streaming
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "rest_logs") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
#Write the stream to an in-memory table named view_figures
query = df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("view_figures") \
    .start()
query.awaitTermination()
After you start it up you can connect over JDBC with beeline to test. What I can't comprehend is why I have to start the Thrift server in the same script. This is how to start the script:
spark-submit --master local[2] \
--conf "spark.driver.extraClassPath=D:\\Libraries\\m2_repository\\org\\apache\\kafka\\kafka-clients\\2.0.0\\kafka-clients-2.0.0.jar" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
"C:\\Temp\\spark_kafka.py"
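Before pointing beeline at the server, it can help to confirm that the Thrift port is actually accepting connections. The following is only a minimal sketch using the Python standard library; the helper name thrift_port_open and the default timeout are my own illustrative choices, not part of Spark.

```python
import socket


def thrift_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout`.

    Hypothetical helper: it only checks that something is listening on the
    port, not that the listener is really the Spark Thrift server.
    """
    try:
        # create_connection raises OSError (refused, timed out, unreachable)
        # when nothing accepts the connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the configuration above, the call would be thrift_port_open('localhost', 10001), assuming the script runs on the same machine as the check.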
Hope this helps someone. By the way, I am in preliminary research stage so don't judge me.
Upvotes: 1
Reputation: 549
After several tests I came up with simple code (no authentication) that works for me.
It's important to note that if you want to make temporary tables available via JDBC, you need to start the Thrift server in the same JVM (the same Spark job) and make sure the code blocks so the application keeps running in the cluster.
Below is a working sample I created for reference:
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()
sc=spark.sparkContext
sc.setLogLevel('INFO')
java_import(sc._gateway.jvm, "")
from pyspark.sql import Row
l = [('John', 20), ('Heather', 34), ('Sam', 23), ('Danny', 36)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
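# createOrReplaceTempView returns None, so there is nothing to assign
people.createOrReplaceTempView('peebs')
# Start the Thrift server inside this driver JVM so it shares the session that owns the temp view
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
# Keep the driver alive; the temp view disappears when the application exits
while True:
    time.sleep(10)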
I simply used the .py above in my spark-submit and then I was able to connect via JDBC, both through beeline and from DBeaver using the Hive JDBC driver.
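The endless sleep loop does the job, but the only way out is to kill the application. As a possible variation, here is a sketch of a keep-alive that blocks until the process receives SIGINT or SIGTERM, so the script can go on to stop Spark cleanly. The names stop_event, request_stop, install_signal_handlers and keep_alive are hypothetical helpers of mine; only the standard library is used.

```python
import signal
import threading

# Set when the process is asked to shut down.
stop_event = threading.Event()


def request_stop(signum=None, frame=None):
    """Signal handler: ask the keep-alive loop to exit."""
    stop_event.set()


def install_signal_handlers():
    """Register request_stop for Ctrl+C and SIGTERM (call from the main thread)."""
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, request_stop)


def keep_alive(event=stop_event, poll_seconds=10.0):
    """Block until `event` is set, waking up every `poll_seconds` to re-check."""
    while not event.wait(poll_seconds):
        pass
```

In the driver script this would replace the while loop: call install_signal_handlers() and then keep_alive() after startWithContext(...), followed by spark.stop() once keep_alive() returns.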
Upvotes: 0
Reputation: 1054
createOrReplaceTempView creates an in-memory table. The Spark Thrift server needs to be started on the same driver JVM where the in-memory table was created.
In the above example, the driver on which the table is created and the driver running the STS (Spark Thrift server) are different.
Two options:
1. Create the table using createOrReplaceTempView in the same JVM where the STS is started.
2. Use a backing metastore and create tables using org.apache.spark.sql.DataFrameWriter#saveAsTable so that the tables are accessible independent of the JVM (in fact, without any Spark driver).
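Option 2 could look roughly like the sketch below in PySpark. The helper name publish_as_table and the default overwrite mode are illustrative assumptions; df is expected to be a DataFrame from a SparkSession built with enableHiveSupport().

```python
def publish_as_table(df, table_name, mode="overwrite"):
    """Persist `df` as a metastore-backed table instead of a temp view.

    Hypothetical helper: `df` is assumed to be a pyspark.sql.DataFrame.
    """
    # df.write is a DataFrameWriter; saveAsTable writes the data out and
    # registers the table in the metastore, so it outlives this session.
    df.write.mode(mode).saveAsTable(table_name)
    return table_name
```

With the DataFrame from the question, publish_as_table(people, 'peebs') would take the place of the createOrReplaceTempView call. This only helps if the PySpark job and the Thrift server point at the same metastore and warehouse location, which is an assumption about the deployment.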
Regarding the errors:
1. Relates to the client and server metastore versions.
2. Seems to be some rsync script trying to parse the spark:// URL.
Neither seems to be related to the issue.
Upvotes: 0