Dutta

Reputation: 683

Read JSON File using Apache Spark

I am using HiveContext to read a JSON file with the following code:

df = hive_context.read.json("/Users/duttaam/Downloads/test.json")
df.registerTempTable("df")

By default, Spark inferred the following schema:

root
 |-- id: string (nullable = true)
 |-- profiles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- app_id: string (nullable = true)
 |    |    |-- localytics: struct (nullable = true)
 |    |    |    |-- attributes: struct (nullable = true)
 |    |    |    |    |-- ap: long (nullable = true)
 |    |    |    |    |-- app_version: string (nullable = true)
 |    |    |    |    |-- birthdate: string (nullable = true)
 |    |    |    |    |-- country: string (nullable = true)
 |    |    |    |    |-- device_timezone: string (nullable = true)
 |    |    |    |    |-- language: string (nullable = true)
 |    |    |    |    |-- last_session_date: string (nullable = true)
 |    |    |    |    |-- library_version: string (nullable = true)
 |    |    |    |    |-- os_version: string (nullable = true)
 |    |    |    |    |-- push_enabled: long (nullable = true)
 |    |    |    |    |-- total_sessions: long (nullable = true)
 |    |    |    |    |-- user_type: string (nullable = true) 

My JSON looks as follows:

{
  "id": "dsdasdasdsd",
  "profiles": [
    {
      "attributes": {
        "MDN": "eoe/W/5Ru1KAPDMQQ/wq\n/pu/tGRWpA=="
      },
      "localytics": {
        "attributes": {
          "last_session_date": "2016-07-17",
          "device_timezone": "-04:00",
          "country": "us",
          "language": "en",
          "user_type": "known",
          "city_name": "Indianapolis"
        }
      }
    },
    {
      "app_id": "sdas-c824fcf6-bbae-11e5-adasda-asasqwvz",
      "attributes": {
        "Automatic Backup User": "No"
      },
      "localytics": {
        "attributes": {
          "last_session_date": "2016-07-17",
          "os_version": "6.2.1",
          "app_version": "16.2.19.1",
          "library_version": "androida_3.7.0",
          "ap": 1,
          "custom_1": "Unknown (Not Logged In)",
          "total_sessions": 4,
          "birthdate": "2016-07-09",
          "push_enabled": 1,
          "user_type": "known",
          "custom_0": "Unknown (Not Logged In)",
          "seconds_since_last_session": 1457
        }
      }
    }
  ]
}

So, by default, Spark is not capturing the attributes fields in either profile. Is there a way we can write custom code to change the schema structure?
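I understand that Spark's DataFrameReader can take an explicit schema instead of inferring one. Something along these lines is what I have in mind; this is only a rough sketch, and modelling the two attributes objects as MapType (since their keys differ between profiles) is my own assumption:

from pyspark.sql.types import (ArrayType, MapType, StringType,
                               StructField, StructType)

# Sketch of an explicit schema; field names come from the sample JSON above.
# Both "attributes" objects are modelled as maps so that their varying keys
# (MDN, Automatic Backup User, last_session_date, ...) are all kept.
# Note: numeric values such as total_sessions would then arrive as strings.
profile_type = StructType([
    StructField("app_id", StringType(), True),
    StructField("attributes", MapType(StringType(), StringType()), True),
    StructField("localytics", StructType([
        StructField("attributes", MapType(StringType(), StringType()), True)
    ]), True)
])

schema = StructType([
    StructField("id", StringType(), True),
    StructField("profiles", ArrayType(profile_type), True)
])

df = hive_context.read.schema(schema).json("/Users/duttaam/Downloads/test.json")

Is something like this the right approach, or is there a better way?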

Thanks in advance.

Regards, Amit

Upvotes: 0

Views: 1986

Answers (2)

Mia Hunsicker

Reputation: 51

You can try using hive_context.jsonFile(INFILE):

from pyspark import SparkContext
from pyspark.sql import HiveContext
import json

sc = SparkContext()
hive_context = HiveContext(sc)

# jsonFile infers the schema and returns a DataFrame
your_df = hive_context.jsonFile(INFILE)
your_df.registerTempTable('your_table')

You can then query using hive_context.sql(YOUR QUERY).collect().
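For example, against the temp table registered above (a sketch; the table name and the id column come from this thread, not from a tested run):

# Query the temp table registered above
rows = hive_context.sql("SELECT id FROM your_table").collect()
for row in rows:
    print(row.id)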

You can also try dumping your JSON into memory and then using hive_context.jsonRDD(json_dumped_object):

def make_json_single_row(row, field_names):
    # Build a JSON string out of one semicolon-delimited row
    row_lst = row.split(';')
    return json.dumps(dict(zip(field_names, row_lst)))

def make_json(rdd, field_names):
    return rdd.map(lambda row: make_json_single_row(row, field_names))

field_names = ['column1', 'column2', 'column3']
rdd = sc.textFile(infile)
split_rdd = make_json(rdd, field_names)
your_new_df = hive_context.jsonRDD(split_rdd)

Upvotes: 1

Yuan JI

Reputation: 2995

If you only want the profiles column in your case, you could do the following (though I'm sure it's not the best way):

Java:

import org.apache.spark.sql.functions;

// Explode the profiles array into one row per profile, then flatten it
DataFrame prof = df.select(functions.explode(df.col("profiles")).as("prof"));
DataFrame flat = prof.select("prof.app_id", "prof.attributes.*",
                             "prof.localytics.attributes.*");
flat.show();

This requires that you already know your JSON schema well.
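A rough PySpark equivalent, since the question uses Python (a sketch with the same column names as the Java version, not a tested run):

from pyspark.sql.functions import explode

# Same idea as the Java snippet: one row per profile, then flatten
prof = df.select(explode(df.profiles).alias("prof"))
flat = prof.select("prof.app_id", "prof.attributes.*",
                   "prof.localytics.attributes.*")
flat.show()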

Upvotes: 0
