Marioanzas

Reputation: 1945

How to change the size and distribution of a PySpark Dataframe according to the values of its rows & columns?

I have a large PySpark DataFrame that I would like to manipulate as in the example below. I think it is easier to visualise it than to describe it. Hence, for illustrative purposes, let us take a simple DataFrame df:

df.show()
+----------+-----------+-----------+
|  series  | timestamp |   value   |
+----------+-----------+-----------+
|    ID1   |    t1     | value1_1  |
|    ID1   |    t2     | value2_1  |
|    ID1   |    t3     | value3_1  |
|    ID2   |    t1     | value1_2  |
|    ID2   |    t2     | value2_2  |
|    ID2   |    t3     | value3_2  |
|    ID3   |    t1     | value1_3  |
|    ID3   |    t2     | value2_3  |
|    ID3   |    t3     | value3_3  |
+----------+-----------+-----------+

In the above DataFrame, each of the three unique values contained in column series (i.e. ID1, ID2 and ID3) has corresponding entries (under column value) occurring simultaneously at the same times (i.e. the same entries in column timestamp).

From this DataFrame, I would like a transformation that ends up with the following DataFrame, named, say, result. As can be seen, the size of the DataFrame has changed and the columns have even been renamed according to entries of the original DataFrame.

result.show()
+-----------+-----------+-----------+-----------+
| timestamp |    ID1    |    ID2    |    ID3    |
+-----------+-----------+-----------+-----------+
|    t1     |  value1_1 |  value1_2 |  value1_3 |
|    t2     |  value2_1 |  value2_2 |  value2_3 |
|    t3     |  value3_1 |  value3_2 |  value3_3 |
+-----------+-----------+-----------+-----------+

The order of the columns in result is arbitrary and should not affect the final answer. This illustrative example only contains three unique values in series (i.e. ID1, ID2 and ID3). Ideally, I would like to write a piece of code that automatically detects the unique values in series and generates a corresponding new column for each of them. Does anyone know where I can start from? I have tried grouping by timestamp and then collecting a set of the distinct series and value entries with the aggregate function collect_set (roughly along the lines of the sketch below), but with no luck.
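For reference, my attempt looked roughly like this (a sketch of what I described above; the column aliases are just placeholders):

import pyspark.sql.functions as F

# Sketch of the attempt described above: this only produces array columns
# per timestamp, not one column per unique value of `series`.
attempt = df.groupBy('timestamp').agg(
    F.collect_set('series').alias('series_set'),
    F.collect_set('value').alias('value_set'),
)
attempt.show(truncate=False)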

Many thanks in advance!


Upvotes: 1

Views: 709

Answers (2)

Marioanzas

Reputation: 1945

Extending mck's answer, I have found a way of improving the pivot performance. pivot is a very expensive operation; hence, from Spark 2.0 onwards, it is recommended to provide the column values (if known) as an argument to the function, as shown in the code below. This improves performance for DataFrames much larger than the illustrative one posed in this question. Given that the values of series are known beforehand, we can use:

import pyspark.sql.functions as F

series_list = ('ID1', 'ID2', 'ID3')
result = df.groupBy('timestamp').pivot('series', series_list).agg(F.first('value'))
result.show()
+---------+--------+--------+--------+
|timestamp|     ID1|     ID2|     ID3|
+---------+--------+--------+--------+
|       t1|value1_1|value1_2|value1_3|
|       t2|value2_1|value2_2|value2_3|
|       t3|value3_1|value3_2|value3_3|
+---------+--------+--------+--------+
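If the values of series are not known beforehand, a possible sketch (my own addition, not benchmarked) is to collect the distinct values once and pass them to pivot; this addresses the question's wish to detect the unique values automatically, at the cost of one extra job that gathers them:

# Sketch: derive the pivot values at runtime when they are not known
# beforehand, then pass them explicitly to pivot as above.
series_list = [row['series'] for row in df.select('series').distinct().collect()]
result = df.groupBy('timestamp').pivot('series', series_list).agg(F.first('value'))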

Upvotes: 1

mck

Reputation: 42422

Just a simple pivot:

import pyspark.sql.functions as F

result = df.groupBy('timestamp').pivot('series').agg(F.first('value'))

Make sure that each row in df is distinct; otherwise duplicate entries may be silently deduplicated.
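For completeness, a minimal self-contained sketch (the DataFrame construction below is my own reproduction of the question's example, not part of the original answer):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Recreate the illustrative DataFrame from the question.
df = spark.createDataFrame(
    [('ID1', 't1', 'value1_1'), ('ID1', 't2', 'value2_1'), ('ID1', 't3', 'value3_1'),
     ('ID2', 't1', 'value1_2'), ('ID2', 't2', 'value2_2'), ('ID2', 't3', 'value3_2'),
     ('ID3', 't1', 'value1_3'), ('ID3', 't2', 'value2_3'), ('ID3', 't3', 'value3_3')],
    ['series', 'timestamp', 'value'],
)

# One column per distinct value of `series`, one row per timestamp.
result = df.groupBy('timestamp').pivot('series').agg(F.first('value'))
result.show()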

Upvotes: 2
