Reputation: 47
I'm trying to force eager evaluation for PySpark, using the count methodology I read online:
spark_df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
spark_df.cache().count()
However, when I try running the code, the cache count part is taking forever to run. My data size is relatively small (2.7GB, 15 mil rows), but after 28 min of running, I decided to kill the job. For comparison, when I use pandas.read_sql() method to read the data, it took only 6 min 43 seconds.
The machine I'm running the code on is pretty powerful, (20 vCPU, 160 GB RAM, Windows OS). I believe I'm missing a step to speed up the count statement.
Any help or suggestions are appreciated.
Upvotes: 1
Views: 2127
Reputation: 6082
When you used pandas to read, it will use as much memory as possible from the available memory of the machine (assuming all 160Gb as you mentioned, which is by far larger than the data itself ~3Gb).
However, it's not the same with Spark. When you start your Spark session, typically you would have to mention upfront how much memory per executor (and driver, and application manager if applicable) you'd want to use, and if you don't specify it, it's going to be 1Gb according to the latest Spark documentation. So the first thing you want to do is giving more memory to your executors and driver.
Second, reading from JDBC by Spark is tricky, because slowness or not depends on the number of executors (and tasks), and those numbers depend on how many partitions your RDD (that read from JDBC connection) have, and the numbers of partitions depends on your table, your query, columns, conditions, etc. One way to force changing behavior, to have more partitions, more tasks, more executors, ... is via these configurations: numPartitions
, partitionColumn
, lowerBound
, and
upperBound
.
numPartitions
is the number of partitions (hence the number of executors will be used)partitionColumn
is an integer type column that Spark would use to target partitioninglowerBound
is the min value of partitionColumn
that you want to readupperBound
is the max value of partitionColumn
that you want to readYou can read more here https://stackoverflow.com/a/41085557/3441510, but the basic idea is, you want to use a reasonable number of executors (defined by numPartitions
), to process an equally distributed chunk of data for each executor (defined by partitionColumn
, lowerBound
and upperBound
).
Upvotes: 3