figs_and_nuts

Reputation: 5771

pyspark dataframe not maintaining order after dropping a column

I create a dataframe:

import pandas as pd

df = spark.createDataFrame(pd.DataFrame({'a': range(12), 'c': range(12)})).repartition(8)

Its contents are:

df.show()
+---+---+
|  a|  c|
+---+---+
|  0|  0|
|  1|  1|
|  3|  3|
|  5|  5|
|  6|  6|
|  8|  8|
|  9|  9|
| 10| 10|
|  2|  2|
|  4|  4|
|  7|  7|
| 11| 11|
+---+---+

But if I drop a column, the rows of the remaining column come back in a different order:

df.drop('c').show()
+---+
|  a|
+---+
|  0|
|  2|
|  3|
|  5|
|  6|
|  7|
|  9|
| 11|
|  1|
|  4|
|  8|
| 10|
+---+

Please help me understand what is happening here.

Upvotes: 0

Views: 790

Answers (1)

m_vemuri

Reputation: 792

I wanted to add my answer since I felt I could explain this slightly differently.

The repartition results in a RoundRobinPartitioning, which redistributes the data across partitions in round-robin fashion.

Since you are evaluating the dataframe again, Spark recomputes the partitioning, and this time it runs after the drop.

You can see this by running a few commands in addition to what you have shown.

df = spark.createDataFrame(pd.DataFrame({'a':range(12),'c':range(12)})).repartition(8)

df.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- Scan ExistingRDD[a#14L,c#15L]

print("Partitions structure: {}".format(df.rdd.glom().collect()))
# Partitions structure: [[], [], [], [], [], [], [Row(a=0, c=0), Row(a=1, c=1), Row(a=3, c=3), Row(a=5, c=5), Row(a=6, c=6), Row(a=8, c=8), Row(a=9, c=9), Row(a=10, c=10)], [Row(a=2, c=2), Row(a=4, c=4), Row(a=7, c=7), Row(a=11, c=11)]]

temp = df.drop("c")

temp.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- *(1) Project [a#14L]
#   +- Scan ExistingRDD[a#14L,c#15L]


print("Partitions structure: {}".format(temp.rdd.glom().collect()))
# Partitions structure: [[], [], [], [], [], [], [Row(a=0), Row(a=2), Row(a=3), Row(a=5), Row(a=6), Row(a=7), Row(a=9), Row(a=11)], [Row(a=1), Row(a=4), Row(a=8), Row(a=10)]]

In the code above, explain() shows the RoundRobinPartitioning in the physical plan, and glom() shows how the rows are actually distributed across partitions.

In the original dataframe, the partition contents line up with the order you see in the output of show().

In the second dataframe, you can see that the rows have shuffled differently across the last two partitions, so they no longer come out in the same order. This is because the repartition runs again when the dataframe is re-evaluated.
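
As a side note (my own suggestion, not part of the original discussion): if you need the order to stay stable across subsequent transformations, one option is to cache the repartitioned dataframe, so the round-robin exchange is materialized once and later operations read the cached partitions instead of re-running the shuffle. A minimal sketch:

df.cache()
df.count()  # action to force materialization of the cached partitions

df.show()            # order produced by the materialized exchange
df.drop('c').show()  # should now preserve that order, since the drop reads
                     # the cached partitions rather than re-running the shuffle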

Edits as per discussion in the comments

If you run df.drop('b'), you are trying to drop a column that doesn't exist, so it is effectively a no-op (no operation). The plan, and therefore the partitioning, does not change.

df.drop('b').explain()

# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- Scan ExistingRDD[a#70L,c#71L]
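
You can double-check that nothing moved by comparing the partition contents before and after the no-op drop, along the lines of the glom() calls above (this assumes, as discussed further down, that the round-robin shuffle is deterministic for identical input data):

print(df.rdd.glom().collect() == df.drop('b').rdd.glom().collect())
# expected: True -- both sides execute the same physical plan on the same data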

Similarly, if you add a column, the round-robin partitioning runs before the column is added (the Project sits above the Exchange in the plan). The partitioner sees the same input data, so the order stays consistent with the original dataframe.

import pyspark.sql.functions as f

df.withColumn('tt', f.rand()).explain()

# == Physical Plan ==
# *(1) Project [a#70L, c#71L, rand(-3030352041536166328) AS tt#76]
# +- Exchange RoundRobinPartitioning(8)
#    +- Scan ExistingRDD[a#70L,c#71L]

In the case of df.drop('c'), the column is dropped first and the partitioner is applied afterwards (the Project sits below the Exchange). The partitioner now sees different input data, so it produces a different distribution.

df.drop('c').explain()

# == Physical Plan ==
# Exchange RoundRobinPartitioning(8)
# +- *(1) Project [a#70L]
#    +- Scan ExistingRDD[a#70L,c#71L]

As mentioned in another answer to this question, the round-robin partitioner is effectively random across different inputs, but deterministic for the same input data. So if the operation changes the underlying data before the shuffle runs, the resulting partitioning will be different.
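
To convince yourself of this, here is a quick illustration of my own (not from the linked answer): running the same drop twice should give the same order both times, because the partitioner sees identical input data on each run.

run1 = df.drop('c').collect()
run2 = df.drop('c').collect()
print(run1 == run2)  # expected: True -- same input data, same round-robin result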

Upvotes: 1
