martina.physics

Reputation: 9804

Adding column to PySpark DataFrame depending on whether column value is in another column

I have a PySpark DataFrame with structure given by

[('u1', 1, [1, 2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')

I need to add a further column with 1 or 0 depending on whether 'item' is in 'fav_items' or not.

So I would want

[('u1', 1, [1, 2, 3], 1), ('u1', 4, [1, 2, 3], 0)]

How would I look up the value of the second column in the third column to decide the new value, and how would I then add that column?
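The snippet above is just shorthand; to reproduce it, I build the DataFrame roughly like this (assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Build the example data as a DataFrame
df = spark.createDataFrame(
    [('u1', 1, [1, 2, 3]), ('u1', 4, [1, 2, 3])],
    ['user', 'item', 'fav_items']
)
df.show()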

Upvotes: 3

Views: 9065

Answers (2)

Hugo Reyes

Reputation: 1585

The following code does the requested task. A user-defined function (UDF) receives two columns of the DataFrame as parameters; for each row it checks whether the item is in the item list, returning 1 if the item is found and 0 otherwise.

# Imports
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
# First we create an RDD in order to create a DataFrame:
rdd = sc.parallelize([('u1', 1, [1, 2, 3]), ('u1', 4, [1, 2, 3])])
df = rdd.toDF(['user', 'item', 'fav_items'])
# Print the DataFrame
df.show()

# User-defined function that takes two columns and checks membership
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())

df.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show()

Here are the results:

+----+----+---------+
|user|item|fav_items|
+----+----+---------+
|  u1|   1|[1, 2, 3]|
|  u1|   4|[1, 2, 3]|
+----+----+---------+

+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
|  u1|   1|[1, 2, 3]|     1|
|  u1|   4|[1, 2, 3]|     0|
+----+----+---------+------+
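If you prefer to add the column in place rather than re-selecting every column, the same UDF should also work with withColumn (a sketch reusing function and col from above):

# Equivalent result: append the computed column directly to df
df.withColumn('result', function(col('item'), col('fav_items'))).show()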

Upvotes: 8

zero323

Reputation: 330113

Just for fun, a non-UDF solution:

from pyspark.sql.functions import col, first, explode, max as max_

result = (
    # Here we take exploded rows and for each row check if there
    # is a match. We cast to integer (false -> 0, true -> 1)
    # and take max (1 if there is any match)
    max_((col("fav_item") == col("item")).cast("integer"))
).alias("result")


(df.repartition("user", "item") 
  # Explode array so we compare item and fav_item
  .withColumn("fav_item", explode("fav_items")) 
  .groupBy("user", "item")
  # Aggregate
  # we add result and retain fav_items
  .agg(result, first("fav_items").alias("fav_items")))

So it just:

  • unrolls fav_items:

    ## +----+----+---------+--------+
    ## |user|item|fav_items|fav_item|
    ## +----+----+---------+--------+
    ## |  u1|   1|[1, 2, 3]|       1|
    ## |  u1|   1|[1, 2, 3]|       2|
    ## |  u1|   1|[1, 2, 3]|       3|
    ## |  u1|   4|[1, 2, 3]|       1|
    ## |  u1|   4|[1, 2, 3]|       2|
    ## |  u1|   4|[1, 2, 3]|       3|
    ## +----+----+---------+--------+
    
  • checks whether fav_item == item (_1 is the result of the (col("fav_item") == col("item")).cast("integer") expression):

    ## +----+----+---------+--------+---+
    ## |user|item|fav_items|fav_item| _1|
    ## +----+----+---------+--------+---+
    ## |  u1|   1|[1, 2, 3]|       1|  1|
    ## |  u1|   1|[1, 2, 3]|       2|  0|
    ## |  u1|   1|[1, 2, 3]|       3|  0|
    ## |  u1|   4|[1, 2, 3]|       1|  0|
    ## |  u1|   4|[1, 2, 3]|       2|  0|
    ## |  u1|   4|[1, 2, 3]|       3|  0|
    ## +----+----+---------+--------+---+
    
  • and rolls it back up, keeping user and item as group columns, an arbitrary fav_items value (they are all the same), and the maximum of the temporary column _1 (0 or 1).

I would go with UDF though.
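(Another UDF-free sketch, assuming Spark 1.5+ where array_contains is available in SQL: express the membership test with expr and cast the resulting boolean to an integer.)

from pyspark.sql.functions import expr

# array_contains(fav_items, item) compares against another column,
# so it goes through expr(); the integer cast turns true/false into 1/0.
df.withColumn("result", expr("array_contains(fav_items, item)").cast("integer")).show()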

Upvotes: 3
