Georg Heiler

Reputation: 17724

spark pivot without aggregation

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html explains nicely how a pivot works in Spark.

In my Python code, I use pandas to pivot without an aggregation, then reset the index and join:

countryToMerge = pd.pivot_table(data=dfCountries, index=['A'], columns=['B'])
countryToMerge.index.name = 'ISO'
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')

How does this work in Spark?

I tried to group and join manually like:

val grouped = countryKPI.groupBy("A").pivot("B")
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show

but that does not work. How does the reset_index step fit into Spark, i.e. how would this be implemented in a Spark-native way?

edit

A minimal example of the Python code:

import pandas as pd
from datetime import datetime, timedelta
import numpy as np
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"])
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO'])
dates['ISO'] = isos.ISO
dates['ISO'] = dates['ISO'].astype("category")
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'],
                       'indicator_id':['a','a','b','b'],
                       'value':[7,8,9,7]})
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id'])
countryToMerge.index.name = 'ISO'
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner'))

        dates  ISO  a  b
0  2016-01-01  ABC  7  9
1  2016-01-03  ABC  7  9
2  2016-01-05  ABC  7  9
3  2016-01-07  ABC  7  9
4  2016-01-09  ABC  7  9
5  2016-01-02  POL  8  7
6  2016-01-04  POL  8  7
7  2016-01-06  POL  8  7
8  2016-01-08  POL  8  7
9  2016-01-10  POL  8  7

To follow along in Scala / Spark:

import spark.implicits._ // for toDF and the 'column symbol syntax (already in scope in the spark-shell)

val dates = Seq(("2016-01-01", "ABC"),
  ("2016-01-02", "ABC"),
  ("2016-01-03", "POL"),
  ("2016-01-04", "ABC"),
  ("2016-01-05", "POL"),
  ("2016-01-06", "ABC"),
  ("2016-01-07", "POL"),
  ("2016-01-08", "ABC"),
  ("2016-01-09", "POL"),
  ("2016-01-10", "ABC")
).toDF("dates", "ISO")
  .withColumn("dates", 'dates.cast("Date"))

dates.show
dates.printSchema

val countryKPI = Seq(("ABC", "a", 7),
  ("ABC", "b", 8),
  ("POL", "a", 9),
  ("POL", "b", 7)
).toDF("country_id3", "indicator_id", "value")

countryKPI.show
countryKPI.printSchema

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id")

Upvotes: 15

Views: 24399

Answers (6)

Vic

Reputation: 662

Not sure if this is what you're looking for, but I needed to pivot values into columns and just note each value's existence in each column:

from pyspark.sql.functions import lit

df.groupBy('A').pivot('B').agg(lit(1))

which produces 1 for existence and null otherwise. Curiously, specifying a string inside lit() did not work: all columns came back with that value.
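
For reference against the question's data, the same existence-style pivot might be sketched in Scala as follows (countryKPI is the frame defined in the question; per the answer above, each cell should come out as 1 where the combination exists and null otherwise):

import org.apache.spark.sql.functions.lit

// 1 marks that the (country_id3, indicator_id) combination occurs in the data;
// missing combinations come out as null.
val existence = countryKPI
  .groupBy("country_id3")
  .pivot("indicator_id")
  .agg(lit(1))

existence.show()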

Upvotes: 0

Nuka Tejeswara Rao

Reputation: 271

import pyspark.sql.functions as F

df_new = df.select('col1', 'col2', 'col3').groupBy('col1', 'col2').agg(F.concat_ws(",", F.collect_set("col3")))

Here, whichever column's values you want combined into a single row, you can pass to collect_set, and I believe it should work!
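
Applied to the question's countryKPI frame, a Scala sketch of the same collect_set idea might look like the following (the cast to string is an assumption here, since concat_ws expects string input):

import org.apache.spark.sql.functions.{col, collect_set, concat_ws}

// All values of a (country_id3, indicator_id) pair are gathered into a set and joined
// into one comma-separated string, so no numeric aggregation takes place.
val collapsed = countryKPI
  .select("country_id3", "indicator_id", "value")
  .groupBy("country_id3", "indicator_id")
  .agg(concat_ws(",", collect_set(col("value").cast("string"))).alias("values"))

collapsed.show()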

Upvotes: 0

Arash

Reputation: 552

In PySpark you can use the following:

Similar to what @Derek Kaknes mentioned above: create a unique id column and then aggregate using sum or some other aggregation function. Make sure the columns you group by include the newly created id_column.

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id_column", monotonically_increasing_id())
groupby_columns = ["id_column"] + your_desired_columns
df = df.groupBy(groupby_columns).pivot(pivot_column).sum(value_column)

Upvotes: 0

shadow_dev

Reputation: 130

This worked for me, though not because I am backing up Georg Heiler's assertion regarding aggregation with avg. To apply pivot without really aggregating, you simply need to specify the groupBy terms with as much granularity as possible. If this condition is met, you can use any aggregation function (avg, min, max, etc.), since each group then holds only a single value to aggregate.

  countryKPI.groupBy("country_id3").pivot("indicator_id").agg(avg("value").alias("value_term"))
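
To illustrate that the choice of aggregate does not matter once the groupBy/pivot combination is granular enough, here is a small comparison sketch against the question's countryKPI frame (the comparison itself is not from the answer):

import org.apache.spark.sql.functions.{avg, min}

// (country_id3, indicator_id) uniquely identifies a value in this data, so each pivot cell
// aggregates exactly one row and avg, min, max, first, ... all return that value unchanged.
val byAvg = countryKPI.groupBy("country_id3").pivot("indicator_id").agg(avg("value"))
val byMin = countryKPI.groupBy("country_id3").pivot("indicator_id").agg(min("value"))

byAvg.show()
byMin.show() // same numbers as byAvg (avg yields doubles, min keeps integers)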

Upvotes: 0

Derek Kaknes

Reputation: 1011

There isn't a good way to pivot without aggregating in Spark; basically, it assumes that you would just use a OneHotEncoder for that functionality, but that lacks the human readability of a straight pivot. The best ways that I have found to do it are:

val pivot = countryKPI
  .groupBy("country_id3", "value")
  .pivot("indicator_id", Seq("a", "b"))
  .agg(first(col("indicator_id")))

pivot.show
+-----------+-----+----+----+
|country_id3|value|   a|   b|
+-----------+-----+----+----+
|        ABC|    8|null|   b|
|        POL|    9|   a|null|
|        POL|    7|null|   b|
|        ABC|    7|   a|null|
+-----------+-----+----+----+

However, if (country_id3, value) is not distinct within the dataset, then you collapse rows and potentially end up taking a somewhat meaningless first() value from your pivot column.

An alternative is to add an id column to the dataset, group on that new id, pivot your desired column, then join back to the original dataset. Here's an example:

val countryWithId = countryKPI.withColumn("id", monotonically_increasing_id)
val pivotted = countryWithId
  .groupBy("id")
  .pivot("indicator_id")
  .agg(first(col("indicator_id")))

val pivot2 = countryWithId.join(pivotted, Seq("id")).drop("id") //.drop("indicator_id")

pivot2.show
+-----------+------------+-----+----+----+
|country_id3|indicator_id|value|   a|   b|
+-----------+------------+-----+----+----+
|        ABC|           a|    7|   a|null|
|        ABC|           b|    8|null|   b|
|        POL|           a|    9|   a|null|
|        POL|           b|    7|null|   b|
+-----------+------------+-----+----+----+

In this case, you still have the original pivot column, but you can .drop() that as well if you prefer.

Upvotes: 8

Georg Heiler

Reputation: 17724

The following snippet seems to work, but I am not sure whether an aggregation by avg is correct, even though "fitting" numbers are the output.

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show

I'm not sure whether this (avg) is "inefficient" for a larger amount of data compared to just reusing the values (as I do not want to aggregate at all).
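
For completeness, attaching the pivoted KPIs back onto the dates frame (the Spark analogue of the pandas reset_index + merge above) could then look like the following sketch, which builds on the snippet above and the question's dates frame:

// Pivot the KPIs (one row per country), then join back onto the dates frame,
// mirroring the reset_index + merge step in the pandas example.
val pivoted = countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value")
val merged = dates
  .join(pivoted, dates("ISO") === pivoted("country_id3"), "inner")
  .drop("country_id3")

merged.show()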

Upvotes: 3
