Reputation: 1952
I am analysing some JSON data in Palantir Foundry using PySpark. The source is a 30MB uploaded JSON file containing four elements, one of which holds a table of some 60 columns and 20,000 rows. Some of the columns in this table are strings that contain HTML entities representing Unicode characters (other columns are numeric or boolean). I want to clean these strings to replace the HTML entities with the corresponding characters.
I realise that I can apply html.unescape(my_str) in a UDF to the string columns once all the JSON data has been converted into dataframes, but this sounds inefficient. This answer suggests it would be better to process the whole JSON file in one go, before converting it to a dataframe. My current code, however, uses spark_session.read.json() to go automatically from the raw file to a dataframe with a proper schema, and I can't see how to add the unescape() stage without ending up with everything as StringType(); I don't want to have to manually code the schema for every column of the nested data structure. My current code to read the JSON into a dataframe looks like this:
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from transforms.api import transform, Input, Output, Check


@transform(
    parsed_output=Output("out_path"),
    raw_file_input=Input("in_path"),
)
def read_json(ctx, parsed_output, raw_file_input):
    filesystem = raw_file_input.filesystem()
    hadoop_path = filesystem.hadoop_path
    paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    # Let Spark infer the schema directly from the raw JSON files
    df = ctx.spark_session.read.json(paths)
    parsed_output.write_dataframe(sanitize_schema_for_parquet(df))
How can I adapt this to unescape() the plain text of the JSON before it is parsed? Or would it actually be more efficient to unescape row by row and column by column later, using the parallelism in Spark, rather than trying to process the data as a single 30MB-long string?
An example of my JSON input format. The real input is about 30MB long and not pretty-printed. The Data structure has many more rows, and around 60 columns. String columns are mixed in between numeric and boolean columns, in no particular order:
{
    "Data": [
        {"Id": 1, "Lots": "more", "data": "of", "different": "types", "Flag1": true, "RefNumber": 17},
        {"Id": 2, "Lots": "of the string", "data": "includes entities like &le; in", "different": "places", "Flag1": false, "RefNumber": 17781}
    ],
    "TotalRows": 2,
    "Warnings": null,
    "Errors": null
}
The final expected output from the above would end up as the below (I don't have any problem with processing the JSON into the right columns, it's just efficiently converting the HTML entities to characters that is an issue). Note the math symbol ≤ in the 'data' field in row 2, rather than the entity &le;:
Id | Lots | data | different | Flag1 | RefNumber
---+-----------------+-------------------------------+-----------+-------+-----------
1 | "more" | "of" | "types" | true | 17
2 | "of the string" | "includes entities like ≤ in" | "places" | false | 17781
Upvotes: 2
Views: 407
Reputation: 24478
An approach using RDDs:
from transforms.api import transform, Input, Output
import html
import json


@transform(
    out=Output("out_path"),
    inp=Input("in_path"),
)
def compute(ctx, inp, out):
    spark = ctx.spark_session
    inp_fs = inp.filesystem()
    # Read each input file as a single (path, content) pair; keep the content
    rdd = spark.sparkContext.wholeTextFiles(inp_fs.hadoop_path).values()
    # Unescape the whole document in one go, then explode the 'Data' rows
    rdd = rdd.flatMap(lambda x: json.loads(html.unescape(x))['Data'])
    # Re-serialise each row so spark.read.json can infer a proper schema
    rdd = rdd.map(json.dumps)
    df = spark.read.json(rdd)
    out.write_dataframe(df)
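Since wholeTextFiles yields one record per file, html.unescape runs exactly once over each whole document before the rows are split out, and spark.read.json on the resulting RDD of JSON strings still infers the schema automatically.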
Upvotes: 1
Reputation: 56
Broadly speaking, the answer that you linked (How to decode HTML entities in Spark?) is the right approach here. In order to achieve what you want (the convenience of letting Spark automagically infer a schema for you), here's how I would implement this in Foundry.
import json

from transforms.api import transform, Input, Output


@transform(
    output_json=Output("/NAMESPACE/PROJECT/datasets/preprocess/escaped_json"),
    input_json=Input("/NAMESPACE/PROJECT/datasets/preprocess/raw_json"),
)
def compute_function(output_json, input_json):
    input_fs = input_json.filesystem()
    output_fs = output_json.filesystem()

    def process_file(file_status):
        # Parse each raw file, clean it up, and write it back out so a
        # later transform can infer the schema from the cleaned JSON
        with input_fs.open(file_status.path) as input_fp:
            json_data = json.load(input_fp)
        clean_up_json_in_place(json_data)
        with output_fs.open(file_status.path, "w") as output_fp:
            json.dump(json_data, output_fp)

    input_fs.files().foreach(process_file)
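clean_up_json_in_place isn't defined above; a minimal sketch, assuming all you need is to unescape HTML entities in every string value of the parsed structure:

import html


def clean_up_json_in_place(node):
    # Recursively walk the parsed JSON, replacing HTML entities in every
    # string value; dicts and lists are mutated in place
    if isinstance(node, dict):
        items = node.items()
    elif isinstance(node, list):
        items = enumerate(node)
    else:
        return
    for key, value in items:
        if isinstance(value, str):
            node[key] = html.unescape(value)
        else:
            clean_up_json_in_place(value)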
You can then point your existing read_json transform at /NAMESPACE/PROJECT/datasets/preprocess/escaped_json to get automatic schema inference over the cleaned JSON.
Upvotes: 2
Reputation: 1631
You can try adding a withColumn(cnameNew, udf_unescape(columnWithHtml)). This should help you out.
Your UDF could also parse the HTML into a dictionary, build a StructType from it, and return that. This would give you a complex-typed column and aid further processing.
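For illustration, a sketch of both variants, assuming df is the dataframe from the question (the column names and the struct schema here are hypothetical, and the struct-returning version needs its return schema declared up front):

import html
import json

from pyspark.sql import functions as F, types as T

# Variant 1: plain string-to-string unescape, applied column by column
udf_unescape = F.udf(
    lambda s: html.unescape(s) if s is not None else None, T.StringType()
)
df = df.withColumn("cnameNew", udf_unescape(F.col("columnWithHtml")))

# Variant 2 (hypothetical): unescape and parse a JSON string into a
# complex-typed column; the struct schema must be declared explicitly
row_schema = T.StructType([
    T.StructField("Id", T.LongType()),
    T.StructField("data", T.StringType()),
])
udf_parse = F.udf(
    lambda s: json.loads(html.unescape(s)) if s is not None else None,
    row_schema,
)
df = df.withColumn("parsed", udf_parse(F.col("columnWithHtml")))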
If you could share a sample of the data, I could expand on this answer.
Upvotes: 0