Reputation: 1884
I found that on Foundry we can use databricks:spark-xml
to read a dataset containing one or many XML files.
But I cannot find a way to write a Foundry dataset out as XML (i.e. write every row into a single XML file).
I have followed this answer to parse XML into a dataset, and here is my code:
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import union_many
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

BOOK_SCHEMA = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True),
])
def read_files(spark_session, paths, input_schema):
    # Parse each XML file with spark-xml, treating every <book> element as a row,
    # then union the per-file DataFrames into one output DataFrame
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="book").load(file_name, schema=input_schema)
        parsed_dfs.append(parsed_df)
    return union_many(*parsed_dfs, how="wide")
@transform(
    xml_df=Output("/output/book-xml"),
    source_df=Input("/input/example-book"),
)
def compute(ctx, source_df, xml_df):
    session = ctx.spark_session
    input_filesystem = source_df.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files, BOOK_SCHEMA)
    xml_df.write_dataframe(output_df)
Here is the link to the example XML file provided by Databricks: Link to example.
I found an example of how to save to a dataset, but I don't know how to run it in Foundry.
Upvotes: 2
Views: 428
Following this doc by Databricks, I am able to write a DataFrame to a single XML file with this code:
@transform(
    output_df=Output("/output/book-dataset2xml"),
    source_df=Input("/output/book-xml"),  # Use the dataset produced by reading the example XML file
)
def compute(ctx, source_df, output_df):
    df = source_df.dataframe()
    output_filesystem = output_df.filesystem()
    hadoop_path = output_filesystem.hadoop_path
    filename = hadoop_path + '/' + 'newbooks.xml'
    df.select("author", "_id").write.format('xml').options(rowTag='book', rootTag='books').save(filename)
The output XML file will look like this:
<books>
    <book id="bk101">
        <author>Gambardella, Matthew</author>
    </book>
    <book id="bk102">
        <author>Ralls, Kim</author>
    </book>
    ...
</books>
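As a quick sanity check outside Foundry, the rowTag/rootTag mapping can be verified by parsing output shaped like the above with Python's standard library (the XML string below is just the sample from this answer, not real Foundry output):

```python
import xml.etree.ElementTree as ET

# Sample shaped like the spark-xml output above: rootTag='books', rowTag='book',
# and the "_id" column serialized as an "id" attribute on each row element.
sample = """
<books>
    <book id="bk101">
        <author>Gambardella, Matthew</author>
    </book>
    <book id="bk102">
        <author>Ralls, Kim</author>
    </book>
</books>
"""

root = ET.fromstring(sample.strip())
rows = [(book.get("id"), book.findtext("author")) for book in root.iter("book")]
print(rows)
# → [('bk101', 'Gambardella, Matthew'), ('bk102', 'Ralls, Kim')]
```

Each DataFrame row becomes one rowTag element; columns whose names start with spark-xml's attributePrefix (default `_`, hence `_id`) are written as XML attributes rather than child elements.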
Upvotes: 1