MikiBelavista

Reputation: 2758

How to automate Spark read?

I need to read 150 files from my S3 bucket:

df1 = spark.read.json('s3://mybucket/f1')
df2 = spark.read.json('s3://mybucket/f2')
...
df150 = spark.read.json('s3://mybucket/f150')

How can I automate this process?

spark.read.json produces a Spark DataFrame.

If I try what Oscar suggested:

import spark
your_dfs_list = [spark.read.json("s3://cw-mybuc/RECORDS/FULL_RECEIVED/2020/07/01/00"+str(x)) for x in range(1,38)]

AttributeError: module 'spark' has no attribute 'read'

Upvotes: 1

Views: 824

Answers (3)

Lars Skaug

Reputation: 1386

Step 1: Create a list of all the json.gz files. Current versions of Spark read gzipped files automatically, so that is not an issue. If you are reading all the files in an S3 bucket, you can use the following solution:

from boto.s3.connection import S3Connection

fs = [] # Empty list of file paths

conn = S3Connection('access-key','secret-access-key')
bucket = conn.get_bucket('bucket')
for key in bucket.list():
    # store the full S3 path so Spark can read it directly
    fs.append('s3://bucket/' + key.name)
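If you are on boto3 (the current AWS SDK) rather than the legacy boto shown above, a roughly equivalent listing might look like this sketch; the bucket name is a placeholder and credentials are assumed to come from your environment:

import boto3

s3 = boto3.client('s3')
fs = []  # list of full S3 paths

# page through every object in the bucket and keep its full S3 path
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='bucket'):
    for obj in page.get('Contents', []):
        fs.append('s3://bucket/' + obj['Key'])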

Step 2: Iterate over each of the files from (1), unioning each of the resulting DataFrames as you go. A version of godzsa's solution should do the trick:

# Read first file
df = spark.read.json(fs[0]) 

# Then union the rest
for f in fs[1:]:
  df = df.union(spark.read.json(f))
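Alternatively, spark.read.json also accepts a list of paths, so if all the files share a schema you may be able to skip the explicit loop; a minimal sketch, assuming fs holds the full S3 paths from step 1:

# read every file in one call and get back a single DataFrame
df = spark.read.json(fs)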

Upvotes: 2

godzsa

Reputation: 2395

I think you should provide more detail: how often do you want to read, what is the reason for it, and so on. With some context we might be able to help better.

From your code snippet it seems like a simple loop that collects the DataFrames into a list would be the easiest approach:

dfs = []

for i in range(150):
  dfs.append(spark.read.json('s3://mybucket/f' + str(i + 1)))

However, if you provide some more detail I am pretty sure this answer can be improved.

Edit based on comments

If what you want is to union the DataFrames, the easiest way in Scala is to import the implicits:

import spark.implicits._

// note: the empty DataFrame must share the schema of the JSON files for union to succeed
var df = spark.emptyDataFrame

for (i <- 1 to 150) {
  df = df.union(spark.read.json(s"s3://mybucket/f$i"))
}

Note: this should work with Spark 2.x and above:

https://sparkbyexamples.com/spark/spark-how-to-create-an-empty-dataframe/
https://sparkbyexamples.com/spark/spark-dataframe-union-and-union-all/
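Since the question is in PySpark, a rough Python equivalent of the same idea is sketched below; seeding the fold with the first file avoids the schema mismatch you would otherwise get when unioning onto a completely empty DataFrame (the paths are placeholders):

from functools import reduce

# hypothetical list of the 150 paths from the question
paths = ['s3://mybucket/f' + str(i) for i in range(1, 151)]

# read each file and fold the DataFrames together with union
df = reduce(lambda a, b: a.union(b), (spark.read.json(p) for p in paths))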

Upvotes: 2

Oscar Lopez M.

Reputation: 605

I think you can use a list comprehension that returns a list of DataFrames, and from there you can iterate over them:

your_dfs_list = [spark.read.json("s3://mybucket/f"+str(x)) for x in range(1,151)]
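For example, a quick way to iterate over the resulting list and check what came back (the count call is just an illustration):

# print the number of rows read from each file
for i, df in enumerate(your_dfs_list, start=1):
    print(i, df.count())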

I guess doing it in Scala could make it easier to apply a map (or a foreach later) over the DataFrames, but that depends on your preference :)

(1 to 150).map(v => spark.read.json(s"s3://mybucket/f$v"))

Upvotes: 4
