Reputation: 2431
Let's suppose you have very big parquet files from which you want to filter a subset and save it:
val df = spark.read.parquet(inputFileS3Path)
  .select("c1", "c2", "c3")
  .where("c1 = '38940f'")
df.write.parquet(outputFileS3Path)
Does Spark read all the Parquet files into memory first and then do the filtering? Is there a way to have Spark, for example, read one batch at a time and keep in memory only the records that satisfy the filter condition?
I am running Spark 2.2 in a Zeppelin notebook, and what seems to be happening is that it reads everything into memory and then filters, which sometimes makes the process crash (in the Spark Web UI the input of the stage is > 1 TB, while the output in S3 is 1 MB).
Is there a more efficient way to filter these files (whether by changing the code, the file format, the Spark version, etc.)? I already select just a subset of the columns, but it doesn't seem to be enough.
UPDATE
After further investigation, I noticed that Spark reads everything in when the filter is on a nested field:
val df = spark.read.parquet(inputFileS3Path)
  .select("c1", "c2", "c3")
  .where("c1.a = '38940f'")
df.write.parquet(outputFileS3Path)
I think this functionality is still not implemented (see https://issues.apache.org/jira/browse/SPARK-17636). Do you have any tips besides rewriting all the Parquet files with the nested fields promoted to top-level columns (sketched below for reference)? Is there a way to force the optimizer to build a better plan?
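For reference, the rewrite I mention above would look roughly like this (just a sketch; flattenedS3Path is a placeholder for a new output location):
import org.apache.spark.sql.functions.col

// One-off rewrite: promote the nested field to a top-level column so that
// later filters on it can be pushed down to the Parquet reader.
spark.read.parquet(inputFileS3Path)
  .withColumn("c1_a", col("c1.a"))
  .write.parquet(flattenedS3Path)

// Subsequent reads can then filter on the flattened copy:
val df = spark.read.parquet(flattenedS3Path)
  .select("c1_a", "c2", "c3")
  .where("c1_a = '38940f'")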
Upvotes: 2
Views: 1213
Reputation: 1276
Spark supports predicate pushdown for top-level fields.
Use the df.explain() method to check the query plan. You should see something like:
+- FileScan parquet [c1#413,c2#414,c3#415] Batched: false,
Format: Parquet, Location: TahoeLogFileIndex[file:/inputFileS3Path], PartitionCount: 4320, PartitionFilters: [],
PushedFilters: [IsNotNull(c1), EqualTo(c1,38940f)],
ReadSchema: struct<c1:string,c2:string,c3:string>
The important parts are PushedFilters, to check the predicate pushdown, and ReadSchema, to check the schema pruning.
As you've mentioned in your question, predicate pushdown for nested fields is not supported. However, you can optimise your query performance on nested fields with schema pruning.
Check my answer on this thread.
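As a rough illustration of the schema pruning idea (a sketch, assuming an upgrade to Spark 2.4+, where the nested schema pruning flag exists; it is enabled by default from 3.0):
import org.apache.spark.sql.functions.col

// Schema pruning: read only c1.a from Parquet instead of the whole c1 struct.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

val df = spark.read.parquet(inputFileS3Path)
  .select(col("c1.a").as("c1_a"), col("c2"), col("c3"))
  .where(col("c1_a") === "38940f")

df.explain()  // ReadSchema should show c1:struct<a:string> rather than the full struct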
Upvotes: 1
Reputation: 2825
Spark does something called predicate pushdown, wherein it filters the data at read time based on the predicates that you pass in your SQL context, which in this case is c1 = '38940f'.
In your case you would need to use the filter
API to do predicate pushdown, as shown below:
inputDF.filter("c1 = '38940f'")
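To confirm that the pushdown actually happens, check the plan (a sketch; the column name follows the question):
// A Column-based or SQL-string predicate is visible to Catalyst and can be
// pushed down; a typed lambda such as ds.filter(_.c1 == "38940f") is opaque
// to the optimizer and will not be pushed down.
val filtered = inputDF.filter("c1 = '38940f'")
filtered.explain()  // look for PushedFilters: [IsNotNull(c1), EqualTo(c1,38940f)]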
Upvotes: 0