user13591820

Reputation: 175

How to get mapped values in Pyspark?

I have a dataframe, which I use for all of my calculations, with an id column and a name column:

id | name
1  | Alex
2  | Bob
3  | Chris
4  | Kevin

I do a bunch of operations and get their closest friends, which is a list of pairs of the form [id, score]

id | friends
1  | [[2, 49], [3, 15]]
2  | [[4, 61], [2, 49], [3, 4]]

How can I map this friends list to a list of names? The scores can be dropped at this point. Ideally it would look like:

id | friends
1  | [Bob, Chris]
2  | [Kevin, Bob, Chris]

I was thinking some kind of join, but I'm confused about how this would work since the friends column is a list.

Upvotes: 3

Views: 105

Answers (2)

Som

Reputation: 6323

Perhaps this is useful (written in Scala).

Both options are covered below: one for when the dimension (names) dataframe is big, and one for when it is small.

1. Load both dataframes

    // requires: import spark.implicits._ and org.apache.spark.sql.functions._
    val data1 =
      """
        |id | name
        |1  | Alex
        |2  | Bob
        |3  | Chris
        |4  | Kevin
      """.stripMargin

    // convert the pipe-delimited string into a Dataset[String] of CSV lines, then read it
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +---+-----+
      * |id |name |
      * +---+-----+
      * |1  |Alex |
      * |2  |Bob  |
      * |3  |Chris|
      * |4  |Kevin|
      * +---+-----+
      *
      * root
      * |-- id: integer (nullable = true)
      * |-- name: string (nullable = true)
      */

    val df2 =
      spark.sql(
        """
          |select id, friends from values
          | (1, array(named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 15))),
          | (2, array(named_struct('id', 4, 'score', 61), named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 4)))
          | T(id, friends)
        """.stripMargin)
    df2.show(false)
    df2.printSchema()
    /**
      * +---+--------------------------+
      * |id |friends                   |
      * +---+--------------------------+
      * |1  |[[2, 49], [3, 15]]        |
      * |2  |[[4, 61], [2, 49], [3, 4]]|
      * +---+--------------------------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- friends: array (nullable = false)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- id: integer (nullable = false)
      * |    |    |-- score: integer (nullable = false)
      */

2. If the dimension dataframe is big


    // if df1 is big: explode the friend ids, join on the name table, then regroup
    val exploded = df2.select($"id", explode(expr("friends.id")).as("friend_id"))
    exploded.join(df1, exploded("friend_id") === df1("id"))
      .groupBy(exploded("id"))
      .agg(collect_list($"name").as("friends"))
      .show(false)
    /**
      * +---+-------------------+
      * |id |friends            |
      * +---+-------------------+
      * |2  |[Bob, Chris, Kevin]|
      * |1  |[Bob, Chris]       |
      * +---+-------------------+
      */

Note that collect_list after a shuffle join does not guarantee the original array order (id 2 comes back as [Bob, Chris, Kevin] above).

3. If the dimension dataframe is small

    // if df1 is small: broadcast the id -> name map and resolve names with a UDF
    // (requires: import scala.collection.mutable and org.apache.spark.sql.Row)
    val b = spark.sparkContext.broadcast(df1.collect().map { case Row(id: Int, name: String) => id -> name }.toMap)

    val getFriendsName = udf((idArray: mutable.WrappedArray[Int]) => idArray.map(b.value(_)))

    df2.withColumn("friends", getFriendsName(expr("friends.id")))
      .show(false)

    /**
      * +---+-------------------+
      * |id |friends            |
      * +---+-------------------+
      * |1  |[Bob, Chris]       |
      * |2  |[Kevin, Bob, Chris]|
      * +---+-------------------+
      */
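
Since the question is tagged PySpark, a rough Python translation of the broadcast approach could look like the sketch below. This is only a sketch: it assumes `df1` and `df2` are PySpark DataFrames with the same shapes as above (integer ids, friends as an array of structs with an `id` field) and an active `spark` session.

    from pyspark.sql import functions as f
    from pyspark.sql.types import ArrayType, StringType

    # broadcast the small id -> name lookup to every executor
    name_map = spark.sparkContext.broadcast(
        {row['id']: row['name'] for row in df1.collect()})

    # UDF that swaps each friend id for its name via the broadcast dict
    @f.udf(returnType=ArrayType(StringType()))
    def get_friend_names(id_array):
        return [name_map.value[i] for i in id_array]

    df2.withColumn('friends', get_friend_names(f.expr('friends.id'))).show(truncate=False)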

Upvotes: 2

Shubham Jain

Reputation: 5526

Have a look at this

import pyspark.sql.functions as f

df = spark.createDataFrame([('1', 'Alex'), ('2', 'Bob'), ('3', 'Chris'), ('4', 'Kevin')], ['id', 'name'])
df2 = spark.createDataFrame([('1', [[2, 49], [3, 15]]), ('2', [[4, 61], [2, 49], [3, 4]])], ['id', 'friends'])

# pull the friend id out of every [id, score] pair, one row per friend (transform needs Spark 2.4+)
df3 = df2.select('id', 'friends', f.expr('explode(transform(friends, x -> x[0])) as friend'))

# join back to the name table and collect the names for each id
df3.join(df, df.id.cast('int') == df3.friend.cast('int')) \
   .groupBy(df3.id, df3.friends) \
   .agg(f.collect_list('name').alias('friend')) \
   .show()

+---+--------------------+-------------------+
| id|             friends|             friend|
+---+--------------------+-------------------+
|  1|  [[2, 49], [3, 15]]|       [Chris, Bob]|
|  2|[[4, 61], [2, 49]...|[Chris, Kevin, Bob]|
+---+--------------------+-------------------+
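
To get exactly the id | friends shape from the question (names only, original array dropped), the same aggregation can run without carrying the friends column along; a small variation on the above (note that collect_list makes no ordering guarantee):

    (df3.join(df, df.id.cast('int') == df3.friend.cast('int'))
        .groupBy(df3.id)
        .agg(f.collect_list('name').alias('friends'))
        .show())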

Upvotes: 1
