Hardik Gupta

Reputation: 4790

PySpark - count the number of new items

In my PySpark (< 2.4) dataframe, I have two array columns. I want to count the new items in List1, i.e. those that are not present in List2.

data = [(("ID1", ['A', 'B'], ['A', 'C'])), (("ID2", ['A', 'B'], ['A', 'B'])), (("ID2", ['A', 'B'], None))]
df = spark.createDataFrame(data, ["ID", "List1", "List2"])
df.show(truncate=False)

+---+------+------+
|ID |List1 |List2 |
+---+------+------+
|ID1|[A, B]|[A, C]|
|ID2|[A, B]|[A, B]|
|ID2|[A, B]|null  |
+---+------+------+

Currently, I have a UDF that gives me the correct answer. I would like to know whether this can be done without a UDF.

Current solution

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def sum_list(x, y):
    # x is List2 (the reference list), y is List1 (the list that may hold new items)
    if y is None:
        return 0
    if x is None:
        return len(y)
    # count the items of y that do not appear in x
    return len([1 for item in y if item not in x])

new_udf = udf(sum_list, IntegerType())
df = df.withColumn('new_count', new_udf('List2', 'List1'))
df.show()

+---+------+------+---------+
| ID| List1| List2|new_count|
+---+------+------+---------+
|ID1|[A, B]|[A, C]|        1|
|ID2|[A, B]|[A, B]|        0|
|ID2|[A, B]|  null|        2|
+---+------+------+---------+

Upvotes: 0

Views: 196

Answers (2)

Ala Tarighati

Reputation: 3817

Using PySpark < 2.4, you can combine explode, groupBy, and array_contains:

import pyspark.sql.functions as F

df = df.select('ID', 'List1', 'List2', F.explode('List1').alias('list1_explode'))
df = df.groupby('ID', 'List1', 'List2').agg(
    # 0 when the exploded element already exists in List2, 1 when it is new;
    # for a null List2, array_contains returns null, which also falls through
    # to otherwise(1), so every element of List1 counts as new
    F.sum(F.when(F.expr('array_contains(List2, list1_explode)'), 0).otherwise(1))
     .alias('new_count'))
df.show()

+---+------+------+---------+
| ID| List1| List2|new_count|
+---+------+------+---------+
|ID2|[A, B]|[A, B]|        0|
|ID2|[A, B]|  null|        2|
|ID1|[A, B]|[A, C]|        1|
+---+------+------+---------+
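
Note that explode drops any row whose List1 is null or empty, so such rows would vanish from the result. A minimal sketch (assuming that edge case can occur in your data, which the sample does not cover) using explode_outer, available since Spark 2.3, which keeps those rows with a new_count of 0:

import pyspark.sql.functions as F

# explode_outer emits a single row with a null placeholder for a null or
# empty List1 instead of dropping the row (hypothetical edge case, not in
# the original sample data)
df2 = df.select('ID', 'List1', 'List2', F.explode_outer('List1').alias('list1_explode'))
df2 = df2.groupby('ID', 'List1', 'List2').agg(
    F.sum(F.when(F.col('list1_explode').isNull(), 0)                 # null/empty List1: nothing new
           .when(F.expr('array_contains(List2, list1_explode)'), 0)  # already present in List2
           .otherwise(1))                                            # genuinely new element
      .alias('new_count'))
df2.show()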

Upvotes: 1

Sivasankar Boomarapu

Reputation: 188

You can use array_except, but note that it requires Spark >= 2.4.0:

from pyspark.sql import SparkSession
from pyspark.sql.functions import array_except, size, when

spark = SparkSession.builder.appName("test").getOrCreate()
data = [("ID1", ['A', 'B'], ['A', 'C']), ("ID2", ['A', 'B'], ['A', 'B']), ("ID2", ['A', 'B'], None)]
df = spark.createDataFrame(data, ["ID", "List1", "List2"])
df.show()

# array_except removes from List1 every element that also appears in List2;
# a null List2 is handled separately by falling back to the full size of List1
df.withColumn('new_count',
              when(df.List2.isNull(), size(df.List1))
              .otherwise(size(array_except('List1', 'List2')))).show()
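
For the sample data above, this should yield the same counts as the UDF:

+---+------+------+---------+
| ID| List1| List2|new_count|
+---+------+------+---------+
|ID1|[A, B]|[A, C]|        1|
|ID2|[A, B]|[A, B]|        0|
|ID2|[A, B]|  null|        2|
+---+------+------+---------+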

Upvotes: 0
