J.C Guzman

Reputation: 1334

Cannot read from PostgreSQL using PySpark hosted in Kubernetes

I have deployed PySpark 3.0.1 on Kubernetes.

I am using Koalas in a Jupyter notebook to perform some transformations, and I need to read from and write to Azure Database for PostgreSQL.

I can read the table with pandas using the following code:

from sqlalchemy import create_engine
import psycopg2
import pandas

uri = 'postgresql+psycopg2://<postgreuser>:<postgrepassword>@<server>:5432/<database>'
engine_azure = create_engine(uri, echo=False)

df = pandas.read_sql_query("select * from public.<table>", con=engine_azure)

I want to read the same table from PySpark using this code:

import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
import databricks.koalas as ks
from s3fs import S3FileSystem
import datetime

# Pull in the hadoop-aws and PostgreSQL JDBC driver packages when the JVM starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.hadoop:hadoop-aws:2.7.3,org.postgresql:postgresql:42.1.1 pyspark-shell"

# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "<image>")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "3")
sparkConf.set("spark.executor.cores", "2")
sparkConf.set("spark.driver.memory", "2000m")
sparkConf.set("spark.executor.memory", "2000m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "29414")
sparkConf.set("spark.driver.host", "<deployment>.svc.cluster.local")
sparkConf.set("spark.driver.extraClassPath", sparkClassPath)
# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

df3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:5432/<database>") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "select * from public.<table>") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .load()

But I receive this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-a529178ed9a0> in <module>
      1 url = 'jdbc:postgresql://<host>.postgres.database.azure.com:5342/<database>'
      2 properties = {'user': '<user>@<host>', 'password': '<password>', "driver": "org.postgresql.Driver"}
----> 3 df3 = spark.read.jdbc(url=url, table='select * from public.<table> where reversed_date is NULL', properties=properties)

/usr/local/spark/python/pyspark/sql/readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    629             jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
    630             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 631         return self._df(self._jreader.jdbc(url, table, jprop))
    632 
    633 

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/usr/local/lib/python3.7/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o89.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:275)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
    at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:194)
    at org.postgresql.Driver.makeConnection(Driver.java:450)
    at org.postgresql.Driver.connect(Driver.java:252)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:64)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:312)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.postgresql.core.PGStream.<init>(PGStream.java:68)
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:144)
    ... 27 more

Upvotes: 0

Views: 482

Answers (1)

mck

Reputation: 42392

Your port number is incorrect: it should be 5432, not 5342, which is why your connection timed out. If you change the line

.option("url", "jdbc:postgresql://<host>:5342/<database>")

to

.option("url", "jdbc:postgresql://<host>:5432/<database>")

it should solve your problem.
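
For reference, here is a minimal sketch of the corrected read, using the same placeholders as in the question. It also wraps the query in parentheses with an alias, since that is the form Spark's JDBC source expects when a SQL query (rather than a bare table name) is passed through the dbtable option:

# Corrected read: port 5432, and the query passed as a parenthesised
# subquery with an alias. Assumes the SparkSession `spark` from the question.
df3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://<host>:5432/<database>") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "(select * from public.<table>) AS t") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .load()

If the connection still times out after fixing the port, check that the Azure Database for PostgreSQL firewall rules allow traffic from the Kubernetes cluster's outbound IP.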

Upvotes: 3
