Reputation: 2055
I have zip files stored in Amazon S3 and a Python list like ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]. I need to unzip all of these files using a Spark cluster and store all the resulting CSV files in a Delta table. I would like to know a faster approach than my current one:
1) I have a for loop iterating over my Python list.
2) I'm fetching the zip files from S3 using Python's boto3 with s3.bucket.Object(file)
3) I'm unzipping the files using the following code
import io
import boto3
import shutil
import zipfile

for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)
    dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
    shutil.rmtree(outputZip)
    dbutils.fs.rm("dbfs:" + outputZip, True)
4) My files are unzipped on the driver node, but the executors can't reach them (I haven't found a way to make that work), so I move all the CSV files to DBFS using dbutils.fs.cp()
5) I read all the CSV files from DBFS into a PySpark DataFrame and write it to a Delta table
df = self.spark.read.option("header", "true").csv("dbfs:" + file)
df.write.format("delta").save(path)
6) I delete the data from DBFS and the Driver Node
So, my goal is to ingest the zip files from S3 into a Delta table in less time than my current process takes. I suppose I can parallelize some of these steps, such as step 1). I would also like to avoid the copy to DBFS, because I don't need the data there, and I need to remove the CSV files after each ingest into the Delta table to avoid filling up the driver node's disk. Any advice?
Upvotes: 6
Views: 25410
Reputation: 1126
As you know by now, zip is not a splittable compression format, and there are no built-in codecs that work with zip. You may find that some clever person has written their own Spark zip codec/data source, but I haven't found one yet.
A few tips I picked up recently:
The AWS CLI will copy files in parallel to the local node (driver or worker). It is much faster to run aws s3 cp <src> <dst> than to stream the binary file using aws s3 cp <src> -. The AWS CLI / boto3 is much faster (but a little harder to use) than dbutils.fs.cp().
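For example, a minimal sketch of shelling out to the AWS CLI from Python; the paths are placeholders and the CLI is assumed to be installed and configured on the node:
import subprocess

# Copy one zip from S3 to the node's local disk; source and destination are placeholders
subprocess.run(
    ["aws", "s3", "cp", "s3://mybucket/file1.zip", "/tmp/file1.zip"],
    check=True,
)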
Using a for loop is a good sign of non-parallel execution and inefficiency. It leads to lower throughput, higher costs, and lower cluster utilization. Use the functional capabilities of Spark and Python wherever possible.
You can create a Spark DataFrame of file names with something like:
df_files = spark.createDataFrame(dbutils.fs.ls("<s3 path to bucket and folder>"))
From this DataFrame you can run a Pandas UDF, the .pipe() operator, or a regular UDF.
Running a Spark .pipe() or a Pandas UDF in parallel, across a set of workers, each working off the path of one file, would get you more throughput and keep your cluster busy.
Experiment with the Pandas UDF to perform either: a. copy-unzip-copy back to S3, then read with the CSV reader, or b. copy-unzip-read-return inside the Pandas UDF; a sketch of option (a) follows below. Watch the Ganglia metrics to ensure a high utilization rate.
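For reference, here is a minimal sketch of option (a) with a Pandas UDF: each task downloads one zip from S3, extracts it in memory, and uploads the CSV members back to S3 so they can all be read in a single spark.read.csv() pass. The bucket name, the unzipped/ prefix, the file keys, and the Delta path are placeholders, and spark is assumed to be an existing SparkSession on a cluster with pandas available (Spark 3.x style type hints):
import io
import zipfile

import boto3
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Placeholder object keys inside the bucket
zip_keys = ["file1.zip", "file2.zip"]
df_files = spark.createDataFrame([(k,) for k in zip_keys], ["zip_key"])

@pandas_udf(StringType())
def unzip_to_s3(keys: pd.Series) -> pd.Series:
    # Create the boto3 client inside the task; clients are not serializable from the driver
    s3 = boto3.client("s3")
    for key in keys:
        buf = io.BytesIO()
        s3.download_fileobj("mybucket", key, buf)
        buf.seek(0)
        with zipfile.ZipFile(buf) as zf:
            for name in zf.namelist():
                # Upload each extracted member back to S3 under a new prefix
                s3.upload_fileobj(io.BytesIO(zf.read(name)), "mybucket", "unzipped/" + name)
    return keys

# Trigger the distributed unzip, then read every extracted CSV in one pass
df_files.select(unzip_to_s3("zip_key")).collect()
csv_df = spark.read.option("header", "true").csv("s3a://mybucket/unzipped/")
csv_df.write.format("delta").mode("append").save("/delta/target_table")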
Since you're writing to a Delta table, you can reliably run your writes (appends) in parallel if you have to have multiple spark jobs. [parallel workers in one job >> parallel spark jobs]
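For example, each writer appends to the same table rather than overwriting it (the path is a placeholder):
df.write.format("delta").mode("append").save("/delta/target_table")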
Supplying a schema to your CSV read is about 2x faster than .option("inferSchema", "true").
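A minimal sketch with a made-up two-column schema (the column names and path are placeholders):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical columns; an explicit schema skips the extra pass that inferSchema needs
schema = StructType([
    StructField("id", StringType(), True),
    StructField("amount", DoubleType(), True),
])
df = spark.read.option("header", "true").schema(schema).csv("s3a://mybucket/unzipped/")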
Upvotes: 1
Reputation: 67
Well, multiple solutions are possible. One would be:
df = spark.read.csv("s3://mybucket")
and then write the DataFrame as Delta with df.write.format("delta").save(path)
Option 3 would be something like:
import io
import boto3
import shutil
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("name").getOrCreate()

schema = StructType([
    # YOUR DATA SCHEMA
])

df = spark.createDataFrame([], schema)

for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)
    tempdf = spark.read.option("header", "true").csv(outputZip)
    df = df.union(tempdf)

df.write.format("delta").save(path)
Upvotes: 1