Reputation: 1410
I'm using Spark 2.1.1 with DataFrames. Here is my input DataFrame:
+----+---------+---------+-------+
| key|parameter|reference| subkey|
+----+---------+---------+-------+
|key1| 45| 10|subkey1|
|key1| 45| 20|subkey2|
|key2| 70| 40|subkey2|
|key2| 70| 30|subkey1|
+----+---------+---------+-------+
I need to convert this data frame into the following result (currently produced via pandas):
+-----+-----------+
|label| features|
+-----+-----------+
| 45|[10.0,20.0]|
| 70|[30.0,40.0]|
+-----+-----------+
I can do the transformation with the help of pandas:
from pyspark.ml.linalg import Vectors


def convert_to_flat_by_pandas(df):
    pandas_data_frame = df.toPandas()
    all_keys = pandas_data_frame['key'].unique()
    flat_values = []
    for key in all_keys:
        key_rows = pandas_data_frame.loc[pandas_data_frame['key'] == key]
        key_rows = key_rows.sort_values(by=['subkey'])
        parameter_values = key_rows['parameter']
        # all rows of a group share the same parameter; take the first one
        parameter_value = parameter_values.iloc[0]
        key_reference_value = [reference_value for reference_value in key_rows['reference']]
        flat_values.append((parameter_value, key_reference_value))
    loaded_data = [(label, Vectors.dense(features)) for (label, features) in flat_values]
    spark_df = spark.createDataFrame(loaded_data, ["label", "features"])
    return spark_df
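For completeness, the function above can be exercised like this; a minimal sketch, assuming the SparkSession is available as spark (the rows are the sample data from the tables above):

# Hypothetical driver code for the sample data shown above
input_df = spark.createDataFrame(
    [("key1", 45, 10, "subkey1"),
     ("key1", 45, 20, "subkey2"),
     ("key2", 70, 40, "subkey2"),
     ("key2", 70, 30, "subkey1")],
    ["key", "parameter", "reference", "subkey"])

convert_to_flat_by_pandas(input_df).show()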
It seems I need to use groupBy, but I don't understand how to sort and convert a group (several rows) into a single row.
Source of working sample (with help of pandas): https://github.com/constructor-igor/TechSugar/blob/master/pythonSamples/pysparkSamples/df_flat.py
With the help of the two answers, I arrived at two possible solutions:
UPD1: Solution #1
from pyspark.sql.functions import first, col
from pyspark.ml.feature import VectorAssembler


def convert_to_flat_by_sparkpy(df):
    subkeys = df.select("subkey").dropDuplicates().collect()
    subkeys = [s[0] for s in subkeys]
    print('subkeys: ', subkeys)
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))
    spark_df = spark_df.withColumnRenamed("parameter", "label")
    spark_df = spark_df.select("label", "features")
    return spark_df
UPD1: Solution #2
from pyspark.sql.functions import first, col, collect_list


def convert_to_flat_by_sparkpy_v2(df):
    spark_df = df.orderBy("subkey")
    spark_df = spark_df.groupBy("key").agg(first(col("parameter")).alias("label"),
                                           collect_list("reference").alias("features"))
    spark_df = spark_df.select("label", "features")
    return spark_df
Upvotes: 2
Views: 924
Reputation: 214957
For the limited sample data you have given, you can transform the data frame to wide format with the subkeys as column headers, and then use VectorAssembler to collect them as features:
from pyspark.sql.functions import first, col
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler().setInputCols(["subkey1", "subkey2"]).setOutputCol("features")

assembler.transform(
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference")))
).show()
+----+---------+-------+-------+-----------+
| key|parameter|subkey1|subkey2| features|
+----+---------+-------+-------+-----------+
|key1| 45| 10| 20|[10.0,20.0]|
|key2| 70| 30| 40|[30.0,40.0]|
+----+---------+-------+-------+-----------+
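To reduce this to just the label/features columns of the desired output, a rename and select can follow (this is essentially what the asker's UPD1 Solution #1 does); a sketch:

wide_df = df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference")))
result = (assembler.transform(wide_df)
          .withColumnRenamed("parameter", "label")
          .select("label", "features"))
result.show()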
Update for dynamic subkeys:
Say you have a data frame like this:
df.show()
+----+---------+---------+-------+
| key|parameter|reference| subkey|
+----+---------+---------+-------+
|key1| 45| 10|subkey1|
|key1| 45| 20|subkey2|
|key2| 70| 40|subkey2|
|key2| 70| 30|subkey1|
|key2| 70| 70|subkey3|
+----+---------+---------+-------+
First collect all the unique subkeys, and then use them to create the assembler:
subkeys = df.select("subkey").dropDuplicates().rdd.map(lambda r: r[0]).collect()
assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")

assembler.transform(
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))).na.fill(0)
).show()
+----+---------+-------+-------+-------+----------------+
| key|parameter|subkey1|subkey2|subkey3| features|
+----+---------+-------+-------+-------+----------------+
|key1| 45| 10| 20| 0| [20.0,10.0,0.0]|
|key2| 70| 30| 40| 70|[40.0,30.0,70.0]|
+----+---------+-------+-------+-------+----------------+
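Note that the vector layout follows whatever order the collected subkeys come back in (which is why the sample above is laid out as subkey2, subkey1, subkey3); if a fixed layout matters, one option is to sort the list before building the assembler:

# Optional: sort the collected subkeys so the vector layout is deterministic
assembler = VectorAssembler().setInputCols(sorted(subkeys)).setOutputCol("features")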
Upvotes: 2
Reputation: 23109
You can use groupBy and the collect_list function to obtain the output:
from pyspark.sql.functions import collect_list

df1 = df.groupBy("parameter").agg(collect_list("reference").alias("features"))
df1 = df1.withColumnRenamed("parameter", "label")
df1.show()
Output:
+-----+--------+
|label|features|
+-----+--------+
|   45|[10, 20]|
|   70|[40, 30]|
+-----+--------+
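If the features column needs to be an ML dense vector (as in the desired output) rather than a plain array, one option is a small UDF; a minimal sketch, assuming pyspark.ml.linalg is available (Spark 2.x):

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

# Wrap the collected array column into a DenseVector column
to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())
df1.withColumn("features", to_vector("features")).show()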
Hope this helps!
Upvotes: 2