Reputation: 4136
I'm using pyspark, loading a large CSV file into a DataFrame with spark-csv, and as a pre-processing step I need to apply a variety of operations to the data in one of the columns (which contains a JSON string). That will return X values, each of which needs to be stored in its own separate column.
That functionality will be implemented in a UDF. However, I am not sure how to return a list of values from that UDF and feed these into individual columns. Below is a simple example:
from pyspark.sql.functions import udf

def udf_test(n):
    return [n/2, n%2]

test_udf = udf(udf_test)

df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).show(5)
That produces the following:
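+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows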
What would be the best way to store the two (in this example) values returned by the UDF in separate columns? Right now they are being typed as strings:
df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).printSchema()
root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
Upvotes: 68
Views: 53565
Reputation: 330353
It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. This requires a UDF with a specified returnType:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

# Return type of the UDF: a struct with two float fields
schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    # Fall back to NaN for missing or zero input
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
## |-- foobar: struct (nullable = true)
## | |-- foo: float (nullable = false)
## | |-- bar: float (nullable = false)
You can further flatten the schema with a simple select:
foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
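As a rough sketch of the same idea in one function, the struct can also be expanded with a star selector instead of naming each field. This assumes Spark 2.0 or later and an existing SparkSession passed in as `spark`; the names `halve_and_mod` and `flatten_with_struct_udf` are made up for illustration, and unlike the snippet above only `None` (not `0.0`) is mapped to NaN here.

```python
def halve_and_mod(n):
    """Plain-Python logic wrapped by the UDF: returns (n / 2, n % 2)."""
    if n is None:
        # Assumption: missing input maps to NaN in both fields.
        return (float("nan"), float("nan"))
    return (n / 2, n % 2)


def flatten_with_struct_udf(spark):
    """Apply a struct-returning UDF and expand it into top-level columns."""
    # Imported here so halve_and_mod can be used without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StructType, StructField, FloatType

    schema = StructType([
        StructField("foo", FloatType(), False),
        StructField("bar", FloatType(), False),
    ])
    struct_udf = udf(halve_and_mod, schema)

    df = spark.createDataFrame([(1, 2.0), (2, 3.0)], ["x", "y"])
    # "foobar.*" expands every field of the struct into its own column.
    return df.withColumn("foobar", struct_udf("y")).select("x", "y", "foobar.*")
```

With a session available, `flatten_with_struct_udf(spark).show()` should display the columns x, y, foo and bar. The original columns are kept alongside the new ones, which is usually what a pre-processing step needs.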
See also Derive multiple columns from a single column in a Spark DataFrame
Upvotes: 105
Reputation: 476
You can use flatMap to get the desired DataFrame in one go:
# `udf` here stands for the UDF column expression, e.g. test_udf("y")
df = df.withColumn('udf_results', udf).select('udf_results').rdd.flatMap(lambda x: x).toDF(schema=your_new_schema)
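To see why `flatMap(lambda x: x)` unwraps the struct, here is a plain-Python analogy that does not use Spark at all. The namedtuples `Outer` and `Result` are hypothetical stand-ins for Spark Row objects (which also behave like tuples), and `flat_map` is a minimal imitation of what RDD.flatMap does, not the real implementation.

```python
from collections import namedtuple

# Hypothetical stand-ins for Spark Row objects; a Row behaves like a tuple.
Result = namedtuple("Result", ["foo", "bar"])
Outer = namedtuple("Outer", ["udf_results"])

# One-column rows whose single value is itself a struct-like row.
rows = [Outer(Result(1.0, 0.0)), Outer(Result(1.5, 1.0))]


def flat_map(func, items):
    """Imitation of flatMap: apply func, then concatenate the results."""
    return [out for item in items for out in func(item)]


# Iterating an outer row yields its only field, the inner struct.
flattened = flat_map(lambda x: x, rows)
print(flattened)
```

Each outer row contributes exactly one element, its inner struct, so the result is a list of inner rows that can be given a flat schema. This only works cleanly when the selected DataFrame has a single struct column.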
Upvotes: 2