Reputation: 3109
I am using below pyspark code to read thousands of JSON files from an s3 bucket
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.read.json("s3://bucket_name/*/*/*.json")
This takes a long time (~16 minutes) to read and parse the JSON files. How can I parallelize or speed up the process?
Upvotes: 2
Views: 3620
Reputation: 695
The short answer is: it depends, both on the underlying infrastructure and on the distribution of the data (its skew, which only matters when you perform an operation that causes a shuffle).
If the code you posted is being run on, say, AWS EMR or MapR, it's best to tune the number of executors on each cluster node so that each executor has three to five cores. This number is important from the point of view of reading from and writing to S3.
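As a minimal sketch of that rule of thumb, here is one way to pin down the executor sizing in code; the instance count, core count, and memory are illustrative assumptions for a hypothetical 10-node cluster (16 cores, 64 GB per node), not values taken from the question:

from pyspark import SparkConf, SparkContext

# Illustrative sizing: three 5-core executors per node across 10 nodes,
# with one executor slot left free for the YARN ApplicationMaster.
conf = (SparkConf()
        .set("spark.executor.instances", "29")
        .set("spark.executor.cores", "5")
        .set("spark.executor.memory", "18g"))
sc = SparkContext(conf=conf)

The same settings can equally be passed on the spark-submit command line via --num-executors, --executor-cores and --executor-memory.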
Another possible reason behind the slowness can be the dreaded corporate proxy. If all your requests to the S3 service are being routed through a corporate proxy, the latter is going to be a huge bottleneck. It's best to bypass the proxy for traffic to the S3 service, via the NO_PROXY setting (or its JVM equivalent) on the EMR cluster.
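A minimal sketch of that bypass, assuming the S3 connector in use honours the JVM's standard http.nonProxyHosts property; the endpoint pattern below is a placeholder to replace with your region's actual S3 endpoints:

from pyspark import SparkConf

# Assumption: the S3 client respects the standard JVM proxy properties.
# Replace the pattern with the S3 endpoint(s) for your region.
no_proxy = "-Dhttp.nonProxyHosts=*.s3.amazonaws.com"
conf = (SparkConf()
        .set("spark.driver.extraJavaOptions", no_proxy)
        .set("spark.executor.extraJavaOptions", no_proxy))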
This talk from Cloudera, alongside their excellent blogs one and two, is an excellent introduction to tuning the cluster. Since we're using sqlContext.read.json, the underlying DataFrame will be split into the number of partitions given by the Spark SQL parameter spark.sql.shuffle.partitions, described here. It's best to set it to 2 * number of executors * cores per executor. That will definitely speed up reading on any cluster where the calculated value exceeds the default of 200.
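For concreteness, a sketch of that formula using the illustrative 29-executor, 5-core sizing from the earlier snippet; substitute your own cluster's numbers:

# 2 * executors * cores-per-executor; the counts here are the
# illustrative values from the configuration sketch above.
num_executors = 29
cores_per_executor = 5
sqlContext.setConf("spark.sql.shuffle.partitions",
                   str(2 * num_executors * cores_per_executor))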
Also, as mentioned in the other answer, if you know the schema of the JSON, supplying it explicitly may speed things up, since it spares Spark the extra pass over the data needed to infer the schema.
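A minimal sketch of passing an explicit schema; the fields id and payload are hypothetical stand-ins for whatever your JSON actually contains:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical fields; replace with the actual structure of your JSON.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("payload", StringType(), True),
])
df = sqlContext.read.json("s3://bucket_name/*/*/*.json", schema=schema)

With a schema supplied, Spark skips the inference pass over the input files entirely.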
I would also implore you to look at the Spark UI and dig into the DAG for slow jobs. It's an invaluable tool for performance tuning on Spark.
I am planning to consolidate these infrastructure optimizations for AWS EMR into a blog post, and will update the answer with the link once it's done.
Upvotes: 4
Reputation: 21
There are at least two ways to speed up this process:
- Pass the schema argument to avoid schema inference.

Upvotes: 2