Reputation: 2543
Spark DataFrames have a method withColumn to add one new column at a time. To add multiple columns, a chain of withColumn calls is required. Is this the best practice?
I feel that using mapPartitions has more advantages. Let's say I have a chain of three withColumn calls and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of them are wide transformations, though). But I can do it all in one pass with mapPartitions. It also helps when I have a database connection that I would prefer to open once per RDD partition.
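Just to make the comparison concrete, the withColumn/filter chain I have in mind looks roughly like this (the expressions and the filter condition here are placeholders, not my real logic; only the column names match my mapPartitions code below):
from pyspark.sql import functions as F

# rough sketch of the DataFrame-only version; expressions are illustrative
df2 = (df
       .withColumn("new_col_1", F.concat(F.col("existing_col_2"), F.lit("_NEW")))
       .withColumn("new_col_2", F.col("existing_col_2"))
       .withColumn("new_col_3", F.length(F.col("existing_col_1")))
       .filter(F.col("existing_col_1").isNotNull()))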
My question has two parts.
The first part: this is my implementation with mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do it?
from pyspark.sql import Row

def add_new_cols(rows):
    # open one connection per partition and reuse it for every row
    db = open_db_connection()
    new_rows = []
    # Row factory defining the output columns
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2,
                              new_col_1, new_col_2)
        new_rows.append(new_f_row)
    db.close()
    return iter(new_rows)

df2 = df.rdd.mapPartitions(add_new_cols).toDF()
The second part: what are the tradeoffs of using mapPartitions over a chain of withColumn and filter?
I read somewhere that using the available methods on Spark DataFrames is always better than rolling your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.
Upvotes: 4
Views: 4232
Reputation: 35249
Are there any unforeseen issues with this approach?
Multiple. The most severe implications are:
- a much higher memory footprint compared to DataFrame code, and significant garbage collection overhead;
- the cost of schema inference on the toDF call (which can be avoided if a proper schema is provided) and possible re-execution of all preceding steps.
Some of these can be avoided with udf and select / withColumn, others cannot.
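For example, a minimal sketch of staying in the DataFrame API with a plain udf; the _derive function below is only a stand-in for the per-row Python logic (the question's db.get_some_result), and the filter condition is illustrative:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def _derive(value):
    # stand-in for the real per-row lookup
    return None if value is None else value.upper()

derive_udf = F.udf(_derive, StringType())

df2 = (df
       .withColumn("new_col_2", derive_udf(F.col("existing_col_2")))
       .withColumn("new_col_1", F.concat(F.col("new_col_2"), F.lit("_NEW")))
       .filter(F.col("existing_col_1").isNotNull()))
What this cannot give you is the per-partition connection scope from the question; that part still requires mapPartitions (or foreachPartition).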
Let's say I have a chain of three withColumn calls and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of them are wide transformations, though). But I can do it all in one pass with mapPartitions.
Your mapPartitions doesn't remove any operations, and doesn't provide any optimizations that the Spark planner cannot apply on its own. Its only advantage is that it provides a nice scope for expensive connection objects.
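If you do keep mapPartitions for that connection scope, the schema-inference cost mentioned above can at least be avoided by supplying an explicit schema instead of letting toDF infer one. A sketch, assuming all four columns are strings and spark is the active SparkSession:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("existing_col_1", StringType()),
    StructField("existing_col_2", StringType()),
    StructField("new_col_1", StringType()),
    StructField("new_col_2", StringType()),
])

# explicit schema avoids the inference pass over the mapped RDD
df2 = spark.createDataFrame(df.rdd.mapPartitions(add_new_cols), schema)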
I read somewhere that using the available methods on Spark DataFrames is always better than rolling your own implementation
When you start using executor-side Python logic you have already diverged from Spark SQL. It doesn't matter whether you use udf, RDD, or the newly added vectorized udf (a minimal sketch of one is below). At the end of the day, you should make the decision based on the overall structure of your code: if it is predominantly Python logic executed directly on the data, it might be better to stick with RDD or skip Spark completely.
If it is just a fraction of the logic, and doesn't cause severe performance issues, don't sweat it.
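As an illustration of the vectorized option, here is a minimal pandas udf sketch, assuming Spark 3.x with Arrow support (the suffix logic is just an example and nulls are not handled):
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

@pandas_udf(StringType())
def add_suffix(s: pd.Series) -> pd.Series:
    # operates on a whole batch (pandas Series) at a time
    return s + "_NEW"

df2 = df.withColumn("new_col_1", add_suffix("existing_col_2"))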
Upvotes: 6
Reputation: 2853
Using df.withColumn() is the best way to add columns; they're all added lazily.
Upvotes: -1