Reputation: 71
I have a Spark streaming DataFrame with two columns: an Integer id column and a MapType column with an Integer id as the key and a JSON object as the value.
---------------------------------------------------------------------------------------------------------------
id | objects
---------------------------------------------------------------------------------------------------------------
1  | (1 -> {"id": 1, "type": "jpeg"}, 2 -> {"id": 2, "type": "gif"}, 3 -> {"id": 3, "type": "png"})
5  | (1 -> {"id": 1, "type": "jpeg"}, 2 -> {"id": 2, "type": "gif"}, 3 -> {"id": 3, "type": "png"})
2  | (1 -> {"id": 1, "type": "jpeg"}, 2 -> {"id": 2, "type": "gif"}, 3 -> {"id": 3, "type": "png"})
---------------------------------------------------------------------------------------------------------------
I want to construct a new dataframe that has a single column containing the JSON object whose key matches the id column.
----------------------------------------------------------------------
objects
----------------------------------------------------------------------
{"id" : 1, "type": "jpeg"}
{"id" : 2, "type": "gif" }
----------------------------------------------------------------------
What's the best way to do this? I implemented a Scala UDF that does the lookup and returns the matching object value, but I was wondering whether the same can be done with built-in Spark functions.
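For reference, the UDF approach is roughly along these lines (a minimal sketch; the name lookupObject and the Map[Int, String] value type are assumptions, not the exact code):

import org.apache.spark.sql.functions.udf

// Sketch: return the map value whose key equals the id column (None becomes null).
val lookupObject = udf((id: Int, objects: Map[Int, String]) => objects.get(id))
val withObj = df.withColumn("obj", lookupObject($"id", $"objects"))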
I tried to do something like this:
df.withColumn("obj", $"objects".getItem($"id"))
But it throws an exception:
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.ColumnName
which makes sense, since $"id" is a Column and not a literal. I could collect the id value first, but that would trigger an action, which I'd like to avoid.
Upvotes: 3
Views: 1992
Reputation: 28322
There is no need for a UDF; you can do this with built-in functionality. However, you can't use getItem, since the key you want to pass is another column rather than a literal value.
Instead you can get values from the Map as follows:
df.withColumn("value", $"objects"($"id"))
To create a new DataFrame and drop the rows where the id does not exist in the map,
df.select($"objects"($"id").as("objects")).na.drop
which will give you,
+-------------------------+
|objects |
+-------------------------+
|{"id": 1, "type": "jpeg"}|
|{"id": 3, "type": "png"} |
+-------------------------+
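For completeness, here is a minimal batch sketch of the same lookup, assuming data shaped like the example in the question (the local SparkSession and sample rows are only for illustration, not part of the original streaming job):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("map-lookup").getOrCreate()
import spark.implicits._

// Sample data mirroring the question: every row carries the same map of JSON strings.
val objects = Map(
  1 -> """{"id": 1, "type": "jpeg"}""",
  2 -> """{"id": 2, "type": "gif"}""",
  3 -> """{"id": 3, "type": "png"}""")

val df = Seq((1, objects), (5, objects), (2, objects)).toDF("id", "objects")

// Look up the value whose key matches id; the row with id = 5 has no match and is dropped.
df.select($"objects"($"id").as("objects")).na.drop.show(false)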
Upvotes: 2