Reputation: 1945
I have a large PySpark DataFrame that I would like to manipulate as in the example below. I think it is easier to visualise it than to describe it. Hence, for illustrative purposes, let us take a simple DataFrame df:
df.show()
+----------+-----------+-----------+
| series | timestamp | value |
+----------+-----------+-----------+
| ID1 | t1 | value1_1 |
| ID1 | t2 | value2_1 |
| ID1 | t3 | value3_1 |
| ID2 | t1 | value1_2 |
| ID2 | t2 | value2_2 |
| ID2 | t3 | value3_2 |
| ID3 | t1 | value1_3 |
| ID3 | t2 | value2_3 |
| ID3 | t3 | value3_3 |
+----------+-----------+-----------+
In the above DataFrame, each of the three unique values contained in column series (i.e. ID1, ID2 and ID3) has corresponding values (under column value) occurring at the same times (i.e. the same entries in column timestamp).
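For reproducibility, a minimal snippet that builds this illustrative df with plain string placeholders (the real DataFrame is of course much larger) could be:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# placeholder rows mirroring the table shown above
data = [
    ('ID1', 't1', 'value1_1'), ('ID1', 't2', 'value2_1'), ('ID1', 't3', 'value3_1'),
    ('ID2', 't1', 'value1_2'), ('ID2', 't2', 'value2_2'), ('ID2', 't3', 'value3_2'),
    ('ID3', 't1', 'value1_3'), ('ID3', 't2', 'value2_3'), ('ID3', 't3', 'value3_3'),
]
df = spark.createDataFrame(data, ['series', 'timestamp', 'value'])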
From this DataFrame, I would like a transformation which ends up with the following DataFrame, named, say, result. As can be seen, the shape of the DataFrame has changed, and the columns have even been renamed according to entries of the original DataFrame.
result.show()
+-----------+-----------+-----------+-----------+
| timestamp | ID1 | ID2 | ID3 |
+-----------+-----------+-----------+-----------+
| t1 | value1_1 | value1_2 | value1_3 |
| t2 | value2_1 | value2_2 | value2_3 |
| t3 | value3_1 | value3_2 | value3_3 |
+-----------+-----------+-----------+-----------+
The order of the columns in result is arbitrary and should not affect the final answer. This illustrative example only contains three unique values in series (i.e. ID1, ID2 and ID3). Ideally, I would like to write a piece of code which automatically detects the unique values in series and generates a corresponding new column for each of them. Does anyone know where I could start? I have tried to group by timestamp and then to collect a set of distinct series and value by using the aggregate function collect_set, but with no luck :(
Many thanks in advance!
Marioanzas
Upvotes: 1
Views: 709
Reputation: 1945
Extending on mck's answer, I have found a way of improving the pivot performance. pivot is a very expensive operation, hence, for Spark 2.0 onwards, it is recommended to provide the distinct values of the pivot column (if known) as an argument to the function, as shown in the code below. This improves the performance of the code for DataFrames much larger than the illustrative one posed in this question. Given that the values of series are known beforehand, we can use:
import pyspark.sql.functions as F

# known distinct values of the pivot column, so Spark skips the extra pass to compute them
series_list = ['ID1', 'ID2', 'ID3']
result = df.groupBy('timestamp').pivot('series', series_list).agg(F.first('value'))
result.show()
+---------+--------+--------+--------+
|timestamp| ID1| ID2| ID3|
+---------+--------+--------+--------+
| t1|value1_1|value1_2|value1_3|
| t2|value2_1|value2_2|value2_3|
| t3|value3_1|value3_2|value3_3|
+---------+--------+--------+--------+
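If the distinct values of series are not known beforehand, they can still be collected once up front (at the cost of an extra Spark job) and then passed to pivot, for example:
# collect the distinct series values once, then pass them to pivot
series_list = [row['series'] for row in df.select('series').distinct().collect()]
result = df.groupBy('timestamp').pivot('series', series_list).agg(F.first('value'))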
Upvotes: 1
Reputation: 42422
Just a simple pivot:
import pyspark.sql.functions as F
result = df.groupBy('timestamp').pivot('series').agg(F.first('value'))
Make sure that each (timestamp, series) combination appears in at most one row of df; otherwise F.first will silently keep only one of the duplicate values.
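A quick way to check for such duplicates before pivoting is to count the rows per (timestamp, series) pair, for example:
# any (timestamp, series) pair occurring more than once would be collapsed
# to a single value by F.first
df.groupBy('timestamp', 'series').count().filter(F.col('count') > 1).show()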
Upvotes: 2