hyperc54

Reputation: 315

Pyspark 1.6 - Aliasing columns after pivoting with multiple aggregates

I am currently trying to alias the columns I get after pivoting on a value in a PySpark dataframe. The problem is that the column names I pass to the alias calls are not properly applied.

A concrete example:

Starting from this dataframe:

import pyspark.sql.functions as func

df = sc.parallelize([
    (217498, 100000001, 'A'), (217498, 100000025, 'A'), (217498, 100000124, 'A'),
    (217498, 100000152, 'B'), (217498, 100000165, 'C'), (217498, 100000177, 'C'),
    (217498, 100000182, 'A'), (217498, 100000197, 'B'), (217498, 100000210, 'B'),
    (854123, 100000005, 'A'), (854123, 100000007, 'A')
]).toDF(["user_id", "timestamp", "actions"])

which gives

+-------+--------------------+------------+
|user_id|     timestamp      |  actions   |
+-------+--------------------+------------+
| 217498|           100000001|    'A'     |
| 217498|           100000025|    'A'     |
| 217498|           100000124|    'A'     |
| 217498|           100000152|    'B'     |
| 217498|           100000165|    'C'     |
| 217498|           100000177|    'C'     |
| 217498|           100000182|    'A'     |
| 217498|           100000197|    'B'     |
| 217498|           100000210|    'B'     |
| 854123|           100000005|    'A'     |
| 854123|           100000007|    'A'     |
+-------+--------------------+------------+

The problem is that calling

df = df.groupby('user_id')\
       .pivot('actions')\
       .agg(func.count('timestamp').alias('ts_count'),
            func.mean('timestamp').alias('ts_mean'))

gives the column names

df.columns

['user_id',
 'A_(count(timestamp),mode=Complete,isDistinct=false) AS ts_count#4L',
 'A_(avg(timestamp),mode=Complete,isDistinct=false) AS ts_mean#5',
 'B_(count(timestamp),mode=Complete,isDistinct=false) AS ts_count#4L',
 'B_(avg(timestamp),mode=Complete,isDistinct=false) AS ts_mean#5',
 'C_(count(timestamp),mode=Complete,isDistinct=false) AS ts_count#4L',
 'C_(avg(timestamp),mode=Complete,isDistinct=false) AS ts_mean#5']

which are completely impractical.

I could clean up my column names with the methods shown here (regex) or here (using withColumnRenamed()). However, these are workarounds that could easily break after a Spark update.
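For reference, the regex workaround can be sketched like this. The `clean_pivot_name` helper below is hypothetical (not part of the PySpark API) and assumes the generated names follow the exact `<pivot_value>_(<agg>...) AS <alias>#<id>` pattern shown above:

```python
import re

def clean_pivot_name(name):
    """Collapse a Spark 1.6 pivot-generated name such as
    'A_(count(timestamp),mode=Complete,isDistinct=false) AS ts_count#4L'
    down to 'A_ts_count', keeping the pivot value and the alias."""
    m = re.match(r'^(\w+)_\(.*\) AS (\w+)#\d+L?$', name)
    if m:
        return '{}_{}'.format(m.group(1), m.group(2))
    return name  # leave non-pivot columns (e.g. 'user_id') untouched

cols = [
    'user_id',
    'A_(count(timestamp),mode=Complete,isDistinct=false) AS ts_count#4L',
    'A_(avg(timestamp),mode=Complete,isDistinct=false) AS ts_mean#5',
]
print([clean_pivot_name(c) for c in cols])
# → ['user_id', 'A_ts_count', 'A_ts_mean']
```

Applied to the dataframe it would look like `df.toDF(*[clean_pivot_name(c) for c in df.columns])` — but as said, this parses an internal string representation that Spark gives no stability guarantee for.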

To sum it up: how can I use the columns generated by the pivot (e.g. the generated name 'A_(count(timestamp),mode=Complete,isDistinct=false) AS ts_count#4L') without having to parse them?

Any help would be appreciated! Thanks

Upvotes: 8

Views: 3068

Answers (1)

karhershey

Reputation: 84

This is happening because the column you are pivoting on doesn't have distinct values, which results in duplicate column names when the pivot occurs, so Spark generates those long names to keep them distinct. You need to group on your pivot column before you pivot, so that the values in the pivot column (actions) are distinct.

Let me know if you need more help!

@hyperc54

Upvotes: 0
