Reputation: 691
I am using Spark 2.1 and Zeppelin 0.7 to do the following (this is inspired by the Databricks tutorial: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html).
I have created the following schema
import org.apache.spark.sql.types._

// Schema for a file with a top-level "Records" array of event objects
val jsonSchema = new StructType()
  .add("Records", ArrayType(new StructType()
    .add("Id", IntegerType)
    .add("eventDt", StringType)
    .add("appId", StringType)
    .add("userId", StringType)
    .add("eventName", StringType)
    .add("eventValues", StringType)
  ))
to read in the following JSON 'array' file, which I have in my 'inputPath' directory:
{
    "Records": [{
        "Id": 9550,
        "eventDt": "1491810477700",
        "appId": "dandb01",
        "userId": "985580",
        "eventName": "OG: HR: SELECT",
        "eventValues": "985087"
    },
    ... other records
]}
val rawRecords = spark.read.schema(jsonSchema).json(inputPath)
I then want to explode these records to get to the individual events
val events = rawRecords.select(explode($"Records").as("record"))
But rawRecords.show() and events.show() both just show null.
Any idea what I am doing wrong? In the past I know I should be using JSONL for this, but the Databricks tutorial suggests that the latest version of Spark should now support JSON arrays.
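For reference, this is a sketch of what I understand newer Spark versions allow (as far as I know the multiLine option was only added to the JSON reader in Spark 2.2, so it may simply not exist in 2.1):
// Sketch only: multiLine tells the JSON reader that a single document may
// span several lines (as in a pretty-printed array file). Spark 2.2+ only.
val rawRecords = spark.read
  .option("multiLine", true)
  .schema(jsonSchema)
  .json(inputPath)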
Upvotes: 0
Views: 2587
Reputation: 1008
I did the following. My foo.txt contains each JSON document on a single line:
{"Records":[{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"}]} {"Records":[{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"}]}
I have the following code
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val df = sqlContext.read.json("foo.txt")
df.printSchema()
df.select(explode($"Records").as("record")).show
I get the following output
root
 |-- Records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- appId: string (nullable = true)
 |    |    |-- eventDt: string (nullable = true)
 |    |    |-- eventName: string (nullable = true)
 |    |    |-- eventValues: string (nullable = true)
 |    |    |-- userId: string (nullable = true)

+--------------------+
|              record|
+--------------------+
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
+--------------------+
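If you then want the individual event fields as columns rather than one struct column, something along these lines should work (a sketch, untested here; the "events" name is just illustrative):
// Explode the Records array, then pull each field out of the struct
val events = df.select(explode($"Records").as("record"))
  .select(
    $"record.Id",
    $"record.appId",
    $"record.eventDt",
    $"record.eventName",
    $"record.eventValues",
    $"record.userId"
  )
events.show()
Equivalently, select($"record.*") expands all the struct fields at once.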
Upvotes: 1