Bishamon Ten

Reputation: 588

SPARK: How to parse an array of JSON objects using Spark

I have a file with regular columns and one column that contains a JSON string like the one below (a picture was also attached). Each row belongs to a column named Demo (not visible in the picture). The other columns are omitted from the picture because they are not relevant for now.

[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]

Please do not change the format of the JSON: it appears in the data file exactly as above, with everything on one line.

Each row has one such value in a column, say JSON. The objects are all on one line, inside an array. I would like to parse this column with Spark and access the value of each object inside it.

My objective is to extract the value of the "value" key from each JSON object into a separate column.

I tried using get_json_object. It works for JSON string 1) below but returns null for JSON string 2):

  1. {"key":"device_kind","value":"desktop"}
  2. [{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]

The code I tried is below:

import org.apache.spark.sql.functions.{col, get_json_object}

val jsonDF1 = spark.range(1).selectExpr(""" '{"key":"device_kind","value":"desktop"}' as jsonString""")

jsonDF1.select(get_json_object(col("jsonString"), "$.value") as "device_kind").show(2) // prints desktop under a column named device_kind

val jsonDF2 = spark.range(1).selectExpr(""" '[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]' as jsonString""")

jsonDF2.select(get_json_object(col("jsonString"), "$.[0].value") as "device_kind").show(2) // prints null, but the expected result is desktop under a column named device_kind
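A likely cause of the null above is the path itself rather than get_json_object. As far as I can tell, Spark's JSON path parser only accepts an array subscript directly after the root, i.e. `$[0].value`; in `$.[0]` the `.` must be followed by a field name, so the path fails to parse and the function returns null. A minimal sketch under that assumption (the local session settings and the `byIndex` name are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, get_json_object}

// Local session for illustration; inside spark-shell this returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("json-path-demo").getOrCreate()

val jsonDF2 = spark.range(1).selectExpr(
  """ '[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]' as jsonString""")

// Subscript right after the root: `$[i].value` instead of `$.[i].value`
val byIndex = jsonDF2.select(
  get_json_object(col("jsonString"), "$[0].value").as("device_kind"),
  get_json_object(col("jsonString"), "$[1].value").as("country_code"),
  get_json_object(col("jsonString"), "$[2].value").as("device_platform"))

byIndex.show(false)
```

This still relies on every row listing the keys in the same order, just like the index-based answers below.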

Next I wanted to use from_json, but I cannot figure out how to build a schema for an array of JSON objects. All the examples I find are for nested JSON objects, not for a JSON string like the one above.

I did find that in SparkR 2.2, from_json has a boolean parameter that, if set to true, handles this type of JSON string (an array of JSON objects), but that option is not available in Spark Scala 2.3.3.

To be clear, the input and expected output are as follows.

Input:

+------------------------------------------------------------------------+
|Demographics                                                            |
+------------------------------------------------------------------------+
|[[device_kind, desktop], [country_code, ID], [device_platform, windows]]|
|[[device_kind, mobile], [country_code, BE], [device_platform, android]] |
|[[device_kind, mobile], [country_code, QA], [device_platform, android]] |
+------------------------------------------------------------------------+

Expected output:

+------------------------------------------------------------------------+-----------+------------+---------------+
|Demographics                                                            |device_kind|country_code|device_platform|
+------------------------------------------------------------------------+-----------+------------+---------------+
|[[device_kind, desktop], [country_code, ID], [device_platform, windows]]|desktop    |ID          |windows        |
|[[device_kind, mobile], [country_code, BE], [device_platform, android]] |mobile     |BE          |android        |
|[[device_kind, mobile], [country_code, QA], [device_platform, android]] |mobile     |QA          |android        |
+------------------------------------------------------------------------+-----------+------------+---------------+


Upvotes: 4

Views: 11535

Answers (2)

Bishamon Ten

Reputation: 588

Aleh, thank you for the answer. It works fine. I solved it in a slightly different way because I am using Spark 2.3.3.

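import org.apache.spark.sql.functions.{col, expr, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Schema for an array of {"key": ..., "value": ...} objects
val sch = ArrayType(StructType(Array(
  StructField("key", StringType, true),
  StructField("value", StringType, true)
)))

// mdf is the DataFrame whose jsonString column holds the raw JSON array
val jsonDF3 = mdf.select(from_json(col("jsonString"), sch).alias("Demographics"))

// Positional lookup: assumes every row lists the keys in the same order
val jsonDF4 = jsonDF3.withColumn("device_kind", expr("Demographics[0].value"))
  .withColumn("country_code", expr("Demographics[1].value"))
  .withColumn("device_platform", expr("Demographics[2].value"))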

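If the keys might not always appear in the same order, a lookup by key is safer than `Demographics[0]`, `[1]`, `[2]`. One option, which assumes Spark 2.4 or later (so it would not run on the 2.3.3 setup above), is the SQL function `map_from_entries`, which turns an `array<struct<key,value>>` into a `map<key,value>`. A minimal sketch; the second sample row is hypothetical and deliberately reordered to show that position no longer matters:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("json-map-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data; the second row lists the keys in a different order.
val mdf = Seq(
  """[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]""",
  """[{"key":"country_code","value":"BE"},{"key":"device_kind","value":"mobile"},{"key":"device_platform","value":"android"}]"""
).toDF("jsonString")

val sch = ArrayType(StructType(Array(
  StructField("key", StringType, true),
  StructField("value", StringType, true)
)))

val byKey = mdf
  .select(from_json(col("jsonString"), sch).alias("Demographics"))
  // map_from_entries (Spark 2.4+): array<struct<key,value>> -> map<key,value>
  .withColumn("kv", expr("map_from_entries(Demographics)"))
  .select(
    col("Demographics"),
    col("kv").getItem("device_kind").as("device_kind"),
    col("kv").getItem("country_code").as("country_code"),
    col("kv").getItem("device_platform").as("device_platform"))

byKey.show(false)
```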
Upvotes: 4

Aleh Pranovich

Reputation: 361

If your JSON column looks like this:

    import spark.implicits._

    val inputDF = Seq(
      ("""[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]"""),
      ("""[{"key":"device_kind","value":"mobile"},{"key":"country_code","value":"BE"},{"key":"device_platform","value":"android"}]"""),
      ("""[{"key":"device_kind","value":"mobile"},{"key":"country_code","value":"QA"},{"key":"device_platform","value":"android"}]""")
    ).toDF("Demographics")

  inputDF.show(false)
+-------------------------------------------------------------------------------------------------------------------------+
|Demographics                                                                                                             |
+-------------------------------------------------------------------------------------------------------------------------+
|[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]|
|[{"key":"device_kind","value":"mobile"},{"key":"country_code","value":"BE"},{"key":"device_platform","value":"android"}] |
|[{"key":"device_kind","value":"mobile"},{"key":"country_code","value":"QA"},{"key":"device_platform","value":"android"}] |
+-------------------------------------------------------------------------------------------------------------------------+

you can try to parse the column in the following way:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  val parsedJson: DataFrame = inputDF.selectExpr("Demographics", "from_json(Demographics, 'array<struct<key:string,value:string>>') as parsed_json")

  val splitted = parsedJson.select(
    col("parsed_json").as("Demographics"),
    col("parsed_json").getItem(0).as("device_kind_json"),
    col("parsed_json").getItem(1).as("country_code_json"),
    col("parsed_json").getItem(2).as("device_platform_json")
  )

  val result = splitted.select(
    col("Demographics"),
    col("device_kind_json.value").as("device_kind"),
    col("country_code_json.value").as("country_code"),
    col("device_platform_json.value").as("device_platform")
  )

  result.show(false)

You will get the output:

+------------------------------------------------------------------------+-----------+------------+---------------+
|Demographics                                                            |device_kind|country_code|device_platform|
+------------------------------------------------------------------------+-----------+------------+---------------+
|[[device_kind, desktop], [country_code, ID], [device_platform, windows]]|desktop    |ID          |windows        |
|[[device_kind, mobile], [country_code, BE], [device_platform, android]] |mobile     |BE          |android        |
|[[device_kind, mobile], [country_code, QA], [device_platform, android]] |mobile     |QA          |android        |
+------------------------------------------------------------------------+-----------+------------+---------------+
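Both answers pick elements by position. A different technique that avoids positional assumptions and, as far as I know, uses only functions available in Spark 2.3 is to explode the parsed array into key/value rows and pivot on the key. A minimal sketch; the names `sch` and `pivoted` are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, first, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("json-pivot-demo").getOrCreate()
import spark.implicits._

val inputDF = Seq(
  """[{"key":"device_kind","value":"desktop"},{"key":"country_code","value":"ID"},{"key":"device_platform","value":"windows"}]""",
  """[{"key":"device_kind","value":"mobile"},{"key":"country_code","value":"BE"},{"key":"device_platform","value":"android"}]""",
  """[{"key":"device_kind","value":"mobile"},{"key":"country_code","value":"QA"},{"key":"device_platform","value":"android"}]"""
).toDF("Demographics")

val sch = ArrayType(StructType(Seq(
  StructField("key", StringType),
  StructField("value", StringType))))

val pivoted = inputDF
  // one output row per {"key": ..., "value": ...} object
  .withColumn("kv", explode(from_json(col("Demographics"), sch)))
  .select(col("Demographics"), col("kv.key").as("key"), col("kv.value").as("value"))
  // group back to one row per input string; each key becomes a column
  .groupBy("Demographics")
  .pivot("key", Seq("device_kind", "country_code", "device_platform"))
  .agg(first("value"))

pivoted.show(false)
```

Grouping by the raw string merges input rows that are exact duplicates; if those must be kept apart, grouping by a row identifier (for example one added with `monotonically_increasing_id`) would be needed instead.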

Upvotes: 1
