Eric Bellet

Reputation: 2055

PySpark unzip files: What is a good approach for unzipping files and storing the CSV files in a Delta table?

I have zip files stored in Amazon S3, referenced in a Python list such as ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]. I need to unzip all of these files using a Spark cluster and store all the CSV files in a Delta table. I would like to know a faster approach than my current one:

1) I have a for loop iterating over my Python list.

2) I obtain the zip files from S3 using boto3 with s3.bucket.Object(file)

3) I unzip the files using the following code:

import io
import boto3
import shutil
import zipfile

# s3 is a boto3 S3 resource and outputZip is a local extraction directory,
# both defined earlier in the notebook
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    # download the zip into memory and extract it on the driver's local disk
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)
    # step 4): copy the extracted CSVs from the driver's local disk to DBFS
    dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
    # step 6): cleanup, after the Delta write described below
    shutil.rmtree(outputZip)
    dbutils.fs.rm("dbfs:" + outputZip, True)

4) My files are unzipped on the driver node's local disk, where the executors can't reach them (I haven't found a way to do that), so I move all the CSV files to DBFS using dbutils.fs.cp().

5) I read all the CSV files from DBFS into a PySpark DataFrame and write it to a Delta table:

df = self.spark.read.option("header", "true").csv("dbfs:" + file) 
df.write.format("delta").save(path)

6) I delete the data from DBFS and from the driver node.

So, my goal is to ingest the zip files from S3 into a Delta table in less time than the process above. I suppose some of these steps can be parallelized, for example step 1). I would also like to avoid the copy to DBFS, because I don't need the data there, and I need to remove the CSV files after each ingest into the Delta table so the driver node's disk doesn't fill up. Any advice?

Upvotes: 6

Views: 25410

Answers (2)

Douglas M

Reputation: 1126

Zip, as you know by now, is not a splittable compression format, and there are no built-in codecs that work with zip. You may find that some clever person has written their own Spark zip codec/data source, but I haven't found it yet.

A few tips I picked up recently:

  1. The AWS CLI will copy files in parallel to the local node (driver or worker). It is much faster to run aws s3 cp <src> <dst> than to stream the binary file using aws s3 cp <src> -. The AWS CLI / boto3 is much faster (but a little bit harder to use) than dbutils.fs.cp(). A short sketch follows this list.

  2. A for loop is a good sign of non-parallel execution and inefficiency. It leads to lower throughput, higher costs, and lower cluster utilization. Use the functional capabilities of Spark and Python wherever possible.

  3. You can create a Spark DataFrame of file names with something like:

df_files = spark.createDataFrame(dbutils.fs.ls("<s3 path to bucket and folder>"))

From this DataFrame you can run a pandas UDF, the .pipe() operator, or a regular UDF.

  4. Running a Spark .pipe() or a pandas UDF in parallel across a set of workers, each working off the path of a file, would get you more throughput and keep your cluster busy.

  5. Experiment with the pandas UDF to perform either: a. copy-unzip-copy back to S3, then read with the CSV reader, or b. copy-unzip-read-return inside the pandas UDF (a sketch of this pattern follows this list). Watch the Ganglia metrics to ensure a high utilization rate.

  6. Since you're writing to a Delta table, you can reliably run your writes (appends) in parallel if you have to have multiple Spark jobs. [parallel workers in one job >> parallel Spark jobs]

  7. Supplying a schema to your CSV read is about 2x faster than .option("inferSchema", "true").
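
A minimal sketch of tip 1, assuming the AWS CLI is installed on the node and S3 credentials are available there; the bucket and local path below are placeholders:

import subprocess

# Copy one zip from S3 to the node's local disk with the AWS CLI.
# The CLI parallelizes the transfer itself, which is usually faster than
# streaming the object through Python or copying it with dbutils.fs.cp().
local_path = "/tmp/file1.zip"  # placeholder local destination
subprocess.run(
    ["aws", "s3", "cp", "s3://mybucket/file1.zip", local_path],
    check=True,
)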
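
And a minimal sketch of tips 5b and 7 combined (copy-unzip-read-return inside a pandas-style UDF, with an explicit schema instead of inference), assuming Spark 3.0+ for mapInPandas, S3 access from the workers, and a hypothetical two-string-column schema; df_files comes from the snippet above and path is the Delta destination from the question:

import io
import zipfile

import boto3
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema of the CSVs inside the zips -- replace with your real columns.
csv_schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
])

def unzip_and_read(batches):
    """Runs on the workers: for each batch of zip paths, download the zip,
    open its CSV members in memory, and yield their rows as pandas DataFrames."""
    s3_client = boto3.client("s3")  # assumes the workers can authenticate to S3
    for pdf in batches:
        for s3_path in pdf["path"]:
            # "s3://mybucket/file1.zip" -> ("mybucket", "file1.zip")
            bucket, key = s3_path.split("://", 1)[1].split("/", 1)
            body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
            with zipfile.ZipFile(io.BytesIO(body)) as zf:
                for member in zf.namelist():
                    if member.endswith(".csv"):
                        with zf.open(member) as f:
                            # header=0 drops the CSV header row; names= and dtype=str
                            # keep the columns aligned with the all-string sketch schema
                            yield pd.read_csv(
                                f, header=0,
                                names=[fld.name for fld in csv_schema.fields],
                                dtype=str,
                            )

df_csv = df_files.select("path").mapInPandas(unzip_and_read, schema=csv_schema)
# append so several batches/jobs can safely write to the same Delta table (tip 6)
df_csv.write.format("delta").mode("append").save(path)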

Upvotes: 1

Dipesh Patel

Reputation: 67

Well, multiple solutions are possible:

  1. You can read all the files together (if the schema allows it) with df = spark.read.csv("s3://mybucket") and write the DataFrame as Delta with df.write.format("delta").save(path)
  2. You can read each file individually into a DataFrame and append it directly to the existing Delta table (even if it is empty), without storing anything in DBFS; see the short sketch after the option 3 code below. For more details: https://docs.databricks.com/delta/delta-batch.html#append-using-dataframes
  3. You can read each file individually into a DataFrame and union it into a main DataFrame. At the end, you can write the main DataFrame as a Delta table.

Option 3 would be something like:

    import io
    import boto3
    import shutil
    import zipfile
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType  # plus the field types your schema needs

    spark = SparkSession.builder.appName("name").getOrCreate()

    # s3, outputZip and path are defined as in the question
    schema = StructType([
        # YOUR DATA SCHEMA
    ])

    df = spark.createDataFrame([], schema)

    for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
        obj = s3.bucket.Object(file)
        with io.BytesIO(obj.get()["Body"].read()) as tf:
            tf.seek(0)
            with zipfile.ZipFile(tf, mode='r') as zipf:
                for subfile in zipf.namelist():
                    zipf.extract(subfile, outputZip)
        tempdf = spark.read.option("header", "true").csv(outputZip)
        df = df.union(tempdf)

    df.write.format("delta").save(path)
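
For option 2, the per-file append (per the Delta batch docs linked above) would look roughly like the sketch below, keeping the unzip step from the question and reusing schema, outputZip, and path from the snippets above:

    for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
        # unzip into outputZip exactly as in the question, then append straight to Delta
        tempdf = spark.read.option("header", "true").schema(schema).csv(outputZip)
        tempdf.write.format("delta").mode("append").save(path)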

Upvotes: 1
