Shankar

Reputation: 8967

using functools reduce on Distributed Spark DataFrame

I'm trying to add a list of columns to an existing Spark DataFrame.

Example code:

from functools import reduce
from pyspark.sql.functions import lit

columns_list = ['col1', 'col2', 'col3', 'col4']
reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df).show()

This gives the expected results.

Does using reduce() on a distributed Spark DataFrame try to execute everything on a single node?

Upvotes: 1

Views: 3437

Answers (1)

CPak

Reputation: 13591

OP asked

Does using reduce() on a distributed Spark DataFrame try to execute everything on a single node?

But I think what OP really wants to know is

whether the following approaches are different from a Spark execution standpoint.

Generate toy data

data = [
    ('1',),
    ('2',),
    ('3',),
    ('4',),
]
df = spark.createDataFrame(data, ['id'])

You can see the execution plan of your code using .explain().

Scenario 1 (using functools.reduce)

from functools import reduce
from pyspark.sql.functions import lit

columns_list = ['col1', 'col2', 'col3', 'col4']
result1 = reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df)
result1.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#122, NULL AS col2#125, NULL AS col3#129, NULL AS col4#134]
+- Scan ExistingRDD[id#0]

Scenario 2 (@anky's code using select and list comprehension)

result2 = df.select("*",*[lit('NULL').alias(i) for i in columns_list])
result2.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#140, NULL AS col2#141, NULL AS col3#142, NULL AS col4#143]
+- Scan ExistingRDD[id#0]

Scenario 3 (using for loop and iterative assignment)

result3 = df
for i in columns_list:
    result3 = result3.withColumn(i, lit('NULL'))

result3.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#167, NULL AS col2#170, NULL AS col3#174, NULL AS col4#179]
+- Scan ExistingRDD[id#0]

Note that all three scenarios produce exactly the same physical plan, so from a Spark execution standpoint they are equivalent. reduce() (like the for loop in Scenario 3) runs only on the driver to build up the chain of transformations; it does not pull the data onto a single node. OP, I suggest reading about the differences between Transformations and Actions in Spark. Spark generates a 'Plan' of execution first, and nothing is computed (or distributed) until an action such as show() is called, which is also why reduce() is convenient but not strictly required here.
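To illustrate the lazy behaviour, here is a minimal sketch reusing the toy df above (the variable name lazy_df is mine). The reduce call by itself only builds the DataFrame lineage on the driver; a distributed Spark job is launched only when an action like show() or count() is called.

from functools import reduce
from pyspark.sql.functions import lit

# Building the DataFrame is a transformation chain assembled on the driver;
# no Spark job runs here and no data is collected to a single node.
lazy_df = reduce(lambda acc, c: acc.withColumn(c, lit('NULL')),
                 ['col1', 'col2', 'col3', 'col4'],
                 df)

# Only an action (show, count, collect, write, ...) triggers distributed
# execution of the plan across the cluster.
lazy_df.show()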

Upvotes: 2
