bytecode

Reputation: 71

Lookup values from a MapType column with keys from another column

I have a Spark streaming dataframe with two columns: an integer id column and a MapType column whose keys are integer ids and whose values are JSON objects.

+----+--------------------------------------------------------------------------------------------------+
| id | objects                                                                                          |
+----+--------------------------------------------------------------------------------------------------+
| 1  | (1 -> {"id" : 1, "type": "jpeg"}, 2 -> {"id" : 2, "type": "gif"}, 3 -> {"id" : 3, "type": "png"})|
| 5  | (1 -> {"id" : 1, "type": "jpeg"}, 2 -> {"id" : 2, "type": "gif"}, 3 -> {"id" : 3, "type": "png"})|
| 2  | (1 -> {"id" : 1, "type": "jpeg"}, 2 -> {"id" : 2, "type": "gif"}, 3 -> {"id" : 3, "type": "png"})|
+----+--------------------------------------------------------------------------------------------------+

I want to construct a new dataframe that has a single column containing the JSON object whose key matches the id column.

+---------------------------+
| objects                   |
+---------------------------+
| {"id" : 1, "type": "jpeg"}|
| {"id" : 2, "type": "gif"} |
+---------------------------+

What's the best way to do this? I implemented a Scala UDF that does the lookup and returns the corresponding object value, but I was wondering whether the same can be done using built-in Spark functions.

I tried to do something like this:

df.withColumn("obj", $"objects".getItem($"id"))

But it throws an exception:

java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.ColumnName

which makes sense, since $"id" is a Column rather than a literal value. Collecting the id values first would sidestep that, but a collect triggers an action, which I want to avoid.
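For context, here is a minimal sketch of the kind of UDF lookup I mean (the UDF name is hypothetical; it assumes the JSON values are stored as strings in the map):

```scala
import org.apache.spark.sql.functions.udf

// Hypothetical sketch: look up the map value whose key equals the id column.
// Map.get returns an Option, so a missing key becomes null in the result column.
val lookupById = udf((id: Int, objects: Map[Int, String]) => objects.get(id))

val result = df.withColumn("obj", lookupById($"id", $"objects"))
```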

Upvotes: 3

Views: 1992

Answers (1)

Shaido

Reputation: 28322

There is no need for a UDF; you can use built-in functionality to do this. However, you can't use getItem, since it expects a literal key, not another column.

Instead, you can apply the map column directly to the key column:

df.withColumn("value", $"objects"($"id"))

To create a new dataframe and remove the rows whose id does not exist in the map:

df.select($"objects"($"id").as("objects")).na.drop

which will give you,

+-------------------------+
|objects                  |
+-------------------------+
|{"id": 1, "type": "jpeg"}|
|{"id": 3, "type": "png"} |
+-------------------------+
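On Spark 2.4 and later, the built-in element_at function does the same lookup and reads a bit more explicitly; as above, a missing key yields null, which na.drop then removes:

```scala
import org.apache.spark.sql.functions.element_at

df.select(element_at($"objects", $"id").as("objects")).na.drop
```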

Upvotes: 2
