Moises B.

Reputation: 647

SparkSQL PostgreSQL DataFrame partitions

I have a very simple setup of SparkSQL connecting to a Postgres DB, and I'm trying to get a DataFrame from a table, with the DataFrame split into a number X of partitions (let's say 2). The code would be the following:

// JDBC connection options for the Spark SQL data source
Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
// A subquery has to be wrapped in parentheses and aliased to be used as "dbtable"
options.put("dbtable", "(select ID, OTHER from TABLE limit 1000) as tmp");
// Partitioning settings: ID is split into numPartitions ranges between lowerBound and upperBound
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions", "2");
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();

For some reason, one partition of the DataFrame contains almost all rows.

From what I can understand, lowerBound/upperBound are the parameters used to fine-tune this. In SparkSQL's documentation (Spark 1.4.0 - spark-sql_2.11) it says they are used to define the stride, not to filter/range the partition column. But that raises several questions:

  1. Is the stride the frequency (the number of elements returned by each query) with which Spark will query the DB for each executor (partition)?
  2. If not, what is the purpose of these parameters, what do they depend on, and how can I balance my DataFrame partitions in a stable way? (I'm not asking that all partitions contain the same number of elements, just that there is an equilibrium - for example, with 2 partitions and 100 elements, 55/45, 60/40 or even 65/35 would do.)

I can't seem to find a clear answer to these questions anywhere and was wondering if some of you could clear these points up for me, because right now this is affecting my cluster performance when processing X million rows: all the heavy lifting goes to one single executor.

Cheers and thanks for your time.

Upvotes: 5

Views: 4594

Answers (3)

gneets

Reputation: 19

lowerBound and upperBound do what the other answers describe. A follow-up question is how to balance the data across partitions without looking at the min/max values, or when your data is heavily skewed.

If your database supports a "hash" function, it can do the trick.

partitionColumn = "hash(column_name)%num_partitions"

numPartitions = 10 // whatever you want

lowerBound = 0

upperBound = numPartitions

This will work as long as the modulus operation returns a uniform distribution over [0,numPartitions)
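A minimal sketch of this idea for the Java setup in the question, assuming Postgres (hashtext is a Postgres-specific hash function, and the bucket alias and tmp subquery name are made up for the example). Computing the bucket inside the subquery keeps partitionColumn a plain column:

Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
// Derive a roughly uniformly distributed bucket in [0, 10) from the ID column
options.put("dbtable", "(select ID, OTHER, mod(abs(hashtext(ID::text)), 10) as bucket from TABLE) as tmp");
options.put("partitionColumn", "bucket");
options.put("lowerBound", "0");
options.put("upperBound", "10");
options.put("numPartitions", "10");
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();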

Upvotes: 0

frengel

Reputation: 181

Essentially, the lower bound, the upper bound and the number of partitions are used to calculate the increment or stride for each parallel task.

Let's say the table has partition column "year", and has data from 2006 to 2016.

If you define the number of partitions as 10, with lower bound 2006 and upper bound 2016, you will have each task fetching data for its own year - the ideal case.

Even if you specify the lower and/or upper bound incorrectly, e.g. set lower = 0 and upper = 2016, there will be a skew in data transfer, but you will not "lose" or fail to retrieve any data, because:

The first task will fetch everything below the first boundary, i.e. year < 2016/10; its WHERE clause has no lower bound, so it also catches rows with year before 0.

The second task will fetch data for year between 2016/10 and 2*2016/10.

The third task will fetch data for year between 2*2016/10 and 3*2016/10.

...

And the last task will fetch everything from the last boundary upward, i.e. year >= 9*2016/10; its WHERE clause has no upper bound, so it also catches rows with year after 2016.
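Applied to the parameters in the question (lowerBound = 100, upperBound = 500, numPartitions = 2), the same logic gives a stride of (500 - 100) / 2 = 200, so you end up with roughly these two predicates (an illustration of the generated WHERE clauses, not the literal strings Spark builds):

Partition 1: ID < 300 (no lower bound, so it also picks up rows with ID below 100)

Partition 2: ID >= 300 (no upper bound, so it also picks up rows with ID above 500)

If the IDs in the table extend well beyond 500, the second partition receives the bulk of the rows, which would explain the skew observed in the question.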


Upvotes: 7

Moises B.

Reputation: 647

Lower and upper bound are indeed used against the partitioning column; refer to this code (the current version at the moment of writing):

https://github.com/apache/spark/blob/40ed2af587cedadc6e5249031857a922b3b234ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

The columnPartition function contains the partitioning logic and the use of the lower / upper bounds.
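For illustration, here is a minimal Java sketch of what that logic boils down to (a simplified re-implementation, not the actual Scala source; the PartitionSketch class name is made up for the example):

import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {

    // Simplified version of the stride logic: the first partition has no lower bound
    // and the last partition has no upper bound, so rows outside [lower, upper] are not lost.
    public static List<String> columnPartition(String column, long lower, long upper, int numPartitions) {
        List<String> predicates = new ArrayList<String>();
        long stride = upper / numPartitions - lower / numPartitions;
        long currentValue = lower;
        for (int i = 0; i < numPartitions; i++) {
            String lowerClause = (i == 0) ? null : column + " >= " + currentValue;
            currentValue += stride;
            String upperClause = (i == numPartitions - 1) ? null : column + " < " + currentValue;
            String whereClause;
            if (lowerClause != null && upperClause != null) {
                whereClause = lowerClause + " AND " + upperClause;
            } else if (lowerClause != null) {
                whereClause = lowerClause;
            } else if (upperClause != null) {
                whereClause = upperClause;
            } else {
                whereClause = "1=1"; // single partition: no restriction at all
            }
            predicates.add(whereClause);
        }
        return predicates;
    }

    public static void main(String[] args) {
        // With the values from the question: stride = 500/2 - 100/2 = 200
        // Prints [ID < 300, ID >= 300]
        System.out.println(columnPartition("ID", 100, 500, 2));
    }
}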

Upvotes: 2
