MetallicPriest

Reputation: 30755

How is a Spark Dataframe partitioned by default?

I know that an RDD is partitioned based on the key values using the HashPartitioner. But how is a Spark Dataframe partitioned by default, given that it does not have the concept of key/value pairs?

Upvotes: 11

Views: 7101

Answers (1)

Michael Heil

Reputation: 18475

A Dataframe is partitioned depending on the number of tasks that run to create it.

There is no "default" partitioning logic applied. Here are some examples of how partitions are set (a sketch for inspecting these counts follows the list):

  • A Dataframe created through val df = Seq(1 to 500000: _*).toDF() will have only a single partition.
  • A Dataframe created through val df = spark.range(0,100).toDF() has as many partitions as the number of available cores (e.g. 4 when your master is set to local[4]). Also, see the remark below on the "default parallelism" that comes into effect for operations like parallelize with no parent RDD.
  • A Dataframe derived from an RDD (spark.createDataFrame(rdd, schema)) will have the same number of partitions as the underlying RDD. In my case, as I have 6 cores locally, the RDD got created with 6 partitions.
  • A Dataframe consuming from a Kafka topic will have as many partitions as the topic, because Spark can use as many cores/slots as the topic has partitions to consume it.
  • A Dataframe created by reading a file, e.g. from HDFS, will have the number of partitions matching that of the file(s), unless individual files have to be split into multiple partitions based on spark.sql.files.maxPartitionBytes, which defaults to 128MB.
  • A Dataframe derived from a transformation requiring a shuffle will have the number of partitions set by the configuration spark.sql.shuffle.partitions (200 by default).
  • ...
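To verify these counts yourself, a minimal sketch like the following can be used (assuming a local SparkSession; the exact numbers depend on your Spark version, your master setting, and whether adaptive query execution coalesces shuffle partitions):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[4]")                 // 4 local cores
      .appName("partition-demo")
      .getOrCreate()
    import spark.implicits._

    // Local Seq -> a single partition (per the first bullet above)
    val df1 = Seq(1 to 500000: _*).toDF()
    println(df1.rdd.getNumPartitions)

    // spark.range -> one partition per available core (4 with local[4])
    val df2 = spark.range(0, 100).toDF()
    println(df2.rdd.getNumPartitions)

    // After a shuffle -> spark.sql.shuffle.partitions (200 by default,
    // unless adaptive query execution coalesces them)
    val df3 = df2.groupBy($"id" % 10).count()
    println(df3.rdd.getNumPartitions)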

One of the major distinctions between the RDD and the Structured API is that you do not have as much control over the partitions: with RDDs you can even define a custom partitioner, which is not possible with Dataframes.
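That said, some partition control is still available on Dataframes through the repartitioning APIs. A short sketch (assuming the same local spark session as above):

    import spark.implicits._

    val df = Seq(1 to 500000: _*).toDF()                // single column named "value"
    val byNumber = df.repartition(10)                   // round-robin into 10 partitions
    val byColumn = df.repartition($"value")             // hash partitioning on a column
    val byRange  = df.repartitionByRange(10, $"value")  // range partitioning on a column
    val fewer    = df.coalesce(2)                       // merge into fewer partitions without a full shuffle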

Default Parallelism

The documentation of the Execution Behavior configuration spark.default.parallelism explains:

For operations like parallelize with no parent RDDs, it depends on the cluster manager:

Local mode: number of cores on the local machine

Mesos fine grained mode: 8

Others: total number of cores on all executor nodes or 2, whichever is larger
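For reference, a small sketch for checking what this resolves to on your setup and for overriding it when the session is created (the app name is just a placeholder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[4]")
      .config("spark.default.parallelism", "8")       // override the default
      .appName("default-parallelism-demo")
      .getOrCreate()

    println(spark.sparkContext.defaultParallelism)    // 8 here; otherwise the number of local cores

    // parallelize with no parent RDD picks up the default parallelism
    val rdd = spark.sparkContext.parallelize(1 to 1000)
    println(rdd.getNumPartitions)                     // 8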

Upvotes: 19
