Reputation: 1334
I have deployed pyspark 3.0.1 in Kubernetes.
I am using koalas in a jupyter notebook in order to perform some transformations and I need to write and read from Azure Database for PostgreSQL.
I can read it from pandas using the following code:
from sqlalchemy import create_engine
import psycopg2
import pandas
uri = 'postgres+psycopg2://<postgreuser>:<postgrepassword>@<server>:5432/<database>'
engine_azure = create_engine(uri, echo=False)
df = pdf.read_sql_query(f"select * from public.<table>", con=engine_azure)
I want to read this table from Pyspark using this code:
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import databricks.koalas as ks
from s3fs import S3FileSystem
import datetime
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3,org.postgresql:postgresql:42.1.1 pyspark-shell pyspark-shell"
os.environ['PYSPARK_SUBMIT_ARGS2'] = "--packages org.postgresql:postgresql:42.1.1 pyspark-shell"
sparkClassPath = os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'
# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<image>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "2")
sparkConf.set("spark.driver.memory", "2000m")
sparkConf.set("spark.executor.memory", "2000m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29414")
sparkConf.set("spark.driver.host", "<deployment>.svc.cluster.local")
sparkConf.set("spark.driver.extraClassPath", sparkClassPath)
# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
df3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://<host>:5432/<database>") \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "select * from public.<table>") \
.option("user", "<user>") \
.option("password", "<password>") \
.load()
But I receive this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-a529178ed9a0> in <module>
1 url = 'jdbc:postgresql://psql-mcf-prod1.postgres.database.azure.com:5342/cpke-prod'
2 properties = {'user': '[email protected]', 'password': '4vb44B^V8w2D*q!eQZgl',"driver": "org.postgresql.Driver"}
----> 3 df3 = spark.read.jdbc(url=url, table='select * from public.userinput_write_offs where reversed_date is NULL', properties=properties)
/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
629 jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
630 return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 631 return self._df(self._jreader.jdbc(url, table, jprop))
632
633
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o89.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
at org.postgresql.Driver.makeConnection(Driver.java:450)
at org.postgresql.Driver.connect(Driver.java:252)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:312)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.postgresql.core.PGStream.<init>(PGStream.java:68)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
... 27 more
Upvotes: 0
Views: 482
Reputation: 42392
Your port number is incorrect - it should be 5432, not 5342. Therefore your connection timed out. If you change the line
.option("url", "jdbc:postgresql://<host>:5342/<database>")
to
.option("url", "jdbc:postgresql://<host>:5432/<database>")
maybe it will solve your problem.
Upvotes: 3