Reputation: 83
I have two DataFrames:
df1 =
+---+------+
| id|filter|
+---+------+
|  1|   YES|
|  2|    NO|
|  3|    NO|
+---+------+
df2 =
+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|     1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15|
+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|XXXXXX|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|YYYYYY|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
What I want to do is create a new column in df1 by filtering df2's field names based on the row values of df1. My output would look like this:
df3 =
+---+------+----------------+
| id|filter|value           |
+---+------+----------------+
|  1|   YES|[XXXXXX, YYYYYY]|
|  2|    NO|[]              |
|  3|    NO|[]              |
+---+------+----------------+
I know how to do it with Pandas, but I don't know how to do it with PySpark.
I've tried the following, but it doesn't seem to work:
df3 = df1.withColumn('value', f.when(df1['filter'] == 'YES', df2.select(f.col('id')).collect()).otherwise(f.lit([])))
Thank you very much
Upvotes: 0
Views: 1279
Reputation: 6338
// Assumes a SparkSession in scope as `spark`, plus:
import org.apache.spark.sql.functions._
import spark.implicits._

val data1 =
"""
| id|filter
| 1| YES
| 2| NO
| 3| NO
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.printSchema()
df1.show(false)
/**
* root
* |-- id: integer (nullable = true)
* |-- filter: string (nullable = true)
*
* +---+------+
* |id |filter|
* +---+------+
* |1 |YES |
* |2 |NO |
* |3 |NO |
* +---+------+
*/
val data2 =
"""
| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15
|XXXXXX |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
|YYYYYY |NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN
""".stripMargin
val stringDS2 = data2.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
.toSeq.toDS()
val df2 = spark.read
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS2)
df2.printSchema()
df2.show(false)
/**
* root
* |-- 1: string (nullable = true)
* |-- 2: double (nullable = true)
* |-- 3: double (nullable = true)
* |-- 4: double (nullable = true)
* |-- 5: double (nullable = true)
* |-- 6: double (nullable = true)
* |-- 7: double (nullable = true)
* |-- 8: double (nullable = true)
* |-- 9: double (nullable = true)
* |-- 10: double (nullable = true)
* |-- 11: double (nullable = true)
* |-- 12: double (nullable = true)
* |-- 13: double (nullable = true)
* |-- 14: double (nullable = true)
* |-- 15: double (nullable = true)
*
* +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
* |1 |2 |3 |4 |5 |6 |7 |8 |9 |10 |11 |12 |13 |14 |15 |
* +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
* |XXXXXX|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
* |YYYYYY|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
* +------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
*/
// Unpivot df2: stack() emits one (column name, value) row per cell, so the
// names land under id = "1" and the NaN cells under ids "2".."15".
val stringCol = df2.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df2.selectExpr(s"stack(${df2.columns.length}, $stringCol) as (id, value)")
processedDF.show(false)
/**
* +---+------+
* |id |value |
* +---+------+
* |1 |XXXXXX|
* |2 |NaN |
* |3 |NaN |
* |4 |NaN |
* |5 |NaN |
* |6 |NaN |
* |7 |NaN |
* |8 |NaN |
* |9 |NaN |
* |10 |NaN |
* |11 |NaN |
* |12 |NaN |
* |13 |NaN |
* |14 |NaN |
* |15 |NaN |
* |1 |YYYYYY|
* |2 |NaN |
* |3 |NaN |
* |4 |NaN |
* |5 |NaN |
* +---+------+
* only showing top 20 rows
*/
// Join on id, collect the values per row, then drop the "NaN" placeholders.
df1.join(processedDF, "id")
.groupBy("id", "filter")
.agg(collect_list("value").as("value"))
.selectExpr("id", "filter", "FILTER(value, x -> x != 'NaN') as value")
.show(false)
/**
* +---+------+----------------+
* |id |filter|value |
* +---+------+----------------+
* |2 |NO |[] |
* |1 |YES |[XXXXXX, YYYYYY]|
* |3 |NO |[] |
* +---+------+----------------+
*/
Upvotes: 3