void

Reputation: 2543

PySpark: Best practice to add more columns to a DataFrame

Spark DataFrames have a withColumn method to add one new column at a time. To add multiple columns, a chain of withColumn calls is required. Is this the best practice for doing this?
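For context, this is roughly what I mean by chaining (the column names and expressions here are just placeholders):

from pyspark.sql import functions as F

df2 = (df
       .withColumn("new_col_1", F.concat(F.col("existing_col_2"), F.lit("_NEW")))
       .withColumn("new_col_2", F.col("existing_col_2"))
       .withColumn("new_col_3", F.length(F.col("existing_col_1")))
       .filter(F.col("new_col_3") > 0))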

I feel that using mapPartitions has more advantages. Let's say I have a chain of three withColumn calls and then one filter to remove rows based on certain conditions. These are four different operations (I am not sure if any of them are wide transformations, though). But I can do it all in one go with mapPartitions. It also helps when I have a database connection that I would prefer to open once per RDD partition.

My question has two parts.

The first part: here is my implementation of mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do this?

from pyspark.sql import Row

def add_new_cols(rows):
    # Row "class" describing the output columns for this partition
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    db = open_db_connection()  # one connection per partition
    new_rows = []
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

The second part: what are the tradeoffs of using mapPartitions over a chain of withColumn and filter?

I read somewhere that using the available methods on Spark DataFrames is always better than rolling out your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.

Upvotes: 4

Views: 4232

Answers (2)

Alper t. Turker

Reputation: 35249

Are there any unforeseen issues with this approach?

Multiple. The most severe implications are:

  • A few times higher memory footprint compared to plain DataFrame code, and significant garbage collection overhead.
  • High cost of serialization and deserialization required to move data between execution contexts.
  • Introducing a breaking point for the query planner: the optimizer cannot see through the RDD conversion.
  • As written, the cost of schema inference on the toDF call (this can be avoided if a proper schema is provided; see the sketch after this list) and possible re-execution of all preceding steps.
  • And so on...
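
For the schema point above, a minimal sketch of passing an explicit schema to toDF, assuming all four columns are strings (adjust the types to your data):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("existing_col_1", StringType(), True),
    StructField("existing_col_2", StringType(), True),
    StructField("new_col_1", StringType(), True),
    StructField("new_col_2", StringType(), True),
])

# No sampling / inference pass is needed when the schema is given explicitly
df2 = df.rdd.mapPartitions(add_new_cols).toDF(schema)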

Some of these can be avoided with udf and select / withColumn, others cannot.
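For illustration, a rough udf-based sketch of the same kind of transformation, reusing the question's hypothetical open_db_connection / get_some_result helpers; note that a plain udf handles the connection per call, which is exactly the part mapPartitions does better:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(returnType=StringType())
def db_lookup(value):
    # Opens a connection per invocation; this is the weak spot of a plain udf
    db = open_db_connection()
    result = db.get_some_result(value)
    db.close()
    return result

df2 = (df
       .withColumn("new_col_2", db_lookup(F.col("existing_col_2")))
       .withColumn("new_col_1", F.concat(F.col("new_col_2"), F.lit("_NEW")))
       .filter(F.col("new_col_2").isNotNull()))  # placeholder filter condition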

Let's say I have a chain of three withColumn calls and then one filter to remove rows based on certain conditions. These are four different operations (I am not sure if any of them are wide transformations, though). But I can do it all in one go with mapPartitions

Your mapPartitions doesn't remove any operations, and it doesn't provide any optimizations that the Spark planner couldn't apply on its own. Its only advantage is that it provides a nice scope for expensive connection objects.

I read somewhere that using the available methods on Spark DataFrames is always better than rolling out your own implementation

When you start using executor-side Python logic you have already diverged from Spark SQL. It doesn't matter whether you use udf, RDD, or the newly added vectorized udf. At the end of the day, you should make the decision based on the overall structure of your code: if it is predominantly Python logic executed directly on the data, it might be better to stick with RDDs or skip Spark completely.
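For reference, a minimal sketch of a vectorized (pandas) udf, assuming Spark 2.3+ with pyarrow installed; the suffix logic is just a placeholder:

from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType

@pandas_udf(StringType())  # scalar pandas_udf: operates on whole batches as pandas Series
def add_suffix(s):
    return s + "_NEW"

df2 = df.withColumn("new_col_1", add_suffix(col("existing_col_2")))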

If it is just a fraction of the logic and doesn't cause a severe performance issue, don't sweat it.

Upvotes: 6

Chitral Verma

Reputation: 2853

Using df.withColumn() is the best way to add columns; they are all added lazily.
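A quick sketch of what I mean, with placeholder column names; the chained calls only build up the plan, and nothing is computed until an action runs:

from pyspark.sql import functions as F

df2 = (df
       .withColumn("col_plus_one", F.col("some_col") + 1)
       .withColumn("col_times_two", F.col("some_col") * 2))

df2.explain()  # one optimized plan for the whole chain
df2.show()     # computation happens only here, when an action is called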

Upvotes: -1
