I am processing streaming events of different types and schemas in Spark with Scala. I need to parse them and save them in a format that is easy to process generically further down the line.
I have a DataFrame of events built like this:
val df = Seq(
  ("{\"a\": 1, \"b\": 2, \"c\": 3 }", "One", "001"),
  ("{\"a\": 6, \"b\": 2, \"d\": 2, \"f\": 8 }", "Two", "089"),
  ("{\"a\": 3, \"b\": 4, \"c\": 6 }", "One", "123")
).toDF("col1", "col2", "col3")
which looks like this:
+------------------------------------+--------+------+
| body                               | type   | id   |
+------------------------------------+--------+------+
| {"a": 1, "b": 2, "c": 3 }          | "One"  | 001  |
| {"a": 6, "d": 2, "f": 8, "g": 10}  | "Two"  | 089  |
| {"a": 3, "b": 4, "c": 6 }          | "Three"| 123  |
+------------------------------------+--------+------+
I would like to turn it into the table below. We can assume that all events of type "One" share the same schema, and that all event types share some common fields, such as the entry "a", which I would like to surface into its own column:
+---+--------------------------------+--------+------+
| a | data                           | y      | z    |
+---+--------------------------------+--------+------+
| 1 | {"b": 2, "c": 3 }              | "One"  | 001  |
| 6 | {"d": 2, "f": 8, "g": 10}      | "Two"  | 089  |
| 3 | {"b": 4, "c": 6 }              | "Three"| 123  |
+---+--------------------------------+--------+------+
One way to achieve this is to parse the JSON data into a Map, as shown below:
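import org.apache.spark.sql.types.{MapType, StringType, IntegerType}
import org.apache.spark.sql.functions.{from_json, expr}
import spark.implicits._ // needed for $"..." and toDF; assumes a SparkSession named spark, as in spark-shell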
val df = Seq(
("{\"a\": 1, \"b\": 2, \"c\": 3 }", "One", "001") ,
("{\"a\": 6, \"b\": 2, \"d\": 2, \"f\": 8 }", "Two", "089"),
("{\"a\": 3, \"b\": 4, \"c\": 6 }", "One", "123")
).toDF("body", "type", "id")
val mapSchema = MapType(StringType, IntegerType)
df.withColumn("map", from_json($"body", mapSchema))
.withColumn("data_keys", expr("filter(map_keys(map), k -> k != 'a')"))
.withColumn("data_values", expr("transform(data_keys, k -> element_at(map,k))"))
.withColumn("data", expr("to_json(map_from_arrays(data_keys, data_values))"))
.withColumn("a", $"map".getItem("a"))
.select($"a", $"data", $"type".as("y"), $"id".as("z"))
.show(false)
// +---+-------------------+---+---+
// |a |data |y |z |
// +---+-------------------+---+---+
// |1 |{"b":2,"c":3} |One|001|
// |6 |{"b":2,"d":2,"f":8}|Two|089|
// |3 |{"b":4,"c":6} |One|123|
// +---+-------------------+---+---+
withColumn("map", from_json($"body", mapSchema)): first, generate a Map from the given JSON data.
withColumn("data_keys", expr("filter(map_keys(map), k -> k != 'a')")): retrieve the keys of the new map by filtering out the key a. The filter function returns an array, i.e. {"a": 1, "b": 2, "c": 3 } -> [b, c].
withColumn("data_values", expr("transform(data_keys, k -> element_at(map,k))")): populate the values of the new map by looking up each of the previous keys with transform and element_at.
withColumn("data", expr("to_json(map_from_arrays(data_keys, data_values))")): build the new map from data_keys and data_values using map_from_arrays, then call to_json to convert the map back to JSON.
First you need to define the JSON schema; here it is inferred from the data itself:
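import org.apache.spark.sql.functions.from_json
import spark.implicits._ // needed for .as[String] and $"..."; assumes a SparkSession named spark
val schema = spark.read.json(df.select("col1").as[String]).schema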
Then you can parse column col1 into a struct with from_json (1st line) and select which elements of it you want to extract (2nd line):
df.select(from_json($"col1", schema).as("data"), $"col2", $"col3")
.select($"data.a", $"data", $"col2", $"col3")
Output:
+---+-------------+----+----+
| a| data|col2|col3|
+---+-------------+----+----+
| 1| [1, 2, 3,,]| One| 001|
| 6|[6, 2,, 2, 8]| Two| 089|
| 3| [3, 4, 6,,]| One| 123|
+---+-------------+----+----+
I know it's not exactly what you asked for (data is still a struct rather than a JSON string, and it still includes a), but it should give you a starting point.
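If you want the remaining fields as a JSON string, as in the requested output, one option is to rebuild a struct from every field of the inferred schema except a and serialize it with to_json. A minimal sketch, assuming the col1/col2/col3 DataFrame and the schema inferred with spark.read.json above; StructSketch and surfaceField are hypothetical names:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, struct, to_json}
import org.apache.spark.sql.types.StructType

object StructSketch {
  // Hypothetical helper: parse `jsonCol` with `schema`, surface `field` as its own
  // column, and re-serialize all other fields of the schema into a JSON `data` column.
  def surfaceField(df: DataFrame, jsonCol: String, schema: StructType, field: String): DataFrame = {
    // Original columns other than the raw JSON one (col2 and col3 in the question).
    val others = df.columns.filter(_ != jsonCol).map(c => col(c))
    // Every parsed field except `field`, aliased so the JSON keys keep their names.
    val rest = schema.fieldNames.filter(_ != field).map(n => col(s"parsed.$n").as(n))
    df.withColumn("parsed", from_json(col(jsonCol), schema))
      .select(Seq(col(s"parsed.$field").as(field), to_json(struct(rest: _*)).as("data")) ++ others: _*)
  }
}
```

Usage: StructSketch.surfaceField(df, "col1", schema, "a").show(false). Since to_json skips null fields by default, each row should only keep the keys it actually had.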
Another option, if you want to deconstruct the JSON completely, is to use data.*:
df.select(from_json($"col1", schema).as("data"), $"col2", $"col3").select($"data.*", $"col2", $"col3")
+---+---+----+----+----+----+----+
| a| b| c| d| f|col2|col3|
+---+---+----+----+----+----+----+
| 1| 2| 3|null|null| One| 001|
| 6| 2|null| 2| 8| Two| 089|
| 3| 4| 6|null|null| One| 123|
+---+---+----+----+----+----+----+