Marcel Braasch

Reputation: 1183

How to reuse expression in withColumn

I want to filter longitude and latitude coordinates by distance. I have the following expression:

.withColumn("a",
        // Haversine distance
        pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2) +
          cos(toRadians($"a_pickup_latitude")) * cos(toRadians($"b_dropoff_latitude")) *
            pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2))
      .withColumn("diff_b_dropoff_a_pickup", atan2(sqrt($"a"), sqrt(-$"a" + 1)) * 2 * 6371)
      .drop("a")
      .filter($"diff_b_pickup_a_dropoff" < lit(max_dist))

Right now I create a column, use it in the next withColumn expression, then drop the first column and filter on the second one. In other words, I touch things four times, and I'm sure this could be done more simply. How can I rewrite it? The main issue is that I can't compute the second withColumn without the first one (well, I could, but then I would compute "a" twice), because I need "a" twice in it.

Is it possible to reuse or declare a variable so that I can use the second withColumn expression directly as my filter condition?

Thanks!

Upvotes: 1

Views: 1035

Answers (1)

m_vemuri

Reputation: 792

One of the useful things about the withColumn function is that it accepts any expression that evaluates to an org.apache.spark.sql.Column. That means you can collapse your multiple withColumn calls into a single column expression: columns can be combined and reused to build more complex column expressions.

Let's look at a simple example of squaring and then doubling a column in a dataframe:

// here is a piece of code which squares and doubles col_2 in (some) dataframe
// with an integer col_2, similar to your code.

val square = df.withColumn("square", $"col_2" * $"col_2")
val squareAndDouble = square.withColumn("square_and_double", $"square" * 2)
val res1 = squareAndDouble.drop("square")
res1.show()

+-----+-----+-----------------+
|col_1|col_2|square_and_double|
+-----+-----+-----------------+
|    1|    1|                2|
|    2|    2|                8|
+-----+-----+-----------------+

The same thing can be done with column expressions alone, because withColumn accepts any expression that results in a Column type (as mentioned above).

// Here's the signature of the withColumn function in spark-scala.
def withColumn(colName: String, col: Column): DataFrame

So we can write our square and double code as below:

// Again squaring and doubling col_2, but this time using column expressions.

val sqCol = $"col_2" * $"col_2"
val sqAndDoubleCol = sqCol * 2
val res2 = df.withColumn("square_and_double", sqAndDoubleCol)

res2.show()
+-----+-----+-----------------+
|col_1|col_2|square_and_double|
+-----+-----+-----------------+
|    1|    1|                2|
|    2|    2|                8|
+-----+-----+-----------------+

Since your code isn't actually using literals converted to a column (the lit function), I'm guessing the expressions calculating the haversine distance already return an org.apache.spark.sql.Column. So we can rewrite your code as follows:

val haversineDistance: org.apache.spark.sql.Column =
  pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2) +
    cos(toRadians($"a_pickup_latitude")) * cos(toRadians($"b_dropoff_latitude")) *
      pow(sin(toRadians($"b_dropoff_latitude" - $"a_pickup_latitude") / 2), 2)

val dropoffDistance: org.apache.spark.sql.Column =
  atan2(sqrt(haversineDistance), sqrt(-haversineDistance + 1)) * 2 * 6371

df.withColumn("dropoffDistance", dropoffDistance)
  .filter(…)
  :
  :
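
To address the filter part of the question directly: since dropoffDistance is just a reusable Column expression, it can also be used as the filter condition itself, so no intermediate column needs to be created and dropped. A minimal sketch, assuming max_dist is the threshold variable from your question:

// Hypothetical end-to-end version: the distance expression is declared once
// and reused both as the new column and as the filter condition.
val result = df
  .withColumn("dropoffDistance", dropoffDistance)
  .filter(dropoffDistance < lit(max_dist))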

Note 1: if you use the code above directly, please double-check it. I am not well-versed in the actual domain logic and calculations; I am just trying to complete the answer and show the use of spark-scala code.

Note 2: The actual spark physical plan shouldn't change in this case. You may save a little time in parsing and optimization, since you are telling spark explicitly what to calculate, but I wouldn't consider this a way to optimize the code for performance. It does help with re-use and readability, though, if you re-use the haversine distance expression.
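
If you want to verify that the plans match for your own data, you can compare the two versions with explain(). A quick sketch, assuming the same df, haversineDistance and dropoffDistance as above:

// Version with the intermediate column "a", as in the question.
df.withColumn("a", haversineDistance)
  .withColumn("dropoffDistance", atan2(sqrt($"a"), sqrt(-$"a" + 1)) * 2 * 6371)
  .drop("a")
  .explain()

// Version that reuses the column expression directly.
df.withColumn("dropoffDistance", dropoffDistance)
  .explain()

Both should print essentially the same physical plan.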

Note 3: There is no way to check whether the column names actually belong to the dataframe, not even at compile time. This is a drawback of this API: the column name can be any string, and it isn't associated with a dataframe until you actually run the query. This is also mentioned in the spark api documentation:

col("columnName")           // A generic column no yet associated with a DataFrame.

So just be wary of that and double-check the column names. If you can, run some tests before pushing it to production.
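
For example, a misspelled column name compiles fine and only fails when spark analyses the query. A small sketch (the misspelled column name is made up):

import org.apache.spark.sql.AnalysisException

// Assumes the same df and spark.implicits._ import as in the snippets above.
try {
  df.withColumn("oops", $"b_dropoff_lattitude" * 2).show()
} catch {
  case e: AnalysisException => println(s"Column not found: ${e.getMessage}")
}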

Upvotes: 1
