lseactuary
lseactuary

Reputation: 45

Reading Multiple S3 Folders / Paths Into PySpark

I am conducting a big data analysis using PySpark. I am able to import all CSV files, stored in a particular folder of a particular bucket, using the following command:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file:///home/path/datafolder/data2014/*.csv')

(where * acts like a wildcard)

The issues I have are the following:

  1. What if I want to do my analysis on 2014 and 2015 data i.e. file 1 is .load('file:///home/path/SFweather/data2014/*.csv'), file 2 is .load('file:///home/path/SFweather/data2015/*.csv') and file 3 is .load('file:///home/path/NYCweather/data2014/*.csv') and file 4 is .load('file:///home/path/NYCweather/data2015/*.csv'). How do I import multiple paths at the same time to get one dataframe? Do I need to store them all individually as dataframes and then join them together within PySpark? (You may assume they all CSVs have the same schema)
  2. Suppose it is November 2014 now. What if I want to run the analysis again, but on the "most recent data" run e.g. dec14 when it is December 2014? For example, I want to load in file 2: .load('file:///home/path/datafolder/data2014/dec14/*.csv') in December 14 and use this file: .load('file:///home/path/datafolder/data2014/nov14/*.csv') for the original analysis. Is there a way to schedule the Jupyter notebook (or similar) to update the load path and import the latest run (in this case 'nov14' would be replaced by 'dec14' and then 'jan15' etc).

I had a look through the previous questions but was unable to find an answer given this is AWS / PySpark integration specific.

Thank you in advance for the help!

[Background: I have been given access to many S3 buckets from various teams containing various big data sets. Copying it over to my S3 bucket, then building a Jupyter notebook seems like a lot more work than just pulling in the data directly from their bucket and building a model / table / etc ontop of it and saving the processed output into a database. Hence I am posting the questions above. If my thinking is completely wrong, please stop me! :)]

Upvotes: 3

Views: 13321

Answers (2)

aparkerlue
aparkerlue

Reputation: 1895

You can load multiple paths at once using lists of pattern strings. The pyspark.sql.DataFrameReader.load method accepts a list of path strings, which is especially helpful if you can't express all of the paths you want to load using a single Hadoop glob pattern:

?
    Matches any single character.

*
    Matches zero or more characters.

[abc]
    Matches a single character from character set {a,b,c}.

[a-b]
    Matches a single character from the character range {a...b}.
    Note that character a must be lexicographically less than or
    equal to character b.

[^a]
    Matches a single character that is not from character set or
    range {a}.  Note that the ^ character must occur immediately
    to the right of the opening bracket.

\c
    Removes (escapes) any special meaning of character c.

{ab,cd}
    Matches a string from the string set {ab, cd}

{ab,c{de,fh}}
    Matches a string from the string set {ab, cde, cfh}

For example, if you want to load the following paths:

[
    's3a://bucket/prefix/key=1/year=2010/*.csv',
    's3a://bucket/prefix/key=1/year=2011/*.csv',
    's3a://bucket/prefix/key=2/year=2020/*.csv',
    's3a://bucket/prefix/key=2/year=2021/*.csv',
]

You could reduce these to two path patterns,

  • s3a://bucket/prefix/key=1/year=201[0-1]/*.csv and
  • s3a://bucket/prefix/key=2/year=202[0-1]/*.csv,

and call load() twice. You could go further and reduce these to a single pattern string using {ab,cd} alternation, but I think the most readable way to express paths like these using glob patterns with a single call to load() is to pass a list of path patterns:

spark.read.format('csv').load(
    [
        's3a://bucket/prefix/key=1/year=201[0-1]/*.csv',
        's3a://bucket/prefix/key=2/year=202[0-1]/*.csv',
    ]
)

For the paths you listed in your issue № 1, you can express all four with a single pattern string:

'file:///home/path/{NY,SF}weather/data201[45]/*.csv'

For your issue № 2, you can write logic to construct the paths you want to load.

Upvotes: 1

Bob Swain
Bob Swain

Reputation: 3182

You can read in multiple paths with wildcards as long as the files are all in the same format.

In your example:

.load('file:///home/path/SFweather/data2014/*.csv')
.load('file:///home/path/SFweather/data2015/*.csv')
.load('file:///home/path/NYCweather/data2014/*.csv')
.load('file:///home/path/NYCweather/data2015/*.csv')

You could replace the 4 load statements above with the following path to read all csv's in at once to one dataframe:

.load('file:///home/path/*/*/*.csv')

If you want to be more specific in order to avoid reading in certain files/folders, you can do the following:

.load('file:///home/path/[SF|NYC]weather/data201[4|5]/*.csv')

Upvotes: 8

Related Questions