Reputation: 175
I have a dataframe that I use for all of my calculations; it has an id column and a name column:
id | name
1 | Alex
2 | Bob
3 | Chris
4 | Kevin
I do a bunch of operations and get each person's closest friends as a list of [id, score] pairs:
id | friends
1 | [[2, 49], [3, 15]]
2 | [[4, 61], [2, 49], [3, 4]]
How can I map this friend list to a list of names? The scores can be dropped at this point. Ideally it would look like this:
id | friends
1 | [Bob, Chris]
2 | [Kevin, Bob, Chris]
I was thinking of some kind of join, but I'm confused about how this would work since it's a list.
Upvotes: 3
Views: 105
Reputation: 6323
Perhaps this is useful (written in Scala).
It covers both options: when the dimension dataframe (df1) is big and when it is small.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._

val data1 =
"""
|id | name
|1 | Alex
|2 | Bob
|3 | Chris
|4 | Kevin
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +---+-----+
* |id |name |
* +---+-----+
* |1 |Alex |
* |2 |Bob |
* |3 |Chris|
* |4 |Kevin|
* +---+-----+
*
* root
* |-- id: integer (nullable = true)
* |-- name: string (nullable = true)
*/
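// note: DataFrameReader.csv(Dataset[String]), used above to parse the inline
// sample data, requires Spark 2.2+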
val df2 =
spark.sql(
"""
|select id, friends from values
| (1, array(named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 15))),
| (2, array(named_struct('id', 4, 'score', 61), named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 4)))
| T(id, friends)
""".stripMargin)
df2.show(false)
df2.printSchema()
/**
* +---+--------------------------+
* |id |friends |
* +---+--------------------------+
* |1 |[[2, 49], [3, 15]] |
* |2 |[[4, 61], [2, 49], [3, 4]]|
* +---+--------------------------+
*
* root
* |-- id: integer (nullable = false)
* |-- friends: array (nullable = false)
* | |-- element: struct (containsNull = false)
* | | |-- id: integer (nullable = false)
* | | |-- score: integer (nullable = false)
*/
// option 1: if df1 (the dimension dataframe) is big, explode the friend ids and join
val exploded = df2.select($"id", explode(expr("friends.id")).as("friend_id"))
exploded.join(df1, exploded("friend_id")===df1("id"))
.groupBy(exploded("id"))
.agg(collect_list($"name").as("friends"))
.show(false)
/**
* +---+-------------------+
* |id |friends |
* +---+-------------------+
* |2 |[Bob, Chris, Kevin]|
* |1 |[Bob, Chris] |
* +---+-------------------+
*/
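// note: collect_list gives no ordering guarantee after the shuffle; compare
// [Bob, Chris, Kevin] above with the input order Kevin, Bob, Chris. if the
// order matters, carry the score (or an explode position) through the join
// and sort before collecting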
// option 2: if df1 is small, broadcast it as a map and look the names up in a udf
val b = spark.sparkContext.broadcast(df1.collect().map{case Row(id: Int, name: String) => id -> name}.toMap)
// Seq is the safest parameter type for an array column in a Scala udf
val getFriendsName = udf((idArray: Seq[Int]) => idArray.map(b.value(_)))
df2.withColumn("friends", getFriendsName(expr("friends.id")))
.show(false)
/**
* +---+-------------------+
* |id |friends |
* +---+-------------------+
* |1 |[Bob, Chris] |
* |2 |[Kevin, Bob, Chris]|
* +---+-------------------+
*/
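If you are on Spark 3.0+, the same small-dimension lookup can also be done without a UDF, using the built-in transform and element_at functions. A minimal sketch, assuming the df1 and df2 above (it produces the same two rows as the UDF version):
// ship the id -> name lookup as a literal map column, then rewrite each
// friend struct to its name with the higher-order transform function
val nameMap = typedLit(df1.as[(Int, String)].collect().toMap)
df2.withColumn("friends", transform($"friends", fr => element_at(nameMap, fr("id"))))
  .show(false)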
Upvotes: 2
Reputation: 5526
Have a look at this:
from pyspark.sql import functions as f
df = spark.createDataFrame([('1', 'Alex'), ('2', 'Bob'), ('3', 'Chris'), ('4', 'Kevin')], ['id', 'name'])
df2 = spark.createDataFrame([('1', [[2, 49], [3, 15]]), ('2', [[4, 61], [2, 49], [3, 4]])], ['id', 'friends'])
# one row per friend id, pulled out of the [id, score] pairs
df3 = df2.select('id', 'friends', f.expr('explode(transform(friends, x -> x[0])) as friend'))
# join back to the name dataframe and collect the names per id
df3.join(df, df.id.cast('int') == df3.friend.cast('int')).groupBy(df3.id, df3.friends).agg(f.collect_list('name').alias('friend')).show()
+---+--------------------+-------------------+
| id| friends| friend|
+---+--------------------+-------------------+
| 1| [[2, 49], [3, 15]]| [Chris, Bob]|
| 2|[[4, 61], [2, 49]...|[Chris, Kevin, Bob]|
+---+--------------------+-------------------+
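One thing to watch: collect_list does not preserve the original array order after the shuffle, which is why the first row shows [Chris, Bob] where the question asked for [Bob, Chris]. If the order matters, keep the score (or the position from posexplode) alongside the name and sort before collecting.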
Upvotes: 1