Reputation: 2808
I'm starting with a large zip file of csvs, which I unzipped in Palantir Foundry.
I now have a dataset which consists of multiple csvs (one for each year), where the csvs have almost the same schema but with some differences. How do I apply a schema to each of the csvs individually, or normalize the schema across them?
Upvotes: 0
Views: 875
Reputation: 1747
If your files are unzipped and simply sitting as .csv files inside your dataset, you can use Spark's native spark_session.read.csv method, similar to my answer over here.
This will look like the following:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    # Parse each CSV file into its own DataFrame, then union them all,
    # letting the "wide" strategy align the differing schemas by name.
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('csv').load(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # List every file in the input dataset and build its full Hadoop path.
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)
Note that the union_many verb will stack your schemas on top of each other, so if you have many files with different schemas, you will end up with many null values, since each column is only populated by rows from the files that actually contain it.
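For reference, a minimal sketch of the same wide-union behavior in plain PySpark (assuming Spark 3.1+, where unionByName gained allowMissingColumns), using made-up toy frames:

# Two toy frames with overlapping but not identical schemas.
df_2019 = spark_session.createDataFrame([(1, 2)], ["a", "b"])
df_2020 = spark_session.createDataFrame([(3, 4)], ["a", "c"])

# Columns are aligned by name and missing ones are filled with null:
#   a |    b |    c
#   1 |    2 | null
#   3 | null |    4
wide = df_2019.unionByName(df_2020, allowMissingColumns=True)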
If you know the common fields for each schema, and you know that only one column changes names between files, you could change the logic to rename columns in parsed_df to harmonize the schemas. It'll depend on how strictly you want to enforce requirements on your schemas.
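As a minimal sketch of that approach, assuming a hypothetical RENAMES mapping from legacy column names to their canonical equivalents, you could harmonize each parsed_df inside the loop before the union:

# Hypothetical mapping from legacy names to canonical names;
# adjust it to match the actual differences between your yearly files.
RENAMES = {"custmer_id": "customer_id", "yr": "year"}


def harmonize(parsed_df):
    # Rename any known legacy columns so that every file ends up
    # with the same schema before union_many stacks them.
    for old_name, new_name in RENAMES.items():
        if old_name in parsed_df.columns:
            parsed_df = parsed_df.withColumnRenamed(old_name, new_name)
    return parsed_df

Calling harmonize(parsed_df) on each frame in read_files would then make the how="wide" union line up the renamed columns instead of null-padding them.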
I would also include a testing method, the same as in the other response, so that you can quickly verify the correct parsing behavior.
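A minimal sketch of such a test, assuming a pytest spark_session fixture (as set up in a Foundry code repository's test configuration) and a hypothetical module path for read_files:

import tempfile

from myproject.datasets.read_csvs import read_files  # hypothetical module path


def test_read_files(spark_session):
    # Write two tiny header-less CSVs to a temp directory, then check
    # that read_files parses and unions them into a two-row DataFrame.
    with tempfile.TemporaryDirectory() as tmp:
        paths = [tmp + "/2019.csv", tmp + "/2020.csv"]
        for path, row in zip(paths, ["1,2\n", "3,4\n"]):
            with open(path, "w") as f:
                f.write(row)
        output_df = read_files(spark_session, paths)
        assert output_df.count() == 2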
Upvotes: 0