Reputation: 2758
I need to read 150 files from my S3 bucket
df1 = spark.read.json('s3://mybucket/f1')
df2 = spark.read.json('s3://mybucket/f2')
...
df150 = spark.read.json('s3://mybucket/f150')
How to automate this process?
spark.read.json produces a Spark DataFrame.
If I try what Oscar suggested
import spark
your_dfs_list = [spark.read.json("s3://cw-mybuc/RECORDS/FULL_RECEIVED/2020/07/01/00"+str(x)) for x in range(1,38)]
AttributeError: module 'spark' has no attribute 'read'
Upvotes: 1
Views: 824
Reputation: 1386
Step 1: Create a list of all the json.gz files. In current versions of Spark, gzipped files are read automatically, so that's not an issue. If you're reading all the files in an S3 bucket, you can use the following solution:
from boto.s3.connection import S3Connection

fs = []  # empty list of file paths
conn = S3Connection('access-key', 'secret-access-key')
bucket = conn.get_bucket('bucket')
for key in bucket.list():
    # keep the full S3 URI so Spark can resolve the path later
    fs.append('s3://bucket/' + key.name)
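If the legacy boto package isn't available, a roughly equivalent listing can be done with boto3; a minimal sketch, assuming the same bucket name and credentials configured in the environment:

import boto3

# List every object in the bucket and keep the full S3 URIs
s3 = boto3.resource('s3')
fs = ['s3://bucket/' + obj.key for obj in s3.Bucket('bucket').objects.all()]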
Step 2: Iterate over the files from (1), unioning each resulting DataFrame onto the previous ones as you go. A version of Godza's solution should do the trick:
# Read the first file
df = spark.read.json(fs[0])
# Then union the rest onto it
for f in fs[1:]:
    df = df.union(spark.read.json(f))
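As an alternative to the explicit loop, spark.read.json also accepts a list of paths, so the whole set can be read in a single call; a minimal sketch, assuming fs holds the full S3 URIs from step 1:

# spark.read.json accepts a list of paths and reads them all into one DataFrame
df = spark.read.json(fs)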
Upvotes: 2
Reputation: 2395
I think you should provide more detail: how often do you want to read, what is the reason for it, etc. If you give some context we might be able to help better.
From your code snippet it seems like a loop would be the easiest way to do it, reading the results into a list:
dfs = []
for i in range(150):
    dfs.append(spark.read.json('s3://mybucket/f' + str(i + 1)))
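If the goal is a single DataFrame rather than a list, the pieces can then be collapsed with functools.reduce and DataFrame.union; a minimal PySpark sketch, assuming dfs is the list built above:

from functools import reduce

# Fold the list of DataFrames into a single DataFrame by repeated union
df = reduce(lambda left, right: left.union(right), dfs)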
However, if you provide some more detail I am pretty sure this answer can be improved.
Edit based on comments
If the case is that you want to union the DataFrames into one, the easiest in Scala is to import the implicits and build the result up in a loop:
import spark.implicits._

// Seed with the first file (spark.emptyDataFrame has no columns, so a union against it would fail), then union the rest onto it
var df = spark.read.json("s3://mybucket/f1")
for (i <- 2 to 150) {
  df = df.union(spark.read.json(s"s3://mybucket/f$i"))
}
Note this should work with Spark 2.x and above:
https://sparkbyexamples.com/spark/spark-how-to-create-an-empty-dataframe/
https://sparkbyexamples.com/spark/spark-dataframe-union-and-union-all/
Upvotes: 2
Reputation: 605
I think you can use a list comprehension that returns a list of DataFrames, and from there you can iterate over them:
your_dfs_list = [spark.read.json("s3://mybucket/f" + str(x)) for x in range(1, 151)]
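Depending on how the keys are named, it may not even be necessary to build a list: Spark path strings accept glob patterns, so a single call can pick everything up; a minimal sketch, assuming nothing else in the bucket matches the f* prefix:

# Read every object matching the glob pattern into one DataFrame
df = spark.read.json("s3://mybucket/f*")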
In Scala it could be easier to apply a map (or a foreach later) over the DataFrames, but that depends on your preference :)
(1 to 150).map(v => spark.read.json(s"s3://mybucket/f$v"))
Upvotes: 4