Seema Mudgil

Reputation: 385

Retrieving data from an S3 bucket in PySpark

I am reading data from an S3 bucket in PySpark. I need to parallelize the read operation and do some transformations on the data, but it is throwing an error. Below is the code.

import json
import boto3

s3 = boto3.resource('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
bucket = s3.Bucket(bucket)

prefix = 'clickEvent-2017-10-09'
files = bucket.objects.filter(Prefix=prefix)
keys = [k.key for k in files]
pkeys = sc.parallelize(keys)

I have a global variable d, which is an empty list, and I am appending the deviceID data into it.

Applying flatMap on the keys:

pkeys.flatMap(map_func)

This is the function:

def map_func(key):
    print "in map func"
    for line in key.get_contents_as_string().splitlines():
        # parse one line of json
        content = json.loads(line)
        d.append(content['deviceID'])

But the above code gives me an error. Can anyone help?

Upvotes: 1

Views: 6037

Answers (1)

Ryan Widmaier

Reputation: 8523

You have two issues that I can see. The first is that you are trying to manually read data from S3 using boto instead of using the direct S3 support built into Spark and Hadoop. It looks like you are trying to read text files containing one JSON record per line. If that is the case, you can just do this in Spark:

df = spark.read.json('s3://my-bucket/path/to/json/files/')

This will create a Spark DataFrame for you by reading in the JSON data with each line as a row. DataFrames require a rigid pre-defined schema (like a relational database table), which Spark will determine by sampling some of your JSON data. After you have the DataFrame, all you need to do to get your column is select it like this:

df.select('deviceID')
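
If the inferred schema is not what you expect, you can inspect it, or supply one yourself and skip the sampling pass. A minimal sketch, assuming each record carries a string deviceID field (the field name comes from the question; everything else is illustrative):

from pyspark.sql.types import StructType, StructField, StringType

# See what Spark inferred by sampling the JSON:
df.printSchema()

# Or declare the schema up front so no sampling pass is needed:
schema = StructType([StructField('deviceID', StringType(), True)])
df = spark.read.json('s3://my-bucket/path/to/json/files/', schema=schema)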

The other issue worth pointing out is that you are attempting to use a global variable to store data computed across your Spark cluster. It is possible to send data from your driver to all of the executors running on Spark workers using either broadcast variables or implicit closures. But there is no way in Spark to write to a variable in your driver from an executor! To transfer data from executors back to the driver, you need to use Spark's action methods, which are intended for exactly this purpose.
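
To make the driver/executor distinction concrete, here is a small sketch (the names are illustrative, not from the question): appends made on executors never reach the driver's copy of the list, while a broadcast variable happily ships read-only data the other way:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

d = []                                                     # lives on the driver
sc.parallelize([1, 2, 3]).foreach(lambda x: d.append(x))   # each executor appends to its own copy of d
print(d)                                                   # still [] on the driver

lookup = sc.broadcast({'deviceA', 'deviceB'})              # driver -> executors works, but it is read-only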

Actions are methods that tell Spark you want a result computed, so it needs to go execute the transformations you have told it about. In your case you would probably want one of the following (both sketched after the list):

If the results are large: use DataFrame.write to save the results of your transformations back to S3

If the results are small: use DataFrame.collect() to download them back to your driver and do something with them
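
A rough sketch of both options (the output path is hypothetical, pick your own):

# Large results: write them back to S3
df.select('deviceID').write.mode('overwrite').parquet('s3://my-bucket/output/device-ids/')

# Small results: collect them into the driver as plain Python objects
device_ids = [row['deviceID'] for row in df.select('deviceID').collect()]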

Upvotes: 1
