Reputation: 531
I am trying to figure out possible ways to process complex objects in PySpark. In the example below, one of the columns of the DataFrame is an array of integers. The processing simply adds one to each value. Are these acceptable methods, or is there a better practice?
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.enableHiveSupport().appName('learn').getOrCreate()
data = [('a', 1, [1, 3, 5]),
        ('b', 2, [4, 6, 9]),
        ('c', 3, [50, 60, 70, 80])]
df = spark.createDataFrame(data, ['nam', 'q', 'compl'])
# process complex object, method 1 using explode and collect_list (dataframe API)
res = df.withColumn('id', f.monotonically_increasing_id()).withColumn('compl_exploded', f.explode(f.col('compl')))
res = res.withColumn('compl_exploded', f.col('compl_exploded')+1)
res = res.groupby('id').agg(f.first('nam').alias('nam'), f.first('q').alias('q'), f.collect_list('compl_exploded').alias('compl')).drop('id')
res.show()
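Note that collect_list does not guarantee element order after the shuffle introduced by groupby, so the reassembled array may come back permuted. Below is a sketch (not part of the original question) of an order-preserving variant that carries the element position through with posexplode and restores it with sort_array:
res = df.withColumn('id', f.monotonically_increasing_id())
res = res.select('id', 'nam', 'q', f.posexplode('compl').alias('pos', 'v'))
res = res.withColumn('v', f.col('v') + 1)
res = res.groupby('id').agg(
    f.first('nam').alias('nam'),
    f.first('q').alias('q'),
    # sort the (position, value) structs to restore the original element order
    f.sort_array(f.collect_list(f.struct('pos', 'v'))).alias('s'))
res = res.select('nam', 'q', f.col('s.v').alias('compl'))
res.show()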
# process complex object, method 2 using explode and collect_list (SQL)
df.withColumn('id', f.monotonically_increasing_id()).createOrReplaceTempView('tmp_view')
res = spark.sql("""
    SELECT first(nam) AS nam, first(q) AS q, collect_list(compl_exploded+1) AS compl FROM (
        SELECT *, explode(compl) AS compl_exploded FROM tmp_view
    ) x
    GROUP BY id
""")
res.show()
# process complex object, method 3 using python UDF
from typing import List
from pyspark.sql.types import ArrayType, LongType

def process(x: List[int]) -> List[int]:
    return [v + 1 for v in x]

process_udf = f.udf(process, ArrayType(LongType()))
res = df.withColumn('compl', process_udf('compl'))
res.show()
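As a side note, a plain Python UDF serializes every row between the JVM and the Python worker; a vectorized pandas UDF (pyspark >= 3.0, requires pyarrow) can reduce that overhead. A minimal sketch of the same operation, with process_pandas as a hypothetical name:
import pandas as pd
from pyspark.sql.types import ArrayType, LongType

@f.pandas_udf(ArrayType(LongType()))
def process_pandas(s: pd.Series) -> pd.Series:
    # each element of the Series is the full array for one row
    return s.apply(lambda arr: [v + 1 for v in arr])

res = df.withColumn('compl', process_pandas('compl'))
res.show()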
Upvotes: 0
Views: 548
Reputation: 2655
For such operations you can take advantage of built-in higher-order functions.
For example, in your use case you can use transform like below:
pyspark<=3.0
# Option 1
import pyspark.sql.functions as f
df.withColumn('add_one',f.expr('transform(compl, x -> x+1)')).show()
+---+---+----------------+----------------+
|nam| q| compl| add_one|
+---+---+----------------+----------------+
| a| 1| [1, 3, 5]| [2, 4, 6]|
| b| 2| [4, 6, 9]| [5, 7, 10]|
| c| 3|[50, 60, 70, 80]|[51, 61, 71, 81]|
+---+---+----------------+----------------+
# OR use the options below; all give the same output
# Option 2
df.select('nam', 'q', 'compl', f.expr('transform(compl, x -> x+1) as add_one')).show()
# Option 3
df.createOrReplaceTempView('tmp_view')
spark.sql('select nam, q, compl, transform(compl, x -> x+1) as add_one from tmp_view').show()
pyspark>=3.1.0
If you are using a newer version of Spark, transform is available directly in pyspark.sql.functions and you can use it without expr.
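For instance, the same operation with the native transform function (pyspark >= 3.1; column and DataFrame names taken from the question):
import pyspark.sql.functions as f

# f.transform takes a column and a Python lambda operating on Column expressions
df.withColumn('add_one', f.transform('compl', lambda x: x + 1)).show()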
Upvotes: 1