Reputation: 157
I have multiple files in S3.
import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket='cw-milenko-tests', Prefix='Json_gzips'):
    contents = page["Contents"]
    for c in contents:
        if c['Key'].startswith('Json_gzips/tick_calculated_3'):
            print(c['Key'])
Output
Json_gzips/tick_calculated_3_2020-05-27T00-05-51.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-13-23.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-17-36.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-28-10.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-30-43.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-34-56.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-38-29.json.gz
I want to read all these files into a Spark DataFrame, then perform a union and save the result as a single parquet file. I already asked define schema for PySpark and got a useful answer. How should I edit my code
data = spark.read.json(c['Key'])
so that each dataframe is saved into big_data = [data1, data2, ...]? That would enable me to union them:
bigdf = reduce(DataFrame.unionAll, big_data)
How to fix this?
Upvotes: 2
Views: 2935
Reputation: 87259
If you have only these files in the S3 bucket, you can read them directly:
data = spark.read.json('s3://.../Json_gzips/')
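Note that Spark accepts glob patterns in read paths, so if the prefix also contains other files, the read can be restricted to the ones listed in the question (a sketch using the bucket name from the question; the exact path is an assumption):

# glob pattern limits the read to the tick_calculated_3 files;
# Spark decompresses .json.gz input transparently
data = spark.read.json('s3://cw-milenko-tests/Json_gzips/tick_calculated_3*.json.gz')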
If we want to read only some of the data and we have a list of file URLs, then we can do the following:
from functools import reduce
lst = ['s3://...', 's3://...']
dfs = [spark.read.json(file) for file in lst]
udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
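For this question specifically, the list of URLs could be assembled from the boto3 listing shown above (a sketch reusing the question's bucket and prefix filter):

# build full S3 URLs from the keys returned by the paginator
lst = []
for page in paginator.paginate(Bucket='cw-milenko-tests', Prefix='Json_gzips'):
    for c in page["Contents"]:
        if c['Key'].startswith('Json_gzips/tick_calculated_3'):
            lst.append('s3://cw-milenko-tests/' + c['Key'])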
How the reduce/union snippet works:
- reduce - a function from functools that simplifies the code
- union - the operation applied pairwise across all dataframes

There are caveats here that need to be taken into account:
- spark.read.json infers the schema from the data, and this may lead to dataframes with different schemas if some data is missing in some files

But we can work around this by providing the schema explicitly when reading the data. I often use the following "hack": put one or two samples into a separate file, read it with Spark, and use the inferred schema to enforce the schema during reading. It will also be faster, as the schema is already provided. For example, here is how it could be done:
>>> lst = ["file1", "file2", "file3"]
# read the data without an explicit schema - Spark infers it per file
>>> dfs = [spark.read.json(file) for file in lst]
# the inferred schemas have different numbers of fields
>>> [len(df.schema) for df in dfs]
[28, 28, 27]
# union fails because the column counts differ
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
Now we read again, this time enforcing the schema:
>>> sample_schema = spark.read.json("sample_file.json").schema
>>> dfs = [spark.read.schema(sample_schema).json(file) for file in lst]
# all lengths are the same
>>> [len(df.schema) for df in dfs]
[28, 28, 28]
# and union works
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
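Finally, to get the single parquet file asked for in the question, coalesce the union result to one partition before writing (the output path here is a placeholder):

# coalesce(1) produces a single part file inside the output directory
udfs.coalesce(1).write.mode('overwrite').parquet('s3://cw-milenko-tests/output.parquet')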
Upvotes: 1