huy

How to write Foundry dataset to XML file in PySpark?

I found that on Foundry, we can use databricks:spark-xml to read a dataset containing one or more XML files.

But I cannot find a way to write a Foundry dataset out as a dataset containing an XML file (i.e. write all the rows to a single XML file).

I have followed this answer to parse the XML into a dataset, and here is my code:

from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import union_many
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

BOOK_SCHEMA = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True),
])


def read_files(spark_session, paths, input_schema):
    # Parse each XML file in the input dataset, then union the results into one DataFrame
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="book").load(file_name, schema=input_schema)
        parsed_dfs.append(parsed_df)
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    xml_df=Output("/output/book-xml"),
    source_df=Input("/input/example-book"),
)
def compute(ctx, source_df, xml_df):
    session = ctx.spark_session
    input_filesystem = source_df.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # Build the full hadoop path for every file in the input dataset
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files, BOOK_SCHEMA)
    xml_df.write_dataframe(output_df)

Here is the link to the example XML file provided by Databricks: Link to example.

I found the example of how to save a DataFrame to XML in the spark-xml docs, but I don't know how to run it in Foundry.
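For reference, the save example is roughly this (taken from the spark-xml docs, so the path is a plain filesystem path rather than a Foundry dataset, and df is a DataFrame already loaded with spark-xml):

# Example from the spark-xml documentation, not Foundry-specific:
# write the rows of df out as <book> elements under a <books> root.
df.select("author", "_id").write \
    .format('xml') \
    .option('rootTag', 'books') \
    .option('rowTag', 'book') \
    .save('newbooks.xml')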

Upvotes: 2

Views: 428

Answers (1)

huy

Following this doc by Databricks, I am able to write a DataFrame to a single XML file with this code:

from transforms.api import transform, Input, Output


@transform(
    output_df=Output("/output/book-dataset2xml"),
    source_df=Input("/output/book-xml"),  # the dataset produced by reading the example XML file
)
def compute(ctx, source_df, output_df):
    df = source_df.dataframe()
    # Write directly into the output dataset's underlying filesystem via its hadoop path
    output_filesystem = output_df.filesystem()
    hadoop_path = output_filesystem.hadoop_path

    filename = hadoop_path + '/' + 'newbooks.xml'
    df.select("author", "_id").write.format('xml').options(rowTag='book', rootTag='books').save(filename)

The output XML file will look like this:

<books>
    <book id="bk101">
        <author>Gambardella, Matthew</author>
    </book>

    <book id="bk102">
        <author>Ralls, Kim</author>
    </book>
...
</books>
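Note that save() here behaves like any Spark datasource write, so newbooks.xml is actually a directory containing one or more part files, one per partition. If you need all rows in exactly one part file, a minimal sketch is to coalesce to a single partition first (the coalesce step is my assumption, not part of the transform above; df and filename are reused from it):

# Assumption: force a single partition so spark-xml writes one part file
# inside the newbooks.xml directory; otherwise each partition gets its own file.
df.coalesce(1) \
    .select("author", "_id") \
    .write.format('xml') \
    .options(rowTag='book', rootTag='books') \
    .save(filename)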

Upvotes: 1
