JRI

Reputation: 1952

Efficiently clean HTML entities from a complex JSON file in PySpark

I am analysing some JSON data in Palantir Foundry using PySpark. The source is a 30MB uploaded JSON file containing four elements, one of which holds a table of some 60 columns and 20,000 rows. Some of the columns in this table are strings that contain HTML entities representing Unicode characters (other columns are numeric or boolean). I want to clean these strings by replacing the HTML entities with the corresponding characters.

I realise that I can apply html.unescape(my_str) in a UDF to the string columns once all the JSON data has been converted into dataframes. However, this sounds inefficient. This answer suggests it would be better to process the whole JSON file in one go, before converting it to a dataframe. However, my current code uses spark_session.read.json() to go automatically from the raw file to a dataframe with a proper schema. I can't see how to modify it to include the unescape() stage without ending up with everything as StringType(), and I don't want to have to manually code the schema for every column of the nested data structure. My current code to read the JSON into a dataframe looks like this:

from transforms.verbs.dataframes import sanitize_schema_for_parquet
from transforms.api import transform, Input, Output, Check

@transform(
    parsed_output=Output("out_path"),
    raw_file_input=Input("in_path")
)
def read_json(ctx, parsed_output, raw_file_input):
    filesystem = raw_file_input.filesystem()
    hadoop_path = filesystem.hadoop_path
    paths = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]

    df = ctx.spark_session.read.json(paths)

    parsed_output.write_dataframe(sanitize_schema_for_parquet(df))

How can I adapt this to unescape() the plain text of the JSON before it is parsed? Or would it actually be likely to be more efficient to unescape row by row and column by column later, using the parallelism in Spark, than trying to process the data as a single 30MB-long string?

An example of my JSON input format. The real input is about 30MB long and not pretty-printed. The Data structure has many more rows, and around 60 columns. String columns are mixed in between numeric and boolean columns, in no particular order:

{
    "Data": [
        {"Id": 1, "Lots": "more", "data": "of", "different": "types", "Flag1": true, "RefNumber": 17},
        {"Id": 2, "Lots": "of the string", "data": "includes entities like &le; in", "different": "places", "Flag1": false, "RefNumber": 17781}
    ],
    "TotalRows":2,
    "Warnings":null,
    "Errors":null
}

The final expected output from the above would end up as the below (I don't have any problem with processing the JSON into the right columns, it's just efficiently converting the HTML entities to characters that is an issue). Note the math symbol in the 'data' field in row 2, rather than the entity &le;:

Id | Lots            | data                          | different | Flag1 | RefNumber
---+-----------------+-------------------------------+-----------+-------+-----------
 1 | "more"          | "of"                          | "types"   | true  |        17
 2 | "of the string" | "includes entities like ≤ in" | "places"  | false |     17781

Upvotes: 2

Views: 407

Answers (3)

ZygD

Reputation: 24478

An approach using RDDs:

from transforms.api import transform, Input, Output
import html
import json


@transform(
    out=Output("out_path"),
    inp=Input("in_path"),
)
def compute(ctx, inp, out):
    spark = ctx.spark_session
    inp_fs = inp.filesystem()

    rdd = spark.sparkContext.wholeTextFiles(inp_fs.hadoop_path).values()
    rdd = rdd.flatMap(lambda x: json.loads(html.unescape(x))['Data'])
    rdd = rdd.map(json.dumps)
    df = spark.read.json(rdd)

    out.write_dataframe(df)
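One caveat worth checking before unescaping the raw text in one pass: if any value contains an entity such as &quot; that decodes to a JSON-significant character, the unescaped text may no longer be valid JSON. The sketch below uses a made-up sample string (not the asker's data) to show the failure, and a parse-first alternative that unescapes each string value afterwards. It handles only flat rows like those in the question's example.

```python
import html
import json

# Hypothetical sample: one value contains &quot; as well as &le;.
raw = '{"Data": [{"Id": 1, "data": "say &quot;hi&quot; &le; 3"}]}'

# Unescaping the raw text turns &quot; into a bare double quote, which
# ends the JSON string early and makes the document unparseable.
try:
    json.loads(html.unescape(raw))
    raw_unescape_ok = True
except json.JSONDecodeError:
    raw_unescape_ok = False

# Parsing first and unescaping each string value keeps the JSON intact.
rows = json.loads(raw)["Data"]
cleaned = [
    {k: html.unescape(v) if isinstance(v, str) else v for k, v in row.items()}
    for row in rows
]
```

If the data is known never to contain such entities, unescaping the whole text first, as in the code above, is simpler. Otherwise the parse-first variant could replace the lambda passed to flatMap.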


Upvotes: 1

Aaron Rubin

Reputation: 56

Broadly speaking, the answer that you linked (How to decode HTML entities in Spark?) is the right approach here. In order to achieve what you want (the convenience of letting Spark automagically infer a schema for you), here's how I would implement this in Foundry.

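  1. As a first step, do something like this (note: I haven't actually run this code, so there may be a trivial mistake or two, but hopefully it illustrates the idea):
import json

from transforms.api import transform, Input, Output

# clean_up_json_in_place is a helper you write yourself: it should walk the
# parsed JSON and apply html.unescape() to every string value.

@transform(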
    output_json=Output("/NAMESPACE/PROJECT/datasets/preprocess/escaped_json"),
    input_json=Input("/NAMESPACE/PROJECT/datasets/preprocess/raw_json"),
)
def compute_function(output_json, input_json):
    input_fs = input_json.filesystem()
    output_fs = output_json.filesystem()

    def process_file(file_status):
        with input_fs.open(file_status.path) as input_fp:
            json_data = json.load(input_fp)
            clean_up_json_in_place(json_data)
            with output_fs.open(file_status.path, "w") as output_fp:
                json.dump(json_data, output_fp)

    input_fs.files().foreach(process_file)
  2. As a second step, do exactly what you're currently doing, but use /NAMESPACE/PROJECT/datasets/preprocess/escaped_json as the input.
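The helper clean_up_json_in_place called in step 1 is not defined in the answer. A minimal sketch of what it might look like, assuming the goal is simply to apply html.unescape() to every string value at any depth of the parsed JSON (the sample document is made up for illustration):

```python
import html


def clean_up_json_in_place(node):
    """Recursively unescape HTML entities in every string value of parsed JSON."""
    if isinstance(node, dict):
        items = node.items()
    elif isinstance(node, list):
        items = enumerate(node)
    else:
        return  # numbers, booleans and None need no cleaning
    # Materialise the pairs first so the container can be updated safely.
    for key, value in list(items):
        if isinstance(value, str):
            node[key] = html.unescape(value)
        else:
            clean_up_json_in_place(value)


# Usage on a small document shaped like the one in the question.
doc = {"Data": [{"Id": 2, "data": "entities like &le; in", "Flag1": False}], "Errors": None}
clean_up_json_in_place(doc)
```

Because the entities are decoded after json.load() has parsed the file, a decoded quote or backslash cannot corrupt the JSON, and json.dump() re-escapes anything that needs it on the way out. Dictionary keys are left untouched in this sketch.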

Upvotes: 2

Yayati Sule

Reputation: 1631

You can try adding withColumn(cnameNew, udf_unescape(columnWithHtml)) for each affected column. If needed, your UDF could also parse the HTML into a dictionary and return it as a StructType, which would give you a complex-typed column for further processing. If you could share a sample of the data, I could expand on this answer.
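A rough sketch of the withColumn idea, applied to every string column rather than one named column. The function names unescape_or_none and unescape_string_columns are made up for illustration, and the sketch assumes the table has already been flattened so that the string columns sit at the top level of the dataframe (for the JSON in the question, that means after exploding the "Data" array).

```python
import html


def unescape_or_none(value):
    """Decode HTML entities in one cell; pass nulls through unchanged."""
    return None if value is None else html.unescape(value)


def unescape_string_columns(df):
    """Return df with html.unescape applied to each top-level string column."""
    # Imported here so the pure-Python helper above is usable without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    unescape_udf = F.udf(unescape_or_none, StringType())
    string_cols = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, StringType)
    ]
    # Overwrite each string column in place; other column types are untouched.
    for name in string_cols:
        df = df.withColumn(name, unescape_udf(F.col(name)))
    return df
```

Numeric and boolean columns keep their inferred types because only StringType columns are rewritten. A Python UDF adds serialization overhead per row, so whether this beats preprocessing the file is something to measure on the real data.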

Upvotes: 0
