JB007

Reputation: 133

What does an asterisk preceding a PushedFilters entry mean in a Spark SQL explain plan?

Regarding the PushedFilters shown in Spark physical explain plans, the DataStax documentation states (ref. https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html):

The asterisk indicates that push down filter will be handled only at the datasource level.

What does this mean? And, more importantly: when you see a PushedFilters array entry without an asterisk, is the filter still being pushed down to the data source level, or is it handled outside of it? If it is handled outside the data source, why call it a pushed filter in the first place?

I find this very confusing, and googling it I couldn't find a real answer to the question.

Thanks!

Jan

Upvotes: 2

Views: 606

Answers (1)

Salim

Reputation: 2178

Pushdown of predicates always happens at the data source level: the data source selectively scans only those pieces of data that the predicates apply to. Spark is just a processing engine that hands the query over to the data source for final execution, and the data source executes it as it sees fit. The Spark SQL connectors are aware of how data sources behave (based on the schema), so they can predict a physical plan with pushed-down predicates, but they cannot guarantee that the pushdown will actually run. Hence the asterisk.
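
To make this concrete, here is a minimal sketch of the hook a connector implements so that Spark knows which predicates the source will handle. This is not from the question or the DSE docs; it assumes the DataSource V1 API (org.apache.spark.sql.sources) and uses a hypothetical MyRelation with made-up columns. buildScan receives the filters Spark wants to push down, and unhandledFilters tells Spark which of them the source will not fully evaluate, so Spark re-applies those itself after the scan.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThanOrEqual, IsNotNull, PrunedFilteredScan}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Hypothetical relation showing how pushed filters reach a data source.
    class MyRelation(override val sqlContext: SQLContext)
        extends BaseRelation with PrunedFilteredScan {

      override def schema: StructType = StructType(Seq(
        StructField("client", StringType),
        StructField("income", IntegerType)))

      // Filters returned here are the ones this source will NOT fully evaluate;
      // Spark keeps a Filter operator for them and re-applies them after the scan.
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
        filters.filterNot {
          case GreaterThanOrEqual("income", _) => true // handled entirely by the source
          case IsNotNull("income")             => true
          case _                               => false
        }

      // Spark hands the pushable filters to the source; whether and how the source
      // uses them to prune the scan is entirely up to the source implementation.
      override def buildScan(requiredColumns: Array[String],
                             filters: Array[Filter]): RDD[Row] = {
        // ... use `filters` to skip data, then return only the matching rows
        sqlContext.sparkContext.emptyRDD[Row]
      }
    }

A source that returns an empty array from unhandledFilters promises to evaluate every pushed filter itself; otherwise Spark keeps a Filter operator above the scan as a safety net, which is why a Filter node still sits above the FileScan in the Parquet plan below even though the predicates are listed under PushedFilters.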

I ran a query against a local Parquet file. The physical plan shows pushed-down predicates and no asterisk. Since it is a local Parquet file that Spark reads itself, the physical plan is 100% accurate. (The *(1) prefix on the operators denotes whole-stage code generation and is unrelated to the asterisk on pushed filters.)

    import spark.implicits._   // for the $"col" syntax (imported automatically in spark-shell)

    val df = spark.read.parquet("/Users/Documents/temp/temp1")
    df.filter($"income" >= 30).explain(true)


== Physical Plan ==
*(1) Project [client#0, type#1, address#2, type_2#3, income#4]
+- *(1) Filter (isnotnull(income#4) && (income#4 >= 30))
   +- *(1) FileScan parquet [client#0,type#1,address#2,type_2#3,income#4] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/User/Documents/temp/temp1], PartitionFilters: [], PushedFilters: [IsNotNull(income), GreaterThanOrEqual(income,30)], ReadSchema: struct<client:string,type:string,address:string,type_2:string,income:int>
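
For contrast, a predicate that the connector cannot translate into a data source filter, for example one wrapped in a UDF, stays in Spark's own Filter operator and does not show up under PushedFilters. A minimal sketch against the same DataFrame as above (the UDF name is made up):

    import org.apache.spark.sql.functions.udf

    // A UDF is opaque to Spark, so it cannot be translated into a source filter.
    val atLeast30 = udf((income: Int) => income >= 30)

    // Expect the Filter node to contain the UDF call and PushedFilters to be
    // empty or to contain only trivially pushable filters such as IsNotNull.
    df.filter(atLeast30($"income")).explain(true)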

Here a table is read from Oracle over JDBC using Spark SQL. The Oracle database uses predicate pushdown and index access on its side, but Spark has no idea about it, so nothing appears under PushedFilters in the plan.

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand file:/data/.., false, Parquet, Map(codec -> org.apache.hadoop.io.compress.snappyCodec, path -> /data/...), Overwrite, [COLUMN_01, COLUMN_02, COLUMN_03, COLUMN_04, COLUMN_05, COLUMN_06, COLUMN_07, COLUMN_08, COLUMN_09, COLUMN_10, COLUMN_11, COLUMN_12, COLUMN_13, COLUMN_14, COLUMN_15, COLUMN_16, COLUMN_17, COLUMN_18, ... 255 more fields]
+- Project [COLUMN_01#1246, COLUMN_02#1247, COLUMN_03#1248, COLUMN_04#1249, COLUMN_05#1250, COLUMN_06#1251, COLUMN_07#1252, COLUMN_08#1253, COLUMN_09#1254, COLUMN_10#1255, COLUMN_11#1256, COLUMN_12#1257, COLUMN_13#1258, COLUMN_14#1259, COLUMN_15#1260, COLUMN_16#1261, COLUMN_17#1262, COLUMN_18#1263, ... 255 more fields]
   +- Scan JDBCRelation((select cu.*, ROWIDTONCHAR(t.rowid) as ROW_ID from table t  where (column1 in (786567473,786567520,786567670,786567570,...........)) and column2 in (10,11, ...) and t.result is null)t) [numPartitions=20] [COLUMN_87#1332,COLUMN_182#1427,COLUMN_128#1373,COLUMN_104#1349,COLUMN_189#1434,COLUMN_108#1353,COLUMN_116#1361,COLUMN_154#1399,COLUMN_125#1370,COLUMN_120#1365,COLUMN_267#1512,COLUMN_54#1299,COLUMN_100#1345,COLUMN_230#1475,COLUMN_68#1313,COLUMN_44#1289,COLUMN_53#1298,COLUMN_97#1342,COLUMN_03#1248,COLUMN_16#1261,COLUMN_43#1288,COLUMN_50#1295,COLUMN_174#1419,COLUMN_20#1265,... 254 more fields] PushedFilters: [], ReadSchema: struct<COLUMN_87:string,COLUMN_182:string,COLUMN_128:string,COLUMN_104:string,COLUMN_189:string,C...
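
If the filtering were done through the DataFrame API instead of being baked into the dbtable subquery, Spark would translate it into the WHERE clause of the SQL it sends to the database, and the plan would then list it under PushedFilters for the JDBCRelation. A minimal sketch with hypothetical connection details and column names:

    // Hypothetical JDBC connection details and column name.
    val oracleDf = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")
      .option("dbtable", "SCHEMA.MY_TABLE")
      .option("user", "scott")
      .option("password", "tiger")
      .option("driver", "oracle.jdbc.OracleDriver")
      .load()

    // This filter is translated into the WHERE clause of the query Spark sends
    // to Oracle; the explain output should list it under PushedFilters for the
    // JDBCRelation scan.
    oracleDf.filter($"COLUMN_02" === 10).explain(true)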

Upvotes: 1
