Silvana

Reputation: 65

Subtract Two Arrays to Get a New Array in PySpark

I am new to Spark. I can sum, subtract or multiply arrays in Python with pandas and NumPy, but I am having difficulty doing something similar in Spark (Python). I am on Databricks.

For example, this approach gives a huge error message, which I won't paste here:

differencer=udf(lambda x,y: x-y, ArrayType(FloatType()))

df.withColumn('difference', differencer('Array1', 'Array2'))

Schema looks like this:

root
 |-- col1: integer (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- num: integer (nullable = true)
 |-- part: integer (nullable = true)
 |-- result: integer (nullable = true)
 |-- Array1: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- Array2: array (nullable = false)
 |    |-- element: float (containsNull = true)

I just want to create a new column by subtracting one of those two array columns from the other, element by element. What I actually need is the RMSE between them, but I think I can handle that once I know how to get the difference.

The arrays look like this (I am just typing in some integers):

Array1_row1[5, 4, 2, 4, 3] Array2_row1[4, 3, 1, 2, 1]

So the resulting array for row1 should be: DiffCol_row1[1, 1, 1, 2, 2]

Thanks for any suggestions or pointers.

Upvotes: 2

Views: 5082

Answers (2)

malthe

Reputation: 1449

You can use zip_with (since Spark 2.4):

from pyspark.sql.functions import expr

df = spark.createDataFrame(
    [([5, 4, 2, 4, 3], [4, 3, 1, 2, 1])], ("array1", "array2")
) 

df.withColumn(
    "array3", 
    expr("zip_with(array1, array2, (x, y) -> x - y)")
).show()                                                                         
# +---------------+---------------+---------------+       
# |         array1|         array2|         array3|
# +---------------+---------------+---------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1, 1, 1, 2, 2]|
# +---------------+---------------+---------------+
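One thing worth knowing before relying on this for real data: as far as I understand the documented behaviour, zip_with pads the shorter array with nulls when the lengths differ, and arithmetic on a null gives null. Below is a rough plain-Python model of that behaviour (no Spark needed). The name zip_with_model is made up for illustration, and the null short-circuit is a simplification that only matches lambdas like x - y, where a null operand yields null.

```python
from itertools import zip_longest


def zip_with_model(xs, ys, fn):
    """Plain-Python model of zip_with (hypothetical helper, not a Spark API)."""
    # Assumption: a null input array produces a null result.
    if xs is None or ys is None:
        return None
    return [
        # Simplification: a null operand gives null, as SQL subtraction would.
        None if x is None or y is None else fn(x, y)
        # zip_longest pads the shorter list with None, mirroring the null padding.
        for x, y in zip_longest(xs, ys, fillvalue=None)
    ]


same_length = zip_with_model([5, 4, 2, 4, 3], [4, 3, 1, 2, 1], lambda x, y: x - y)
padded = zip_with_model([5, 4, 2], [4, 3], lambda x, y: x - y)

print(same_length)  # [1, 1, 1, 2, 2]
print(padded)       # [1, 1, None]
```

So if Array1 and Array2 can have different lengths in some rows, expect trailing nulls in the result rather than an error.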

Upvotes: 3

user11406020

Reputation: 96

You can combine arrays_zip and transform:

from pyspark.sql.functions import expr

df = spark.createDataFrame(
    [([5, 4, 2, 4, 3], [4, 3, 1, 2, 1])], ("array1", "array2")
) 

df.withColumn(
    "array3", 
    expr("transform(arrays_zip(array1, array2), x -> x.array1 - x.array2)")
).show()                                                                         
# +---------------+---------------+---------------+       
# |         array1|         array2|         array3|
# +---------------+---------------+---------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1, 1, 1, 2, 2]|
# +---------------+---------------+---------------+

A valid udf needs equivalent element-wise logic, because the arrays arrive in Python as plain lists and lists do not support the - operator:

from pyspark.sql.functions import udf

@udf("array<double>")
def differencer(xs, ys):
    if xs and ys:
        return [float(x - y) for x, y in zip(xs, ys)]

df.withColumn("array3", differencer("array1", "array2")).show()
# +---------------+---------------+--------------------+
# |         array1|         array2|              array3|
# +---------------+---------------+--------------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1.0, 1.0, 1.0, 2...|
# +---------------+---------------+--------------------+
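To see why the lambda in the question fails and what the udf body is doing, here is a minimal sketch in plain Python, runnable without Spark. It also adds the RMSE the question mentions. The function names difference and rmse are my own, and the null handling simply mirrors the udf above (a missing or empty array gives None).

```python
import math


def difference(xs, ys):
    """Element-wise difference, same logic as the udf body above."""
    if not xs or not ys:
        return None  # missing or empty input: return None, like the udf
    # zip stops at the shorter list, so extra elements are dropped silently.
    return [float(x - y) for x, y in zip(xs, ys)]


def rmse(xs, ys):
    """Root mean squared error between two equal-length lists."""
    diffs = difference(xs, ys)
    if not diffs:
        return None
    return math.sqrt(sum(d * d for d in diffs) / len(diffs))


# What the original lambda x, y: x - y effectively tries to do:
try:
    [5, 4, 2, 4, 3] - [4, 3, 1, 2, 1]
    list_minus_error = None
except TypeError as exc:
    list_minus_error = type(exc).__name__

print(list_minus_error)                                  # TypeError
print(difference([5, 4, 2, 4, 3], [4, 3, 1, 2, 1]))      # [1.0, 1.0, 1.0, 2.0, 2.0]
print(rmse([5, 4, 2, 4, 3], [4, 3, 1, 2, 1]))
```

Wrapping rmse in a udf with a "double" return type should work the same way as differencer above, although the built-in array functions are usually the better choice when they are available.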

Upvotes: 8
