syv

Reputation: 3608

In Spark, is it possible to reuse a DataFrame's execution plan to apply it to different data sources?

I have a somewhat complex PySpark pipeline that takes 20 minutes to come up with an execution plan. Since I have to execute the same pipeline multiple times with different DataFrames as the source, I'm wondering: is there any option for me to avoid building the execution plan every time, i.e., build the execution plan once and reuse it with different source data?

Upvotes: 3

Views: 1572

Answers (3)

Sim

Reputation: 13528

There is a way to do what you ask, but it requires an advanced understanding of Spark internals. Spark plans are simply trees of objects. These trees are constantly transformed by Spark. They can be "tapped" and transformed "outside" of Spark. The devil is in the details, though, so I do not recommend this approach unless you have a severe need for it.
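If you want to poke at those trees from PySpark, here is a minimal sketch. Note that `_jdf` and `queryExecution` are internal, unsupported APIs that may change between Spark versions, so treat this as exploratory only:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).withColumn("doubled", F.col("id") * 2)

# Print the parsed, analyzed, and optimized logical plans plus the physical plan.
df.explain(extended=True)

# The plan trees themselves, reached over the py4j bridge (internal API).
qe = df._jdf.queryExecution()
print(qe.analyzed().toString())       # analyzed logical plan
print(qe.optimizedPlan().toString())  # tree after Catalyst optimizations
```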

Before you go there, it is important to look at other options, such as:

  1. Understanding what exactly is causing the delay. On some managed platforms, e.g., Databricks, plans are logged as JSON for analysis/debugging purposes. We have sometimes seen delays of 30+ minutes, with the CPU pegged at 100% on a single core, while a plan produces tens of megabytes of JSON and pushes them over the wire. Make sure something like this is not happening in your case.

  2. Depending on your workflow, if you have to do this with many data sources at the same time, use driver-side parallelism to analyze/optimize plans using many cores at the same time (first sketch after this list). This will also increase your cluster utilization if your jobs have any skew in the reduce phases of processing.

  3. Investigate how much benefit Spark's analysis/optimization is actually giving you, and see whether you can introduce analysis barriers, i.e., points where the accumulated plan is materialized so that later transformations start from a short lineage, to speed up transformations (second sketch after this list).
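For the driver-side parallelism in option 2, a minimal sketch (the `build_pipeline` helper and the paths are placeholders for your own code):

```python
from concurrent.futures import ThreadPoolExecutor

def run_for_source(path):
    # build_pipeline stands in for your expensive-to-plan transformation chain.
    df = spark.read.parquet(path)
    build_pipeline(df).write.mode("overwrite").parquet(path + "_out")

paths = ["s3://bucket/a", "s3://bucket/b", "s3://bucket/c"]

# Each call is analyzed/optimized (and executed) on its own driver thread,
# so planning one source no longer serializes planning of the others.
with ThreadPoolExecutor(max_workers=len(paths)) as pool:
    list(pool.map(run_for_source, paths))
```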
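For the analysis barrier in option 3, one common trick, assuming `intermediate` is the output of the expensive part of your pipeline, is to round-trip through the RDD API or to checkpoint, either of which replaces the accumulated logical plan with a simple scan:

```python
# Option A: round-trip through the RDD API; the new DataFrame's plan is
# just a scan of the underlying RDD, not the full lineage.
barrier_df = spark.createDataFrame(intermediate.rdd, intermediate.schema)

# Option B: checkpoint() materializes the data to the checkpoint directory
# and likewise returns a DataFrame with a truncated plan.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
barrier_df = intermediate.checkpoint()
```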

Upvotes: 3

Napoleon Borntoparty

Reputation: 1962

As @EnzoBnl pointed out, this is not possible, because Tungsten applies optimisations that are specific to the object. What you could do instead (if possible with your data) is split your large file into smaller chunks that can be shared between the multiple input DataFrames, and use persist() or checkpoint() on them. checkpoint() in particular makes the execution plan shorter by storing a mid-point, but there is still no way to reuse the plan itself. See the Spark documentation:

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
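A minimal sketch of that approach (the input path and the `pipeline_a`/`pipeline_b` functions are placeholders):

```python
# checkpoint() needs a directory on reliable storage (e.g. HDFS).
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

# A chunk shared by several downstream pipelines.
shared = spark.read.parquet("/data/large_input")

# persist() would cache the rows; checkpoint() also writes them out and
# cuts the dependency chain, so every pipeline built on top of `shared`
# starts from a short execution plan.
shared = shared.checkpoint()

result_a = pipeline_a(shared)
result_b = pipeline_b(shared)
```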

Upvotes: 1

ebonnal

Reputation: 1167

This is impossible, because the optimizations applied to the plan depend on the source DataFrame itself.
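You can see this directly with explain(): the same transformation is optimized differently for different sources (the Parquet path below is a placeholder):

```python
# Against Parquet, Catalyst pushes the filter into the scan; against a
# local in-memory relation it cannot, so the optimized plans differ.
parquet_df = spark.read.parquet("/data/events")
local_df = spark.createDataFrame([(1,), (2,)], ["id"])

parquet_df.filter("id > 1").explain()  # scan node lists PushedFilters
local_df.filter("id > 1").explain()    # Filter over a LocalTableScan
```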

Upvotes: 1
