unknown
unknown

Reputation: 251

Pyspark Window orderBy

I have a dataframe which looks like

+--------+---+------+----+
|group_id| id|  text|type|
+--------+---+------+----+
|       1|  1|   one|   a|
|       1|  1|   two|   t|
|       1|  2| three|   a|
|       1|  2|  four|   t|
|       1|  5|  five|   a|
|       1|  6|   six|   t|
|       1|  7| seven|   a|
|       1|  9| eight|   t|
|       1|  9|  nine|   a|
|       1| 10|   ten|   t|
|       1| 11|eleven|   a|
+--------+---+------+----+

If I do Window operation by partitioning it on group_id and ordering it by id then will orderby make sure that already ordered(sorted) rows retain the same order?

e.g.

window_spec = Window.partitionBy(df.group_id).orderBy(df.id)
df = df.withColumn("row_number", row_number().over(window_spec))

Will always be

+--------+---+------+----+------+                                               
|group_id| id|  text|type|row_number|
+--------+---+------+----+------+
|       1|  1|   one|   a|     1|
|       1|  1|   two|   t|     2|
|       1|  2| three|   a|     3|
|       1|  2|  four|   t|     4|
|       1|  5|  five|   a|     5|
|       1|  6|   six|   t|     6|
|       1|  7| seven|   a|     7|
|       1|  9| eight|   t|     8|
|       1|  9|  nine|   a|     9|
|       1| 10|   ten|   t|    10|
|       1| 11|eleven|   a|    11|
+--------+---+------+----+------+

In the nutshell my question is, how spark Window's orderBy handles already ordered(sorted) rows? My assumption is it is stable i.e. it doesn't change the order of already ordered rows but I couldn't find anything related to this in the documentation. How can I make sure that my assumption is correct?

Thanks.

Upvotes: 2

Views: 10743

Answers (1)

Wade Pimenta
Wade Pimenta

Reputation: 181

First, to set up context for those reading that may not know the definition of a stable sort, I'll quote from this StackOverflow answer by Joey Adams

"A sorting algorithm is said to be stable if two objects with equal keys appear in the same order in sorted output as they appear in the input array to be sorted" - Joey Adams

Now, a window function in spark can be thought of as Spark processing mini-DataFrames of your entire set, where each mini-DataFrame is created on a specified key - "group_id" in this case.

That is, if the supplied dataframe had "group_id"=2, we would end up with two Windows, where the first only contains data with "group_id"=1 and another the "group_id"=2.

This is important to note, because we could test the effects of the .orderBy() call on a sample dataframe without having to actually worry about what is happening to a Window. To emphasize what is happening:

  1. Data is partitioned by a specified key
  2. Transformations are then applied to the 'mini-DataFrames' created in each window

Hence, for a pre-sorted input such as :

df = spark.createDataFrame(
    [
        {'group_id': 1, 'id': 1, 'text': 'one', 'type': 'a'},
        {'group_id': 1, 'id': 1, 'text': 'two', 'type': 't'},
        {'group_id': 1, 'id': 2, 'text': 'three', 'type': 'a'},
        {'group_id': 1, 'id': 2, 'text': 'four', 'type': 't'},
        {'group_id': 1, 'id': 5, 'text': 'five', 'type': 'a'},
        {'group_id': 1, 'id': 6, 'text': 'six', 'type': 't'},
        {'group_id': 1, 'id': 7, 'text': 'seven', 'type': 'a'},
        {'group_id': 1, 'id': 9, 'text': 'eight', 'type': 't'},
        {'group_id': 1, 'id': 9, 'text': 'nine', 'type': 'a'},
        {'group_id': 1, 'id': 10, 'text': 'ten', 'type': 't'},
        {'group_id': 1, 'id': 11, 'text': 'eleven', 'type': 'a'}
    ]
)

+--------+---+------+----+
|group_id| id|  text|type|
+--------+---+------+----+
|       1|  1|   one|   a|
|       1|  1|   two|   t|
|       1|  2| three|   a|
|       1|  2|  four|   t|
|       1|  5|  five|   a|
|       1|  6|   six|   t|
|       1|  7| seven|   a|
|       1|  9| eight|   t|
|       1|  9|  nine|   a|
|       1| 10|   ten|   t|
|       1| 11|eleven|   a|
+--------+---+------+----+

We apply:

df.orderBy('id').show()

Resulting in:

+--------+---+------+----+
|group_id| id|  text|type|
+--------+---+------+----+
|       1|  1|   one|   a|
|       1|  1|   two|   t|
|       1|  2| three|   a|
|       1|  2|  four|   t|
|       1|  5|  five|   a|
|       1|  6|   six|   t|
|       1|  7| seven|   a|
|       1|  9|  nine|   a|
|       1|  9| eight|   t|
|       1| 10|   ten|   t|
|       1| 11|eleven|   a|
+--------+---+------+----+

At first, this seems stable, but let's apply this to a DataFrame with the row with text="two" swapped with the row with text="three":

df = spark.createDataFrame(
    [
        {'group_id': 1, 'id': 1, 'text': 'one', 'type': 'a'},
        {'group_id': 1, 'id': 2, 'text': 'three', 'type': 'a'},
        {'group_id': 1, 'id': 1, 'text': 'two', 'type': 't'},
        {'group_id': 1, 'id': 2, 'text': 'four', 'type': 't'},
        {'group_id': 1, 'id': 5, 'text': 'five', 'type': 'a'},
        {'group_id': 1, 'id': 6, 'text': 'six', 'type': 't'},
        {'group_id': 1, 'id': 7, 'text': 'seven', 'type': 'a'},
        {'group_id': 1, 'id': 9, 'text': 'eight', 'type': 't'},
        {'group_id': 1, 'id': 9, 'text': 'nine', 'type': 'a'},
        {'group_id': 1, 'id': 10, 'text': 'ten', 'type': 't'},
        {'group_id': 1, 'id': 11, 'text': 'eleven', 'type': 'a'}
   ]
)

+--------+---+------+----+
|group_id| id|  text|type|
+--------+---+------+----+
|       1|  1|   one|   a|
|       1|  2| three|   a|
|       1|  1|   two|   t|
|       1|  2|  four|   t|
|       1|  5|  five|   a|
|       1|  6|   six|   t|
|       1|  7| seven|   a|
|       1|  9| eight|   t|
|       1|  9|  nine|   a|
|       1| 10|   ten|   t|
|       1| 11|eleven|   a|
+--------+---+------+----+

Then apply:

df.orderBy(df.id).show()

Which results in:

+--------+---+------+----+
|group_id| id|  text|type|
+--------+---+------+----+
|       1|  1|   two|   t|
|       1|  1|   one|   a|
|       1|  2|  four|   t|
|       1|  2| three|   a|
|       1|  5|  five|   a|
|       1|  6|   six|   t|
|       1|  7| seven|   a|
|       1|  9|  nine|   a|
|       1|  9| eight|   t|
|       1| 10|   ten|   t|
|       1| 11|eleven|   a|
+--------+---+------+----+

As you can see, even though the rows text="one" and text="two" appear in the same order, the .orderBy() swaps them around. Thus, we can assume the .orderBy() is not a stable sort.

Upvotes: 2

Related Questions