Reputation: 967
I have a file with data structured like this
{'analytics_category_id': 'Default', 'item_sub_type': '', 'deleted_at': '', 'product_category_id': 'Default', 'unit_price': '0.000', 'id': 'myntramprdii201907174a72fb2475d84103844083d1348acb9e', 'is_valid': True, 'measurement_uom_id': '', 'description': '', 'invoice_type': 'receivable_debit_note', 'linked_core_invoice_item_id': '', 'ref_3': '741423', 'ref_2': '6001220139357318', 'ref_1': '2022-07-04', 'tax_rate': '0.000', 'reference_id': '', 'ref_4': '', 'product_id': 'Default', 'total_amount': '0.000', 'tax_auth_party_id': '', 'item_type': 'Product', 'invoice_item_attributes': '', 'core_invoice_id': 'myntramprdi20190717a1e925911345463393bc4ac1b124dbe5', 'tax_auth_geo_id': '', 'quantity': 1}
{'analytics_category_id': 'Default', 'item_sub_type': '', 'deleted_at': '', 'product_category_id': 'Default', 'unit_price': '511.000', 'id': 'myntramprdii20190717c749a96d2e7144aea7fc5125287717f7', 'is_valid': True, 'measurement_uom_id': '', 'description': '', 'invoice_type': 'receivable_debit_note', 'linked_core_invoice_item_id': '', 'ref_3': '741424', 'ref_2': '6001220152640260', 'ref_1': '2022-07-07', 'tax_rate': '0.000', 'reference_id': '', 'ref_4': '', 'product_id': 'Default', 'total_amount': '511.000', 'tax_auth_party_id': '', 'item_type': 'Product', 'invoice_item_attributes': '', 'core_invoice_id': 'myntramprdi20190717a1e925911345463393bc4ac1b124dbe5', 'tax_auth_geo_id': '', 'quantity': 1}
I am trying to parse this in Spark using Scala and create a DataFrame from it, but I am not able to do so because of the structure. I thought about replacing the ' with ", but my text can also contain those characters. What I need is the data as key-value pairs.
So far I have tried:
spark.read.option("multiline", "true").json("s3://******/*********/prod_flattener/y=2019/m=07/d=17/type=flattened_core_invoices_items/invoice_items_2019_07_17_23_53_19.txt")
I did get some success reading this as a multiline text:
spark.read.option("multiline", "true").textFile("s3://********/*********/prod_flattener/y=2019/m=07/d=17/type=flattened_core_invoices_items/invoice_items_2019_07_17_23_53_19.txt")
+--------------------+
|               value|
+--------------------+
|{'analytics_categ...|
|{'analytics_categ...|
+--------------------+
How do I read the keys as columns now?
Upvotes: 2
Views: 470
Reputation: 6917
Your issue comes from the True value used as a boolean in your entries: this is not valid JSON, which requires true or false as boolean values.
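You can reproduce the failure on a single record. A minimal sketch (assuming a Spark shell with spark.implicits._ in scope; the one-field record is made up for illustration):

// A record containing Python-style True cannot be parsed: Spark falls back
// to the _corrupt_record column instead of inferring real fields.
// (The single quotes alone are fine: allowSingleQuotes defaults to true.)
val bad = Seq("{'is_valid': True, 'quantity': 1}").toDS()
spark.read.json(bad).printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)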
If your dataset is not very large, the easiest way is to load it as text, fix the issue, write the fixed data, then reopen it as JSON.
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val initial = spark.read.text("s3://******/*********/prod_flattener/y=2019/m=07/d=17/type=flattened_core_invoices_items/invoice_items_2019_07_17_23_53_19.txt")

// Replace the Python-style booleans with their JSON equivalents.
val fixed = initial
  .select(regexp_replace('value, "\\bTrue\\b", "true") as "value")
  .select(regexp_replace('value, "\\bFalse\\b", "false") as "value")

// Write the repaired text out, then read it back with the JSON reader.
fixed.write.mode("overwrite").text("/tmp/fixed_items")
val json_df = spark.read.json("/tmp/fixed_items")
json_df: org.apache.spark.sql.DataFrame = [analytics_category_id: string, core_invoice_id: string ... 23 more fields]
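Once reopened as JSON, the keys are ordinary columns and can be queried directly; a quick sanity check (a sketch, with column names taken from your sample records) could be:

// Peek at a few parsed columns to confirm the schema landed correctly.
json_df.select("id", "ref_3", "unit_price", "is_valid").show(2, truncate = false)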
If you don't want to write a temporary dataset, you can directly use from_json to parse the fixed text value, but you'll need to manually define your schema in Spark beforehand and do some column renaming after parsing:
val jsonSchema = StructType.fromDDL("`analytics_category_id` STRING,`core_invoice_id` STRING,`deleted_at` STRING,`description` STRING,`id` STRING,`invoice_item_attributes` STRING,`invoice_type` STRING,`is_valid` BOOLEAN,`item_sub_type` STRING,`item_type` STRING,`linked_core_invoice_item_id` STRING,`measurement_uom_id` STRING,`product_category_id` STRING,`product_id` STRING,`quantity` BIGINT,`ref_1` STRING,`ref_2` STRING,`ref_3` STRING,`ref_4` STRING,`reference_id` STRING,`tax_auth_geo_id` STRING,`tax_auth_party_id` STRING,`tax_rate` STRING,`total_amount` STRING,`unit_price` STRING")
val jsonParsingOptions: Map[String, String] = Map()

// Parse each line into a struct, then expand it back into top-level columns.
val json_df = fixed
  .select(from_json('value, jsonSchema, jsonParsingOptions) as "j")
  .select(jsonSchema.map(f => 'j.getItem(f.name).as(f.name)): _*)
json_df: org.apache.spark.sql.DataFrame = [analytics_category_id: string, core_invoice_id: string ... 23 more fields]
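If hand-writing that DDL is tedious, an alternative sketch (assumption: an extra schema-inference pass over the data is acceptable) is to let Spark derive the schema from the fixed text and expand the struct directly:

// Infer the schema once from the repaired text, then reuse it with from_json.
val inferredSchema = spark.read.json(fixed.as[String]).schema
val json_df2 = fixed
  .select(from_json('value, inferredSchema) as "j")
  .select("j.*")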
As an aside, from the snippet you posted you don't seem to need the multiline option, but if you actually do, you'll need to add it to the jsonParsingOptions map.
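For reference, wiring the option through the map (a sketch; only needed if your records really span multiple lines) would look like:

// Passed through as a JSON data source option when from_json parses each value.
val jsonParsingOptions: Map[String, String] = Map("multiLine" -> "true")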
Upvotes: 1