Bobby O
Bobby O

Reputation: 31

Profiling the Spark Analyzer: how to access the QueryPlanningTracker for a pyspark query?

Any Spark & Py4J gurus available to explain how to reliably access Spark's java objects and variables from the Python side of pyspark? Specifically, how to access the data in Spark's QueryPlanningTracker from python?

Details

I am trying to profile a creating a pyspark dataframe (df = spark_session.sql(thousand_line_query)). Not running the query. Just creating the dataframe so I can inspect its schema. Merely waiting for the return from that .sql() call which initializes the dataframe with no data takes a long time (10-30 seconds). I have tracked the slow steps to Spark's Analyzer stage. Logging (below) suggests Spark is recomputing the same sub-query too many times, so I'm trying to dig in and see what is going on by profiling Spark's work on my query. I tried to methods from a number of articles for profiling the Spark Optimizer stage for executing queries (e.g. Luca Canali's sparkMeasure, Rose Toomey's Care and Feeding of Catalyst Optimizer). But I have found no guide that focuses on profiling the Spark Analyzer stage that runs before the Optimizer stage. (Hence I also include extra details below on what I've found that others may find helpful.)

Reading Spark's Scala sourcecode, I see the Analyzer is a RuleExecutor, and RuleExecutors have a QueryPlanningTracker which seems to record details on each invocation of each Analyzer Rule that Spark runs, specifically to allow a person to reconstruct a timeline of what the analyzer is doing for a single query.

However, I cannot seem to access the data in the Analyzer's QueryPlanningTracker from python. I would like to be able to retrieve a QueryPlanningTracker java object with the full details of the run of one query, and to inspect what fields & methods are available on the Python code. Any advice?

Examples

In python using pyspark, request a dataframe for my 1,000-line query and find it is slow:

query_sql = 'SELECT ... <long query here>
spark_df = spark_session.sql(query_sql) # takes 10-30 seconds

Turn on copious logging, rerun query above, look at output & see the slow steps all mention the PlanCheckLogger which is in the Spark Analyzer. Also access Spark's RuleExecutor to see how much time is used by each rule & which rules are not effective:

spark_session.sparkContext.setLogLevel('ALL')
rule_executor = spark_session._jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor
rule_executor.resetMetrics()
spark_df = spark_session.sql(query_sql)  # logs 10,000+ lines of output, lines with keyword `PlanChangeLogger` give timestamps showing the slow steps are in the Analyzer, but not the order of steps that occur
print(rule_executor.dumpTimeSpent())  # prints Analyzer rules that ran, how much time was 'effective' for each rule, but no specifics on order of rules run, no details on why rules took up a lot of time but were not effective.

Next: Try (unsuccessfully) to access Spark's QueryPlanningTracker data to drill down to a timeline of rules run, how long each call to each rule took, and any other specifics I can get:

tracker = spark_session._jvm.org.apache.spark.sql.catalyst.QueryPlanningTracker
## Use some call here to show data contents of the tracker; which currently gives E.g. intitial exploration: 
tracker.measurePhase.topRulesByTime(10)
*** TypeError: 'JavaPackage' object is not callable ....

The above is one example; The tracker code suggests it has other methods & fields I could use, however I do not see how to access those nor how to inspect from Python to see what methods & fields are available, so it is just trial & error from reading Spark's github repository ...

Upvotes: 3

Views: 1224

Answers (1)

Warren Zhu
Warren Zhu

Reputation: 1495

You can try this:

>>> df = spark.range(1000).selectExpr("count(*)")

>>> tracker = df._jdf.queryExecution().tracker()
>>> print(tracker)
org.apache.spark.sql.catalyst.QueryPlanningTracker@5702d8be

>>> print(tracker.topRulesByTime(10))
Stream((org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions,RuleSummary(27004600, 2, 1)), ?)

I'm not sure what kinds of info you need. But if you want to know query plan generated. You can use df.explain()

Upvotes: 0

Related Questions