aditya bhandari

Reputation: 113

Reading Unzipped Shapefiles stored in AWS S3 from AWS EMR Cluster using PySpark in Jupyter Notebook

I'm completely new to AWS EMR and Apache Spark. I'm trying to assign GeoIDs to residential properties using shapefiles, but I'm not able to read the shapefiles from my S3 bucket. Please help me understand what is going on, as I couldn't find an answer on the internet that explains this exact problem.

<!-- language: python 3.4 -->

import shapefile
import pandas as pd
from shapely.geometry import shape  # needed for the centroid calculation below

def read_shapefile(shp_path):
    """
    Read a shapefile into a Pandas dataframe with a 'coords' column holding
    the geometry information. This uses the pyshp package.
    """
    # read the file, then parse out the records and shapes
    sf = shapefile.Reader(shp_path)
    fields = [x[0] for x in sf.fields][1:]
    records = sf.records()
    shapes = sf.shapes()
    shps = [s.points for s in shapes]
    center = [shape(s).centroid.coords[0] for s in shapes]

    # write into a dataframe
    df = pd.DataFrame(columns=fields, data=records)
    df = df.assign(coords=shps, centroid=center)

    return df

read_shapefile("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10")

[Screenshot: the files I want to read]

[Screenshot: the error I get while reading from the bucket]

I really want to read these shapefiles on an AWS EMR cluster, as it's not possible for me to work on them individually on my local machine. Any kind of help is appreciated.

Upvotes: 7

Views: 2488

Answers (1)

aditya bhandari

Reputation: 113

I was able to read my shapefiles from the S3 bucket as binary objects first, then build a wrapper function around them, and finally pass the individual file objects to the shapefile.Reader() method as the .shp, .shx, and .dbf components separately.

This was happening because PySpark cannot read formats that SparkContext doesn't provide a reader for. I found this link helpful: Using pyshp to read a file-like object from a zipped archive.
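
For context, sc.binaryFiles returns an RDD of (path, bytes) pairs, one pair per matched file. Here is a minimal sketch of that behaviour, assuming the same notebook-provided SparkContext sc and bucket layout as above:

    # each element pairs the full S3 key with the raw file contents as bytes
    pairs = sc.binaryFiles("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10*")
    for path, content in pairs.collect():
        print(path, len(content))  # e.g. the .shp key and its size in bytes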

My solution

import io
import shapefile
import pandas as pd
from shapely.geometry import shape  # needed for the centroid calculation below

def read_shapefile(shp_path):
    # read every file matching the path as (path, bytes) pairs;
    # `sc` is the SparkContext that the EMR Jupyter notebook provides
    blocks = sc.binaryFiles(shp_path)
    block_dict = dict(blocks.collect())

    def component(ext):
        # wrap the bytes of the component file with the given extension
        return io.BytesIO(next(v for k, v in block_dict.items() if k.endswith(ext)))

    # pyshp can read from file-like objects when the .shp, .shx and .dbf
    # components are passed in separately
    sf = shapefile.Reader(shp=component(".shp"),
                          shx=component(".shx"),
                          dbf=component(".dbf"))

    fields = [x[0] for x in sf.fields][1:]
    records = sf.records()
    shapes = sf.shapes()
    shps = [s.points for s in shapes]
    center = [shape(s).centroid.coords[0] for s in shapes]

    # write into a dataframe
    df = pd.DataFrame(columns=fields, data=records)
    df = df.assign(coords=shps, centroid=center)

    return df

block_shapes = read_shapefile("s3a://uim-raw-datasets/census-bureau/tabblock-2010/tabblock-by-fips/tl_2010_01001_tabblock10*")

This works fine without breaking.
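
As a quick sanity check you can peek at the resulting dataframe. GEOID10 is the block-level identifier column in the 2010 TIGER/Line tabblock files, but verify the exact column name against your own data:

    print(block_shapes.shape)                            # (number of blocks, number of columns)
    print(block_shapes[["GEOID10", "centroid"]].head())  # GEOID10 assumed from the TIGER/Line schema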

Upvotes: 3
