Reputation: 883
NOTE: This is just quick example data; it won't make sense when compared to an actual cricket team.
I have a JSON file as shown below:
{
  "someID": "a5cf4922f4e3f45",
  "payload": {
    "teamID": "1",
    "players": [
      {
        "type": "Batsman",
        "name": "Amar",
        "address": {
          "state": "Gujarat"
        }
      },
      {
        "type": "Bowler",
        "name": "Akbar",
        "address": {
          "state": "Telangana"
        }
      },
      {
        "type": "Fielder",
        "name": "Antony",
        "address": {
          "state": "Kerala"
        }
      }
    ]
  }
}
I have exploded this with the below code:
from pyspark.sql.functions import explode

df_record = spark.read.json("path-to-file.json", multiLine=True)
df_player_dtls = df_record.select("payload.teamID", explode("payload.players").alias("xplayers")) \
.select("teamID", \
"xplayers.type", \
"xplayers.name", \
"xplayers.address.state")
df_player_dtls.createOrReplaceTempView("t_player_dtls")
spark.sql("SELECT * FROM t_player_dtls").show()
So currently the output looks like this:
+--------+---------+--------+------------+
| TeamID | Type | Name | State |
+--------+---------+--------+------------+
| 1 | Batsman | Amar | Gujarat |
| 1 | Bowler | Akbar | Telangana |
| 1 | Fielder | Antony | Kerala |
| 2 | Batsman | John | Queensland |
| 2 | Bowler | Smith | Perth |
+--------+---------+--------+------------+
I want to convert it to the below shown format:
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| TeamID | Batsman.Name | Batsman.State | Bowler.Name | Bowler.State | Fielder.Name | Fielder.State |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| 1 | Amar | Gujarat | Akbar | Telangana | Antony | Kerala |
| 2 | John | Queensland | Smith | Perth | null | null |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
There will only be one player of each type in a team, and there can be at most four types of players in each team (Batsman, Bowler, Fielder, and Wicketkeeper). So the maximum number of players in each team is four. Hence, the final table that will hold this data has nine columns (one for the team ID, plus Name and State for each of the four player types).
Is it possible to accomplish this in Spark? I am a rookie in Spark, so answers that explain the steps would be greatly appreciated.
Upvotes: 1
Views: 7417
Reputation: 3619
It is possible with SQL, which is not the most efficient way (a UDF would be), but it works. And sorry that it is Scala-ish.
val res = spark.sql(
"""select teamID
|, Batsman.name as `Batsman.name`, Batsman.state as `Batsman.state`
|, Bowler.name as `Bowler.name`, Bowler.state as `Bowler.state`
|, Fielder.name as `Fielder.name`, Fielder.state as `Fielder.state`
|from (
| select teamID,
| max(case type when 'Batsman' then info end) as Batsman
| , max(case type when 'Bowler' then info end) as Bowler
| , max(case type when 'Fielder' then info end) as Fielder
| from (select teamID, type, struct(name, state) as info from t_player_dtls) group by teamID
|)""".stripMargin)
I used GROUP BY to pivot the data around the teamID column; max will select a value that is not null, and the CASE statement allows only one record into max. To simplify the max/case combination, I used the struct function, which creates a composite column info made of the payload that we later lift into the flat schema.
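Since the question uses Python, here is a minimal PySpark sketch of the same query; it assumes the temp view t_player_dtls created in the question:

# Same pivot-by-max(CASE) query, run from PySpark against the temp view
res = spark.sql("""
    SELECT teamID
        , Batsman.name AS `Batsman.name`, Batsman.state AS `Batsman.state`
        , Bowler.name  AS `Bowler.name`,  Bowler.state  AS `Bowler.state`
        , Fielder.name AS `Fielder.name`, Fielder.state AS `Fielder.state`
    FROM (
        SELECT teamID
            , max(CASE type WHEN 'Batsman' THEN info END) AS Batsman
            , max(CASE type WHEN 'Bowler'  THEN info END) AS Bowler
            , max(CASE type WHEN 'Fielder' THEN info END) AS Fielder
        FROM (SELECT teamID, type, struct(name, state) AS info FROM t_player_dtls)
        GROUP BY teamID
    )
""")
res.show(truncate=False)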
A UDF would have been more efficient, but I am not familiar with Python.
UPDATE: Both solutions (SQL and pivot) use the explode and groupBy combo (@Anshuman's is much easier to code) and have the following execution plans:
SQL
== Physical Plan ==
SortAggregate(key=[teamID#10], functions=[max(CASE WHEN (type#16 = Batsman) THEN info#31 END), max(CASE WHEN (type#16 = Bowler) THEN info#31 END), max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(teamID#10, 200)
+- SortAggregate(key=[teamID#10], functions=[partial_max(CASE WHEN (type#16 = Batsman) THEN info#31 END), partial_max(CASE WHEN (type#16 = Bowler) THEN info#31 END), partial_max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
+- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, named_struct(name, xplayers#12.name, state, xplayers#12.address.state) AS info#31]
+- Generate explode(payload#4.players), true, false, [xplayers#12]
+- *Project [payload#4]
+- Scan ExistingRDD[payload#4,someID#5]
PIVOT
== Physical Plan ==
SortAggregate(key=[TeamID#10], functions=[first(if ((Type#16 <=> Batsman)) Name#17 else null, true), first(if ((Type#16 <=> Batsman)) State#18 else null, true), first(if ((Type#16 <=> Bowler)) Name#17 else null, true), first(if ((Type#16 <=> Bowler)) State#18 else null, true), first(if ((Type#16 <=> Fielder)) Name#17 else null, true), first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(TeamID#10, 200)
+- SortAggregate(key=[TeamID#10], functions=[partial_first(if ((Type#16 <=> Batsman)) Name#17 else null, true), partial_first(if ((Type#16 <=> Batsman)) State#18 else null, true), partial_first(if ((Type#16 <=> Bowler)) Name#17 else null, true), partial_first(if ((Type#16 <=> Bowler)) State#18 else null, true), partial_first(if ((Type#16 <=> Fielder)) Name#17 else null, true), partial_first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
+- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, xplayers#12.name AS name#17, xplayers#12.address.state AS state#18]
+- Generate explode(payload#4.players), true, false, [xplayers#12]
+- *Project [payload#4]
+- Scan ExistingRDD[payload#4,someID#5]
Both cause a shuffle (Exchange hashpartitioning(TeamID#10, 200)).
If performance is your goal, then you could use this Scala approach (I do not know Python):
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

//row_1 and row_2 are JSON strings holding the input records
val df_record = spark.read.json(Seq(row_1, row_2).toDS)

//Define your custom player types, as many as needed
val playerTypes = Seq("Batsman", "Bowler", "Fielder")

//Return type for the UDF
val returnType = StructType(playerTypes.flatMap(t => Seq(StructField(s"$t.Name", StringType), StructField(s"$t.State", StringType))))

val unpackPlayersUDF = udf( (players: Seq[Row]) => {
    val playerValues: Map[String, Row] = players.map(p => (p.getAs[String]("type"), p)).toMap
    val arrangedValues = playerTypes.flatMap { t =>
      val playerRow = playerValues.get(t) //if type does not exist, then value will be None, which is null
      Seq(
        playerRow.map(_.getAs[String]("name"))
        , playerRow.map(_.getAs[Row]("address").getAs[String]("state"))
      )
    }
    Row(arrangedValues: _*)
  }
  , returnType)

val udfRes = df_record
  .withColumn("xplayers", unpackPlayersUDF($"payload.players"))
  .select("payload.teamID", "xplayers.*")

udfRes.show(false)
udfRes.explain()
Output:
+------+------------+-------------+-----------+------------+------------+-------------+
|teamID|Batsman.Name|Batsman.State|Bowler.Name|Bowler.State|Fielder.Name|Fielder.State|
+------+------------+-------------+-----------+------------+------------+-------------+
|1 |Amar |Gujarat |Akbar |Telangana |Antony |Kerala |
|1 |John |Queensland |Smith |Perth |null |null |
+------+------------+-------------+-----------+------------+------------+-------------+
With the following execution plan:
== Physical Plan ==
*Project [payload#4.teamID AS teamID#46, UDF(payload#4.players).Batsman.Name AS Batsman.Name#40, UDF(payload#4.players).Batsman.State AS Batsman.State#41, UDF(payload#4.players).Bowler.Name AS Bowler.Name#42, UDF(payload#4.players).Bowler.State AS Bowler.State#43, UDF(payload#4.players).Fielder.Name AS Fielder.Name#44, UDF(payload#4.players).Fielder.State AS Fielder.State#45]
+- Scan ExistingRDD[payload#4,someID#5]
No shuffle is involved. If you want to increase the performance even further, adding an explicit read schema via spark.read.schema(SCHEMA).json(...) will help, as the reader will not have to infer the schema, which saves time.
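For completeness, a minimal PySpark sketch of such an explicit schema for the JSON in the question (field names follow the sample file; adjust as needed):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Schema matching the sample JSON, so Spark can skip schema inference
schema = StructType([
    StructField("someID", StringType()),
    StructField("payload", StructType([
        StructField("teamID", StringType()),
        StructField("players", ArrayType(StructType([
            StructField("type", StringType()),
            StructField("name", StringType()),
            StructField("address", StructType([
                StructField("state", StringType())
            ]))
        ])))
    ]))
])

df_record = spark.read.schema(schema).json("path-to-file.json", multiLine=True)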
Upvotes: 1
Reputation: 420
We can use the pivot function of PySpark:
from pyspark.sql.functions import first
df = df_player_dtls.groupBy("TeamID").pivot("Type").agg(
    first("Name").alias("Name"),
    first("State").alias("State"))
df.show(10, False)
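If the player types are known up front (as they are here), you can also pass them to pivot explicitly, which avoids the extra pass Spark otherwise needs to collect the distinct pivot values and fixes the column order; a small sketch, assuming the four types from the question:

from pyspark.sql.functions import first

player_types = ["Batsman", "Bowler", "Fielder", "Wicketkeeper"]

# Passing the values to pivot() skips the job that computes distinct types
df = df_player_dtls.groupBy("TeamID").pivot("Type", player_types).agg(
    first("Name").alias("Name"),
    first("State").alias("State"))
df.show(10, False)

Note that with multiple aggregations the resulting columns come out named like Batsman_Name and Batsman_State, so rename them afterwards if you need the dotted names from the question.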
Upvotes: 3