Reputation: 482
I am currently running into some issues when reading data from a Postgres database over JDBC in (Py)Spark. I have a table in Postgres that I would like to read into Spark, process, and save the results as a .parquet file in an AWS S3 bucket.
I created a sample script that performs some basic logic (so as not to overcomplicate the question):
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
import argparse
import uuid
import datetime
def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--loc",
        type=str,
        default="./",
        help="Output location"
    )
    args = parser.parse_known_args()[0]
    return args
if __name__ == "__main__":
    args = parse_arguments()

    spark = SparkSession.builder. \
        appName("test-script"). \
        config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5,org.postgresql:postgresql:42.1.1"). \
        getOrCreate()

    CENTRAL_ID = "id"

    PG_HOST = spark.conf.get("spark.yarn.appMasterEnv.PG_HOST")
    PG_PORT = spark.conf.get("spark.yarn.appMasterEnv.PG_PORT")
    PG_USER = spark.conf.get("spark.yarn.appMasterEnv.PG_USER")
    PG_DB = spark.conf.get("spark.yarn.appMasterEnv.PG_DB")
    PG_PASS = spark.conf.get("spark.yarn.appMasterEnv.PG_PASS")
    PG_MAX_CONCURRENT = spark.conf.get("spark.yarn.appMasterEnv.PG_MAX_CONCURRENT")

    table = "test_schema.test_table"
    partitions = spark.sparkContext.defaultParallelism
    fetch_size = 2000

    data = spark.read.format("jdbc"). \
        option("url", "jdbc:postgresql://{}:{}/{}".format(PG_HOST, PG_PORT, PG_DB)). \
        option("dbtable", "(SELECT *, MOD({}, {}) AS p FROM {}) AS t".format(CENTRAL_ID, partitions, table)). \
        option("user", PG_USER). \
        option("password", PG_PASS). \
        option("driver", "org.postgresql.Driver"). \
        option("partitionColumn", "p"). \
        option("lowerBound", 0). \
        option("upperBound", partitions). \
        option("numPartitions", PG_MAX_CONCURRENT). \
        option("fetchSize", fetch_size). \
        load()

    data = data.repartition(partitions)

    # Cache data
    data.cache()

    # Calculate on data
    out1 = data.withColumn("abstract_length", length("abstract"))
    out2 = data.withColumn("title_length", length("title"))

    # Create a timestamp
    time_stamp = datetime.datetime.utcnow().isoformat()
    save_id = "{}-{}".format(uuid.uuid4(), time_stamp)

    out1.select([CENTRAL_ID, "abstract_length"]).write.mode("overwrite").parquet("{}/{}/abstract".format(args.loc, save_id))
    out2.select([CENTRAL_ID, "title_length"]).write.mode("overwrite").parquet("{}/{}/title".format(args.loc, save_id))

    spark.stop()
    print("run successful!")
The program attempts to limit the number of concurrent queries to the Postgres database to 8 (see PG_MAX_CONCURRENT), so as not to overload the database. After loading, I repartition into more (360) partitions in order to distribute the data over all workers.
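To verify that the read is actually split this way, the partition counts can be printed right after the JDBC load and again after the repartition (a minimal sketch reusing the data and partitions variables from the script above; with the settings below it should print 8 and then 360):

    # Number of JDBC partitions created by the read; expected to equal PG_MAX_CONCURRENT (8)
    print("partitions after JDBC read:", data.rdd.getNumPartitions())
    data = data.repartition(partitions)
    # Number of partitions after repartitioning; expected to equal spark.default.parallelism (360)
    print("partitions after repartition:", data.rdd.getNumPartitions())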
The EMR cluster configuration is as follows:
The spark-submit command is as follows:
spark-submit \
--master yarn \
--conf 'spark.yarn.appMasterEnv.PG_HOST=<<host>>' \
--conf 'spark.yarn.appMasterEnv.PG_PORT=<<port>>' \
--conf 'spark.yarn.appMasterEnv.PG_DB=<<db>>' \
--conf 'spark.yarn.appMasterEnv.PG_USER=<<user>>' \
--conf 'spark.yarn.appMasterEnv.PG_PASS=<<password>>' \
--conf 'spark.yarn.appMasterEnv.PG_MAX_CONCURRENT=8' \
--conf 'spark.executor.cores=3' \
--conf 'spark.executor.instances=30' \
--conf 'spark.executor.memory=12g' \
--conf 'spark.driver.memory=12g' \
--conf 'spark.default.parallelism=360' \
--conf 'spark.kryoserializer.buffer.max=1000M' \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.dynamicAllocation.enabled=false' \
--packages 'com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5,org.postgresql:postgresql:42.1.1' \
program.py \
--loc s3a://<<bucket>>/
The first type of error I am getting is:
Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.4 in stage 0.0 (TID 26, ip-172-31-35-159.eu-central-1.compute.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 140265 ms
Driver stacktrace:
I am unsure what this means. Could it be that fetching the data from the table takes too long? Or is there another reason?
The second type of error that I am getting is:
ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container from a bad node: container_1609406414316_0002_01_000013 on host: ip-172-31-44-127.eu-central-1.compute.internal. Exit status: 137. Diagnostics: [2020-12-31 10:08:38.093]Container killed on request. Exit code is 137
This seems to indicate an OOM problem. But I cannot understand why I am getting an OOM error, because it seems to me that I allocated enough memory to the executors and the driver. Also, when I look at the cluster's stats, I get the impression that it has plenty of memory:
Could it be the case that when using 8 concurrent queries to the Postgres server, it sends 1/8th of the data to each of the executors, so the executors should be ready to receive 1/8th of the total size? Or does fetchSize limit the amount of data that is sent to the executors in order to avoid memory issues? Or is there perhaps another reason? The entire table that I am trying to process is ~110 GB.
Could somebody help? Thanks in advance!
Upvotes: 0
Views: 2211
Reputation: 21
Since the executor container is running out of memory, try adding this to your spark-submit configuration:
--conf 'spark.executor.memory=20g' \
If it still fails, try increasing it further.
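Note that exit code 137 typically means YARN killed the container for exceeding its memory allocation, which covers the JVM heap plus off-heap overhead, so raising the overhead alongside the heap can also help. A rough sketch (the 4g figure is only illustrative, and on older Spark versions the property is spark.yarn.executor.memoryOverhead instead):

--conf 'spark.executor.memory=20g' \
--conf 'spark.executor.memoryOverhead=4g' \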
Upvotes: 0
Reputation: 21
It seems your executors are timing out. Have you tried increasing the timeout configuration in your spark-submit arguments?
--conf 'spark.network.timeout=10000000' \
Spark cluster full of heartbeat timeouts, executors exiting on their own
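If you tune this, keep in mind that Spark expects spark.network.timeout to be larger than spark.executor.heartbeatInterval, so the two are often raised together; a sketch with illustrative values only:

--conf 'spark.network.timeout=800s' \
--conf 'spark.executor.heartbeatInterval=60s' \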
Upvotes: 2