constructor

Reputation: 1410

Convert dataframe: several columns to single by order

I'm using Spark 2.1.1 with DataFrames. Here is my input DataFrame:

+----+---------+---------+-------+
| key|parameter|reference| subkey|
+----+---------+---------+-------+
|key1|       45|       10|subkey1|
|key1|       45|       20|subkey2|
|key2|       70|       40|subkey2|
|key2|       70|       30|subkey1|
+----+---------+---------+-------+

I need to convert the data frame to the following:

result data (produced via pandas):
+-----+-----------+
|label|   features|
+-----+-----------+
|   45|[10.0,20.0]|
|   70|[30.0,40.0]|
+-----+-----------+

I can do the transformation with the help of pandas:

from pyspark.ml.linalg import Vectors

def convert_to_flat_by_pandas(df):
    # Pull everything to the driver as a pandas DataFrame
    pandas_data_frame = df.toPandas()
    all_keys = pandas_data_frame['key'].unique()

    flat_values = []
    for key in all_keys:
        # All rows for this key, ordered by subkey
        key_rows = pandas_data_frame.loc[pandas_data_frame['key'] == key]
        key_rows = key_rows.sort_values(by=['subkey'])

        # The parameter is the same for every row of a key, so take the first one
        parameter_value = key_rows['parameter'].iloc[0]

        # The references, in subkey order, become the feature values
        key_reference_values = list(key_rows['reference'])

        flat_values.append((parameter_value, key_reference_values))

    # Build a Spark DataFrame of (label, dense feature vector) pairs
    loaded_data = [(label, Vectors.dense(features)) for (label, features) in flat_values]
    spark_df = spark.createDataFrame(loaded_data, ["label", "features"])

    return spark_df
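
A minimal driver to exercise the function above, rebuilding the sample input from the question (the SparkSession setup is my assumption, not part of the original sample):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("df_flat").getOrCreate()

# Recreate the sample input DataFrame shown in the question
df = spark.createDataFrame(
    [("key1", 45, 10, "subkey1"),
     ("key1", 45, 20, "subkey2"),
     ("key2", 70, 40, "subkey2"),
     ("key2", 70, 30, "subkey1")],
    ["key", "parameter", "reference", "subkey"])

convert_to_flat_by_pandas(df).show()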

It seems I need to use groupBy, but I don't understand how to sort and convert a group (several rows) into a single row.

Source of the working sample (using pandas): https://github.com/constructor-igor/TechSugar/blob/master/pythonSamples/pysparkSamples/df_flat.py

With the help of the two answers, I got two possible solutions:

UPD1 Solution #1

from pyspark.sql.functions import first, col
from pyspark.ml.feature import VectorAssembler

def convert_to_flat_by_sparkpy(df):
    # Collect the distinct subkeys; they become the pivoted column names
    subkeys = df.select("subkey").dropDuplicates().collect()
    subkeys = [s[0] for s in subkeys]
    print('subkeys: ', subkeys)

    # Pivot to wide format and assemble the subkey columns into a feature vector
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))
    spark_df = spark_df.withColumnRenamed("parameter", "label")
    spark_df = spark_df.select("label", "features")
    return spark_df

UPD1 Solution #2

from pyspark.sql.functions import first, col, collect_list

def convert_to_flat_by_sparkpy_v2(df):
    # Order by subkey, then gather each key's references into one array
    spark_df = df.orderBy("subkey")
    spark_df = spark_df.groupBy("key").agg(first(col("parameter")).alias("label"), collect_list("reference").alias("features"))
    spark_df = spark_df.select("label", "features")
    return spark_df
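
Note on Solution #2: after a groupBy, Spark does not strictly guarantee that collect_list preserves the earlier orderBy. Below is a sketch of an order-safe variant; the struct/UDF approach is my own addition, not something from the answers:

from pyspark.sql.functions import first, col, collect_list, struct, udf
from pyspark.ml.linalg import Vectors, VectorUDT

# Collect (subkey, reference) pairs and sort them explicitly inside a UDF,
# so the feature order does not depend on how Spark shuffles the rows
to_sorted_vector = udf(
    lambda pairs: Vectors.dense([float(r) for _, r in sorted(pairs)]),
    VectorUDT())

def convert_to_flat_by_sparkpy_v3(df):
    grouped = df.groupBy("key").agg(
        first(col("parameter")).alias("label"),
        collect_list(struct("subkey", "reference")).alias("pairs"))
    return grouped.select("label", to_sorted_vector("pairs").alias("features"))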

Upvotes: 2

Views: 924

Answers (2)

akuiper

Reputation: 214957

For the limited sample data you have given, you can transform the data frame to wide format with the subkeys as headers, and then use VectorAssembler to assemble them into a features vector:

from pyspark.sql.functions import first, col
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler().setInputCols(["subkey1", "subkey2"]).setOutputCol("features")

assembler.transform(
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference")))
).show()
+----+---------+-------+-------+-----------+
| key|parameter|subkey1|subkey2|   features|
+----+---------+-------+-------+-----------+
|key1|       45|     10|     20|[10.0,20.0]|
|key2|       70|     30|     40|[30.0,40.0]|
+----+---------+-------+-------+-----------+

Update for dynamic subkeys:

Say if you have a data frame like this:

df.show()
+----+---------+---------+-------+    
| key|parameter|reference| subkey|
+----+---------+---------+-------+
|key1|       45|       10|subkey1|
|key1|       45|       20|subkey2|
|key2|       70|       40|subkey2|
|key2|       70|       30|subkey1|
|key2|       70|       70|subkey3|
+----+---------+---------+-------+

First collect all the unique subkeys, then use them to create the assembler:

subkeys = df.select("subkey").dropDuplicates().rdd.map(lambda r: r[0]).collect()
assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")

assembler.transform(    
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))).na.fill(0)
).show()
+----+---------+-------+-------+-------+----------------+
| key|parameter|subkey1|subkey2|subkey3|        features|
+----+---------+-------+-------+-------+----------------+
|key1|       45|     10|     20|      0| [20.0,10.0,0.0]|
|key2|       70|     30|     40|     70|[40.0,30.0,70.0]|
+----+---------+-------+-------+-------+----------------+
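
If a stable feature order matters, one option (my assumption, not something the answer specifies) is to sort the collected subkey names before building the assembler:

# Sorting makes subkey1, subkey2, subkey3 map to fixed vector positions
assembler = VectorAssembler().setInputCols(sorted(subkeys)).setOutputCol("features")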

Upvotes: 2

koiralo

Reputation: 23109

You can use groupBy and the collect_list function to obtain the output:

from pyspark.sql.functions import collect_list

# Gather all reference values for each parameter into an array column
df1 = df.groupBy("parameter").agg(collect_list("reference").alias("features"))

# Rename the grouping column to match the desired schema
df2 = df1.withColumnRenamed("parameter", "label")

Output:

+---------+--------+
|parameter|features|
+---------+--------+
|       45|[10, 20]|
|       70|[40, 30]|
+---------+--------+
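
The question's target output uses ML dense vectors rather than plain arrays; a hedged follow-up (my addition, not part of this answer) converting the array column:

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

# Turn the collected array into a dense ML vector (hypothetical extra step)
to_vector = udf(lambda xs: Vectors.dense([float(x) for x in xs]), VectorUDT())
df2.withColumn("features", to_vector("features")).show()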

Hope this helps!

Upvotes: 2
