Reputation: 65
I am new to Spark. I can sum, subtract, or multiply arrays with Pandas and NumPy in Python, but I am having trouble doing something similar in Spark (PySpark). I am on Databricks.
For example, this approach throws a long error message that I won't paste here:
differencer=udf(lambda x,y: x-y, ArrayType(FloatType()))
df.withColumn('difference', differencer('Array1', 'Array2'))
Schema looks like this:
root
|-- col1: integer (nullable = true)
|-- time: timestamp (nullable = true)
|-- num: integer (nullable = true)
|-- part: integer (nullable = true)
|-- result: integer (nullable = true)
|-- Array1: array (nullable = true)
| |-- element: float (containsNull = true)
|-- Array2: array (nullable = false)
| |-- element: float (containsNull = true)
I just want to create a new column by subtracting those two array columns. Ultimately I will compute the RMSE between them, but I think I can handle that once I learn how to get this difference.
The arrays look like this (I am just typing in some integers):
Array1_row1[5, 4, 2, 4, 3]
Array2_row1[4, 3, 1, 2, 1]
So the resulting array for row1 should be:
DiffCol_row1[1, 1, 1, 2, 2]
Thanks for any suggestions or directions.
Upvotes: 2
Views: 5082
Reputation: 1449
You can use zip_with (available since Spark 2.4):
from pyspark.sql.functions import expr
df = spark.createDataFrame(
    [([5, 4, 2, 4, 3], [4, 3, 1, 2, 1])], ("array1", "array2")
)

df.withColumn(
    "array3",
    expr("zip_with(array1, array2, (x, y) -> x - y)")
).show()
# +---------------+---------------+---------------+
# | array1| array2| array3|
# +---------------+---------------+---------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1, 1, 1, 2, 2]|
# +---------------+---------------+---------------+
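For intuition, zip_with pairs the two arrays element by element and applies the lambda to each pair. A plain-Python sketch of the same logic (this is a model for illustration, not the Spark API itself):

```python
def zip_with(xs, ys, f):
    # Pair elements positionally and apply f, like Spark's zip_with.
    # Caveat: Spark pads the shorter array with nulls, while Python's
    # zip truncates -- equivalent only for equal-length arrays.
    return [f(x, y) for x, y in zip(xs, ys)]

print(zip_with([5, 4, 2, 4, 3], [4, 3, 1, 2, 1], lambda x, y: x - y))
# [1, 1, 1, 2, 2]
```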
Upvotes: 3
Reputation: 96
You can combine arrays_zip and transform:
from pyspark.sql.functions import expr
df = spark.createDataFrame(
    [([5, 4, 2, 4, 3], [4, 3, 1, 2, 1])], ("array1", "array2")
)

df.withColumn(
    "array3",
    expr("transform(arrays_zip(array1, array2), x -> x.array1 - x.array2)")
).show()
# +---------------+---------------+---------------+
# | array1| array2| array3|
# +---------------+---------------+---------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1, 1, 1, 2, 2]|
# +---------------+---------------+---------------+
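To see why the lambda accesses x.array1 and x.array2: arrays_zip produces an array of structs whose fields are named after the input columns, and transform then maps over that array. A rough plain-Python model, with dicts standing in for structs (for illustration only, not the Spark API):

```python
def arrays_zip(xs, ys):
    # Build an array of structs keyed by column name, like Spark's arrays_zip.
    return [{"array1": x, "array2": y} for x, y in zip(xs, ys)]

def transform(arr, f):
    # Apply f to every element, like Spark's transform.
    return [f(e) for e in arr]

zipped = arrays_zip([5, 4, 2, 4, 3], [4, 3, 1, 2, 1])
print(transform(zipped, lambda e: e["array1"] - e["array2"]))
# [1, 1, 1, 2, 2]
```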
A valid udf would require equivalent logic, i.e.
from pyspark.sql.functions import udf

@udf("array<double>")
def differencer(xs, ys):
    if xs and ys:
        return [float(x - y) for x, y in zip(xs, ys)]

df.withColumn("array3", differencer("array1", "array2")).show()
# +---------------+---------------+--------------------+
# | array1| array2| array3|
# +---------------+---------------+--------------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1.0, 1.0, 1.0, 2...|
# +---------------+---------------+--------------------+
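Since the OP's end goal is the RMSE between the two arrays, the same udf pattern extends naturally. A minimal sketch of the per-row computation (the rmse helper name is mine; it returns None for null inputs, mirroring the guard above):

```python
import math

def rmse(xs, ys):
    # Root-mean-square error between two equal-length numeric arrays.
    if not xs or not ys:
        return None
    diffs = [float(x) - float(y) for x, y in zip(xs, ys)]
    return math.sqrt(sum(d * d for d in diffs) / len(diffs))

print(rmse([5, 4, 2, 4, 3], [4, 3, 1, 2, 1]))
# sqrt((1 + 1 + 1 + 4 + 4) / 5) = sqrt(2.2)
```

Wrapping it with udf(rmse, "double") would let you apply it per row via df.withColumn, the same way differencer is used above.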
Upvotes: 8