I'm learning Spark and I came across a problem that I'm unable to overcome. What I would like to achieve is to count the number of positions at which two arrays hold the same value. I can get what I want with a Python UDF, but I'm wondering if there is a way to do it using only built-in Spark functions.
df_bits = sqlContext.createDataFrame([[[0, 1, 1, 0, 0],
                                       [1, 1, 1, 0, 1]]],
                                     ['bits1', 'bits2'])
# some_magic is a placeholder for the function I'm looking for
df_bits.select('bits1', 'bits2', some_magic('bits1', 'bits2')).show()
+---------------+---------------+------------------------+
|bits1          |bits2          |some_magic(bits1, bits2)|
+---------------+---------------+------------------------+
|[0, 1, 1, 0, 0]|[1, 1, 1, 0, 1]|3                       |
+---------------+---------------+------------------------+
Why 3? Using 0-based indices, bits1[1] == bits2[1], bits1[2] == bits2[2] and bits1[3] == bits2[3], while positions 0 and 4 differ.
I tried experimenting with rdd.reduce, but without success.
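For reference, here is the same count in plain Python. This is a minimal sketch: the asker's actual UDF is not shown, so the function name count_matching_positions is hypothetical. A function like this is what a Python UDF would wrap, and it is handy for checking the Spark-only answers against.

```python
from typing import Sequence


def count_matching_positions(bits1: Sequence[int], bits2: Sequence[int]) -> int:
    """Count indices i where bits1[i] == bits2[i].

    zip() stops at the shorter sequence, so extra trailing elements are ignored.
    """
    return sum(1 for a, b in zip(bits1, bits2) if a == b)


print(count_matching_positions([0, 1, 1, 0, 0], [1, 1, 1, 0, 1]))  # 3
```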
In PySpark (Spark 2.4+), use arrays_zip, as mentioned in the comments, together with the aggregate higher-order function:
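from pyspark.sql import functions as F

# arrays_zip pairs the elements position by position into structs (accessed here as x.bits1 / x.bits2);
# aggregate then folds over that array, starting at 0 and adding 1 whenever the two values are equal
df_bits.withColumn(
    "sum",
    F.expr("aggregate(arrays_zip(bits1, bits2), 0, (acc, x) -> IF(x.bits1 == x.bits2, 1, 0) + acc)")
).show()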
#+---------------+---------------+---+
#| bits1| bits2|sum|
#+---------------+---------------+---+
#|[0, 1, 1, 0, 0]|[1, 1, 1, 0, 1]| 3|
#+---------------+---------------+---+
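To see what aggregate does here, a rough plain-Python model: Spark's aggregate(array, start, merge) folds the array left to right from start, so it behaves like functools.reduce. This sketch assumes equal-length arrays without nulls, uses tuples in place of the structs produced by arrays_zip, and emulate_aggregate is a hypothetical helper, not a Spark API.

```python
from functools import reduce


def emulate_aggregate(arr, start, merge):
    # Left fold: acc = merge(acc, element) for each element, beginning with start
    # (Spark's optional finish function is omitted)
    return reduce(merge, arr, start)


bits1 = [0, 1, 1, 0, 0]
bits2 = [1, 1, 1, 0, 1]
# Stand-in for arrays_zip(bits1, bits2); plain zip() is enough for equal-length, null-free arrays
zipped = list(zip(bits1, bits2))
result = emulate_aggregate(zipped, 0, lambda acc, x: (1 if x[0] == x[1] else 0) + acc)
print(result)  # 3
```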
Perhaps this is helpful. For Spark >= 2.4, use aggregate and zip_with (Scala):
import org.apache.spark.sql.functions.expr

val df = spark.sql("select array(0, 1, 1, 0, 0, null) as bits1, array(1, 1, 1, 0, 1, null) as bits2")
df.show(false)
df.printSchema()
/**
* +----------------+----------------+
* |bits1 |bits2 |
* +----------------+----------------+
* |[0, 1, 1, 0, 0,]|[1, 1, 1, 0, 1,]|
* +----------------+----------------+
*
* root
* |-- bits1: array (nullable = false)
* | |-- element: integer (containsNull = true)
* |-- bits2: array (nullable = false)
* | |-- element: integer (containsNull = true)
*/
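// zip_with compares the arrays element by element (1 if equal, else 0); aggregate then sums the flags.
// For the trailing null pair, x = y evaluates to null, so if() returns 0 and the pair is not counted.
df.withColumn("x", expr("aggregate(zip_with(bits1, bits2, (x, y) -> if(x=y, 1, 0)), 0, (acc, x) -> acc + x)"))
  .show(false)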
/**
* +----------------+----------------+---+
* |bits1 |bits2 |x |
* +----------------+----------------+---+
* |[0, 1, 1, 0, 0,]|[1, 1, 1, 0, 1,]|3 |
* +----------------+----------------+---+
*/
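The intermediate array built by zip_with is not shown above, so here is a plain-Python approximation (the helpers emulate_zip_with and match_flag are hypothetical, not Spark APIs). It models two documented behaviours: zip_with pads the shorter array with nulls before applying the lambda, and if() returns its else value when the condition is not true, so the null pair contributes 0.

```python
from itertools import zip_longest


def emulate_zip_with(left, right, fn):
    # zip_with appends nulls (None here) to the shorter array before applying fn
    return [fn(x, y) for x, y in zip_longest(left, right, fillvalue=None)]


def match_flag(x, y):
    # if(x = y, 1, 0): in Spark SQL, x = y is null when either side is null,
    # and if() returns its else value (0) for a null condition
    if x is None or y is None:
        return 0
    return 1 if x == y else 0


flags = emulate_zip_with([0, 1, 1, 0, 0, None], [1, 1, 1, 0, 1, None], match_flag)
print(flags)       # [0, 1, 1, 1, 0, 0]
print(sum(flags))  # 3, the value aggregate(..., 0, (acc, x) -> acc + x) produces
```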