Reputation: 5304
Just a few simple questions on the actual mechanism behind reading a file on S3 into an EMR cluster with Spark:
Does
spark.read.format("com.databricks.spark.csv").load("s3://my/dataset/").where($"state" === "WA")
copy the whole dataset into the EMR cluster's local HDFS and then apply the filter afterwards? Or does it filter records as it brings the dataset into the cluster? Or does it do neither? If neither, what is actually happening?
The official documentation lacks an explanation of what's going on (or if it does have an explanation, I cannot find it). Can someone explain, or link to a resource with such an explanation?
Upvotes: 1
Views: 3359
Reputation: 13480
I can't speak for the closed-source AWS one, but the ASF s3a: connector does its work in S3AInputStream.
Data is read over HTTPS, which is slow to start up, and if you need to stop a download before the GET has finished, you have to abort the TCP stream and create a new one.
To keep this cost down, the code has features like:
Lazy seek: when you do a seek(), it updates its internal pointer but doesn't issue a new GET until you actually do a read.
Abort vs. drain: it chooses whether to abort() or read to the end of a GET based on how much data is left.
A choice of IO modes:
"sequential": the GET content range is (pos, EOF). Best bandwidth, worst performance on seek. For: CSV, .gz, ...
"random": small GETs, min(block-size, length(read)). Best for columnar data (ORC, Parquet) compressed in a seekable format (snappy).
"adaptive" (new last week, based on some work from Microsoft on the Azure WASB connector): starts off sequential, and switches to random IO as soon as you do a backwards seek.
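The features listed above can be sketched as a toy model. This is not the real S3AInputStream: the class name `LazySeekStream`, the `drainThreshold` parameter, and the bookkeeping fields are all hypothetical, and the rule for "backwards seek" is simplified to "a seek below the current pointer". It only records which ranged GETs would be issued and whether an open GET would be aborted or drained.

```scala
// Toy model of the behaviour described above; NOT the real S3AInputStream.
sealed trait ReadPolicy
case object Sequential extends ReadPolicy
case object Random extends ReadPolicy
case object Adaptive extends ReadPolicy

// One simulated HTTP GET covering bytes [start, endExclusive).
final case class RangeRequest(start: Long, endExclusive: Long)

final class LazySeekStream(
    fileLength: Long,
    policy: ReadPolicy,
    blockSize: Long,
    drainThreshold: Long // hypothetical knob: drain if this many bytes or fewer remain
) {
  // Adaptive starts out behaving like Sequential.
  private var effective: ReadPolicy = if (policy == Adaptive) Sequential else policy
  private var nextReadPos = 0L // where the caller wants to read next
  private var streamPos = 0L   // position inside the currently open GET
  private var current: Option[RangeRequest] = None
  private var issued = Vector.empty[RangeRequest]
  private var abortCount = 0
  private var drainCount = 0

  def requests: Vector[RangeRequest] = issued
  def aborts: Int = abortCount
  def drains: Int = drainCount

  // Lazy seek: only move the pointer; no GET is issued here.
  def seek(pos: Long): Unit = {
    require(pos >= 0 && pos <= fileLength, s"seek out of range: $pos")
    if (policy == Adaptive && pos < nextReadPos) effective = Random
    nextReadPos = pos
  }

  // Returns the number of bytes served, which may be fewer than requested.
  def read(length: Long): Long = {
    val wanted = math.min(length, fileLength - nextReadPos)
    if (wanted <= 0) 0L
    else {
      // Reuse the open GET only if it is positioned exactly where we want to read.
      val req = current
        .filter(r => streamPos == nextReadPos && streamPos < r.endExclusive)
        .getOrElse(reopen(wanted))
      val served = math.min(wanted, req.endExclusive - streamPos)
      streamPos += served
      nextReadPos = streamPos
      served
    }
  }

  private def reopen(wanted: Long): RangeRequest = {
    closeCurrent()
    val end = effective match {
      case Sequential | Adaptive => fileLength // (pos, EOF)
      case Random => math.min(fileLength, nextReadPos + math.min(blockSize, wanted))
    }
    val req = RangeRequest(nextReadPos, end)
    issued = issued :+ req
    current = Some(req)
    streamPos = nextReadPos
    req
  }

  // Abort when a lot of the GET is unread; otherwise read to the end.
  private def closeCurrent(): Unit = {
    current.foreach { r =>
      val remaining = r.endExclusive - streamPos
      if (remaining > drainThreshold) abortCount += 1
      else if (remaining > 0) drainCount += 1
    }
    current = None
  }
}
```

In this model, calling `seek(500)` and then `seek(200)` on a fresh stream issues no request at all; the first `read(50)` under the `Random` policy then produces a single GET for bytes 200 to 250, whereas under `Sequential` the same read would request everything from 200 to the end of the file.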
The code is all there; improvements welcome. The current perf work (especially on random IO) is based on TPC-DS benchmarking of ORC data on Hive, BTW.
Assuming you are reading CSV and filtering there, it'll read the entire CSV file and then filter. This is horribly inefficient for large files. Best to import the data into a columnar format and use predicate pushdown, so the layers below can seek around the file to filter and read only the columns needed.
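A minimal sketch of that import-then-filter approach, assuming Spark 2.x or later with the built-in CSV and Parquet readers. The object and method names are made up for illustration, the paths are whatever you pass in, and the `state` column comes from the question. The one-off conversion still has to read every byte of the CSV; only the later queries benefit.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CsvToParquet {
  // One-off conversion: reads the whole CSV and rewrites it as Parquet.
  // Assumes the CSV has a header row naming its columns.
  def convert(spark: SparkSession, csvPath: String, parquetPath: String): Unit =
    spark.read
      .option("header", "true")
      .csv(csvPath)
      .write
      .mode("overwrite")
      .parquet(parquetPath)

  // Later queries: a simple equality filter like this one is a candidate
  // for pushdown into the Parquet reader.
  def washingtonRows(spark: SparkSession, parquetPath: String): DataFrame =
    spark.read.parquet(parquetPath).where(col("state") === "WA")
}
```

Calling `explain()` on the DataFrame returned by `washingtonRows` should list the predicate under `PushedFilters` in the Parquet scan node if pushdown applies. How much data is actually skipped depends on how the file is laid out, so it is worth measuring rather than assuming.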
Upvotes: 2
Reputation: 71
Upvotes: 2
Reputation: 81454
When you specify files located on S3, they are read into the cluster. The processing happens on the cluster nodes.
However, this may be changing with S3 Select, which is now in preview.
Upvotes: 1