Reputation: 21
I have the following PySpark input DataFrame:
+-------+------------+
| index | valuelist |
+-------+------------+
| 1.0 | [10,20,30] |
| 2.0 | [11,21,31] |
| 0.0 | [14,12,15] |
+-------+------------+
From the above input DataFrame, I want to get the following output DataFrame in PySpark:
+-------+-------+
| index | value |
+-------+-------+
| 1.0 | 20 |
| 2.0 | 31 |
| 0.0 | 14 |
+-------+-------+
Logic:
for each row:
value = valuelist[index]
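For reference, a minimal sketch to reproduce this input (the session setup and variable names here are my own, not part of the original post):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build the example DataFrame: a float index column and an array column
df = spark.createDataFrame(
    [(1.0, [10, 20, 30]), (2.0, [11, 21, 31]), (0.0, [14, 12, 15])],
    ["index", "valuelist"],
)
df.show()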
Upvotes: 2
Views: 5051
Reputation: 43504
You can use pyspark.sql.functions.expr to pass a column value as an input to a function:
import pyspark.sql.functions as f

df.select("index", f.expr("valuelist[CAST(index AS integer)]").alias("value")).show()
#+-----+-----+
#|index|value|
#+-----+-----+
#| 1.0| 20|
#| 2.0| 31|
#| 0.0| 14|
#+-----+-----+
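If you're on Spark 2.4 or later (an assumption to verify against your cluster), the same lookup can also be written with the SQL element_at function. Note that element_at is 1-based for arrays, so the 0-based index needs a +1:
# element_at uses 1-based array positions, hence the +1 (Spark 2.4+ assumed)
df.select("index", f.expr("element_at(valuelist, CAST(index AS int) + 1)").alias("value")).show()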
If you have Spark version 2.1 or higher, here's an alternative using pyspark.sql.functions.posexplode:
import pyspark.sql.functions as f
df.select("index", f.posexplode("valuelist").alias("pos", "value"))\
.where(f.col("index").cast("int") == f.col("pos"))\
.select("index", "value")\
.show()
#+-----+-----+
#|index|value|
#+-----+-----+
#| 1.0| 20|
#| 2.0| 31|
#| 0.0| 14|
#+-----+-----+
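One design note on this approach: posexplode generates a row per array element and then filters, so any row whose valuelist is empty drops out of the result entirely. If those rows matter, pyspark.sql.functions.posexplode_outer (available since Spark 2.3, an assumption to check against your version) emits a null row for empty or null arrays, which you can then handle explicitly.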
Upvotes: 1
Reputation: 1058
You can create a new column by packing the two columns into a struct and passing it as a single input to a UDF that does the lookup:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

columns = ['index', 'valuelist']
vals = [
    (0, [1, 2]),
    (1, [1, 2])
]
# Assumes an existing sqlContext, as in the original answer
df = sqlContext.createDataFrame(vals, columns)

# The struct arrives in the UDF as a Row: field 0 is valuelist, field 1 is index.
# int() guards against a float index like the 1.0/2.0 values in the question.
df = df.withColumn(
    "value",
    udf(lambda row: row[0][int(row[1])], IntegerType())(
        F.struct(F.col("valuelist"), F.col("index"))
    )
)
Got the following output:
+-----+---------+-----+
|index|valuelist|value|
+-----+---------+-----+
|    0|   [1, 2]|    1|
|    1|   [1, 2]|    2|
+-----+---------+-----+
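A design note: a Python UDF serializes every row between the JVM and a Python worker, so on larger data the native expr or posexplode approaches in the other answer will generally outperform this one.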
Upvotes: 0