Reputation: 3106
I have a MySQL table loaded into dataframe_mysql:
dataframe_mysql = sqlContext.read.format("jdbc").options(...
dataframe_mysql.registerTempTable('dataf')
groupedtbl = sqlContext.sql("""SELECT job_seq_id, first(job_dcr_content) AS firststage, last(job_dcr_content) AS laststage,
    first(from_stage) AS source, last(from_stage) AS target, count(jid) AS noofstages
    FROM dataf GROUP BY job_seq_id HAVING count(jid) > 1""")
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
func1 = udf(fu1, StringType())
func2= udf(fu2, StringType())
res1 = groupedtbl.withColumn('dcol', func1(groupedtbl.firststage, groupedtbl.laststage, groupedtbl.job_seq_id))
res2 = res1.withColumn('lcol', func2(res1.dcol, res1.job_seq_id))
For the above code I see that even when I issue a limit command:
lb=res2.limit(2).collect()
or the following command to fetch only a single record:
res2.filter(res2.job_seq_id == 5843064)
Instead of doing just the work needed to return two rows for the first query, or a single row for the second, Spark performs a lot of unnecessary computation on other rows, which wastes time even when only two rows are required. I can see from the internal logs that even when fetching just two rows it computes hundreds of rows and then picks the two result rows out of them. I thought the DAG mechanism would handle this, but it seems it does not. Am I wrong in this observation?
Upvotes: 3
Views: 997
Reputation: 330093
There are quite a few different problems here. Some are related to the data source you use, others to the query, and finally some are introduced by using Python UDFs with Spark < 2.0.0.
Step by step:
With a JDBC data source, only simple predicates are pushed down to the source. That means only conditions which sit in the primary WHERE clause (not in HAVING or on computed fields). Everything else, including aggregations and limits, happens inside Spark (see Does spark predicate pushdown work with JDBC?, More than one hour to execute pyspark.sql.DataFrame.take(4)). If you only need a few specific jobs, you can push the filter into the database yourself, as sketched below.
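For example, if you really only care about one job_seq_id, you can make MySQL do the filtering by defining the JDBC relation over a subquery. A minimal sketch, assuming a hypothetical jobs table and placeholder connection details:
# Sketch only: "jobs", the URL and the credentials are placeholders.
# MySQL evaluates the WHERE clause, so only matching rows ever reach Spark.
subquery = "(SELECT * FROM jobs WHERE job_seq_id = 5843064) AS jobs_subset"
dataframe_mysql = (sqlContext.read
    .format("jdbc")
    .option("url", "jdbc:mysql://host:3306/db")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", subquery)
    .option("user", "user")
    .option("password", "password")
    .load())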
Without explicit partitioning Spark has no a priori knowledge about the data distribution. That means any code which requires data aggregation has to access all records. As a result, a limit clause combined with an aggregation can appear only after the aggregation in the execution plan, which means that:
res2.limit(2)
cannot be optimized.
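If you know up front which job_seq_id you need, a plain WHERE condition on that base column, placed before the GROUP BY, is the part that can be pushed down. A sketch against the dataf temp table from the question:
# Sketch: the WHERE predicate is a simple condition on a base column, so it can
# be pushed down to the source; only the matching rows are aggregated in Spark.
single_job = sqlContext.sql("""
    SELECT job_seq_id,
           first(job_dcr_content) AS firststage, last(job_dcr_content) AS laststage,
           first(from_stage) AS source, last(from_stage) AS target,
           count(jid) AS noofstages
    FROM dataf
    WHERE job_seq_id = 5843064
    GROUP BY job_seq_id
    HAVING count(jid) > 1
""")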
PySpark UDFs in Spark < 2.0.0 introduce implicit breakpoints in the execution plan:
from pyspark.sql.functions import col, udf
options = ...
df = sqlContext.read.format("jdbc").options(**options).load()
df.printSchema()
## root
## |-- x: integer (nullable = true)
## |-- y: integer (nullable = true)
You can see that the predicate is pushed down when there is no BatchPythonEvaluation in the plan:
df.groupBy("x").sum("y").where(col("x") > 2).explain()
## == Physical Plan ==
## TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Final,isDistinct=false)], output=[x#182,sum(y)#192L])
## +- TungstenExchange hashpartitioning(x#182,200), None
## +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Partial,isDistinct=false)], output=[x#182,sum#197L])
## +- Filter (x#182 > 2)
## +- Scan JDBCRelation(...,point,[Lorg.apache.spark.Partition;@61ee5d1a,{...})[x#182,y#183] PushedFilters: [GreaterThan(x,2)]
but it is not when you add a UDF call, even though its output is not used:
identity = udf(lambda x: x)
df.groupBy("x").sum("y").withColumn("foo", identity(col("x"))).where(col("x") > 2).explain()
## == Physical Plan ==
## Project [x#182,sum(y)#214L,pythonUDF#216 AS foo#215]
## +- Filter (x#182 > 2)
## +- !BatchPythonEvaluation PythonUDF#<lambda>(x#182), [x#182,sum(y)#214L,pythonUDF#216]
## +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Final,isDistinct=false)], output=[x#182,sum(y)#214L])
## +- TungstenExchange hashpartitioning(x#182,200), None
## +- TungstenAggregate(key=[x#182], functions=[(sum(cast(y#183 as bigint)),mode=Partial,isDistinct=false)], output=[x#182,sum#219L])
## +- Scan JDBCRelation(...,point,[Lorg.apache.spark.Partition;@61ee5d1a,{...})[x#182,y#183]
This behavior has been optimized in Spark 2.0.0.
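Until you can upgrade, one way to limit the damage is to apply the filter before adding the UDF columns, so the predicate sits below the Python evaluation and, as the first plan above shows, can still be pushed towards the scan. A sketch using the DataFrames from the question:
# Sketch: filter first, then run the Python UDFs only on the surviving rows.
filtered = groupedtbl.where(groupedtbl.job_seq_id == 5843064)
res1 = filtered.withColumn('dcol', func1(filtered.firststage,
                                         filtered.laststage,
                                         filtered.job_seq_id))
res2 = res1.withColumn('lcol', func2(res1.dcol, res1.job_seq_id))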
Upvotes: 1