Reputation: 768
I have the following task:
I am using the following code (more or less):
def connect_to_oracle_db(spark_session, db_query):
    return spark_session.read \
        .format("jdbc") \
        .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service_name>") \
        .option("user", "<user>") \
        .option("password", "<pass>") \
        .option("dbtable", db_query) \
        .option("driver", "oracle.jdbc.driver.OracleDriver")

def run(self):
    all_schemes = <list of all available schemes>
    for str_schema in all_schemes:
        db_query = "(Select * from " + str_schema + ".TABLE1) TABLE1_DATA"
        df_table1 = self.connect_to_oracle_db(db_query).load()
        # process df_table1
There are around 300 schemas, and it is quite slow because each iteration of the for loop creates and closes a new connection. I want to find a way to reuse the existing connection or to create a connection pool. The current approach looks quite inefficient to me.
Do you have any idea how to reuse the connection or create a connection pool for PySpark?
Upvotes: 1
Views: 2629
Reputation: 4151
There is no place for a connection pool in the classical sense in a distributed system like Spark. You have to remember that each partition can be processed by a different physical node, a different logical container (if applicable on a given cluster manager), and ultimately a different JVM.
Not that a connection pool could really help in such a case. Since Spark is intended for massive imports, individual connection utilization is already pretty high.
There are, however, other visible problems here (and possibly more that are not obvious from the snippet, since the code you've shown doesn't actually fetch the data):
You didn't configure fetchsize, so the default for the particular driver will be used. For Oracle it is 10, which is completely unfit for large-scale processing:
return spark_session.read \
    .format("jdbc") \
    .option("fetchsize", some_reasonable_value) \
    ...
You didn't configure partitioning, so Spark will process all data using only a single partition. You can read about possible solutions in How to optimize partitioning when migrating data from JDBC source?
You've modeled this as a sequential process. Unless the datasets are somehow combined downstream, it would be best to submit a separate job for each table and let the scheduler optimize things according to the available resources.
You can also consider processing the tables in parallel in a single application.
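The points above could be combined roughly as follows. This is only a sketch under assumptions: the helper names (jdbc_options, load_table, process_all), the fetchsize of 10000, and the worker count are illustrative and not taken from the question, and a suitable partition column with its bounds has to come from your own data. It relies on Spark's ability to run jobs submitted from separate threads of one application concurrently.

```python
from concurrent.futures import ThreadPoolExecutor


def jdbc_options(schema, fetchsize=10000, partition_column=None,
                 lower_bound=None, upper_bound=None, num_partitions=None):
    """Build JDBC reader options for TABLE1 in one schema.

    The fetchsize default is an arbitrary placeholder, not a recommendation.
    """
    options = {
        "url": "jdbc:oracle:thin:@//<host>:<port>/<service_name>",
        "user": "<user>",
        "password": "<pass>",
        "driver": "oracle.jdbc.driver.OracleDriver",
        "dbtable": f"(SELECT * FROM {schema}.TABLE1) TABLE1_DATA",
        "fetchsize": str(fetchsize),
    }
    if partition_column is not None:
        # Spark expects these four options to be supplied together.
        options.update({
            "partitionColumn": partition_column,
            "lowerBound": str(lower_bound),
            "upperBound": str(upper_bound),
            "numPartitions": str(num_partitions),
        })
    return options


def load_table(spark_session, schema, **kwargs):
    """Return a lazy DataFrame; needs a live SparkSession and the Oracle driver."""
    return (spark_session.read
            .format("jdbc")
            .options(**jdbc_options(schema, **kwargs))
            .load())


def process_all(schemas, process_one, max_workers=8):
    """Call process_one(schema) for each schema from a pool of threads."""
    schemas = list(schemas)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as the input schemas.
        return dict(zip(schemas, pool.map(process_one, schemas)))
```

A call might look like process_all(all_schemes, lambda s: load_table(spark, s).count()), where spark is your existing session and count() stands in for whatever action your processing ends with. How many workers make sense depends on the cluster resources and on how many concurrent sessions the Oracle side tolerates.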
And just to reiterate: Spark is lazy, so the core problem may be somewhere else, and the issues listed above may be secondary.
Upvotes: 3