Djikii

Reputation: 157

How to read Json Gzip files from S3 into List?

I have multiple files in S3.

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket='cw-milenko-tests', Prefix='Json_gzips'):
    contents = page["Contents"]
    for c in contents:
        if (c['Key']).startswith('Json_gzips/tick_calculated_3'):
            print(c['Key'])

Output

Json_gzips/tick_calculated_3_2020-05-27T00-05-51.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-13-23.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-17-36.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-28-10.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-30-43.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-34-56.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-38-29.json.gz

I want to read all these files into a Spark DataFrame, then perform a union and save the result as a single parquet file. I already asked how to define a schema for PySpark and got a useful answer. How should I edit my code?

data = spark.read.json(c['Key'])

Each data frame should be collected into big_data = [data1, data2, ...].

That would enable me to do the union:

bigdf = reduce(DataFrame.unionAll, big_data)

How to fix this?
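
For reference, this is roughly the end-to-end flow I am aiming for, reusing the paginator and Spark session from above (the s3a:// prefix and the output path are only placeholders for illustration):

from functools import reduce
from pyspark.sql import DataFrame

# collect full S3 URLs instead of bare keys (s3a:// is assumed here;
# use whatever scheme your Spark/Hadoop setup is configured for)
paths = []
for page in paginator.paginate(Bucket='cw-milenko-tests', Prefix='Json_gzips'):
    for c in page["Contents"]:
        if c['Key'].startswith('Json_gzips/tick_calculated_3'):
            paths.append('s3a://cw-milenko-tests/' + c['Key'])

# read each gzipped JSON file into its own DataFrame
# (Spark decompresses .gz transparently)
big_data = [spark.read.json(p) for p in paths]

# pairwise union of all DataFrames, then save as parquet
bigdf = reduce(DataFrame.unionAll, big_data)
bigdf.write.parquet('s3a://cw-milenko-tests/merged_parquet/')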

Upvotes: 2

Views: 2935

Answers (1)

Alex Ott

Reputation: 87259

If you have only this data in the S3 bucket, you can read the files directly:

data = spark.read.json('s3://.../Json_gzips/')
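
Spark decompresses the .json.gz files transparently. If the goal is a single parquet file, you can then coalesce to one partition before writing; a minimal sketch (the output path is just a placeholder):

# coalesce(1) forces a single output file, but funnels all data through one task
data.coalesce(1).write.mode('overwrite').parquet('s3://.../output_parquet/')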

If we want to read only some of the data, and we have a list of files/URLs, then we can do the following:

from functools import reduce
lst = ['s3://...', 's3://...']
dfs = [spark.read.json(file) for file in lst]
udfs = reduce(lambda df1, df2: df1.union(df2), dfs)

How it works:

  • the first line imports the reduce function that simplifies the code
  • the second line defines a list of resources
  • the third generates a dataframe for every item in the list
  • the last one performs a pairwise union operation on all dataframes (expanded in the sketch below)
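
To make the last step concrete, reduce folds the list pairwise, so for three hypothetical dataframes dfA, dfB, dfC the call expands to nested unions:

# reduce(lambda df1, df2: df1.union(df2), [dfA, dfB, dfC]) is equivalent to
result = dfA.union(dfB).union(dfC)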

There are caveats here that need to be taken into account:

  • this specific code relies on the schema auto-detection done by spark.read.json, and this may lead to dataframes with different schemas if some data is missing in some files
  • the schema of all dataframes participating in the union should be the same, otherwise you'll get an error

But we can work around this by providing the schema explicitly when reading the data. I often use the following "hack": put one or two sample records into a separate file, read it with Spark, and use the inferred schema to enforce the schema during the real read. This will also be faster, as the schema is already provided. For example, here is how it could be done:

>>> lst = ["file1", "file2", "file3"]
# we're reading the data without schema
>>> dfs = [spark.read.json(file) for file in lst]
# we get different lengths of the schemas
>>> [len(df.schema) for df in dfs]
[28, 28, 27]
# we get an error when trying to do the union
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)

Now we read again, enforcing the schema:

>>> sample_schema = spark.read.json("sample_file.json").schema
>>> dfs = [spark.read.schema(sample_schema).json(file) for file in lst]
# all lengths are the same
>>> [len(df.schema) for df in dfs]
[28, 28, 28]
# and union works
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
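
Putting it all together for your S3 layout, a possible sketch (the bucket paths are placeholders, and the pattern tick_calculated_3*.json.gz relies on Spark's glob support in paths):

# infer the schema once from a small sample file
sample_schema = spark.read.json('s3://.../sample_file.json').schema

# read all matching gzipped JSON files in one go with the enforced schema
bigdf = spark.read.schema(sample_schema).json('s3://.../Json_gzips/tick_calculated_3*.json.gz')

# save the combined result as parquet
bigdf.write.parquet('s3://.../merged_parquet/')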

Upvotes: 1
