kar09

Reputation: 431

How is ColumnarToRow an efficient operation in Spark

In my understanding, a columnar format is better for map-reduce style workloads. Even for something like selecting a few columns, columnar works well because we don't have to load the other columns into memory.

But in Spark 3.0 I'm seeing this ColumnarToRow operation applied in the query plans, which, as far as I can tell from the docs, converts the data into a row format.

How is it more efficient than the columnar representation, and what are the insights that govern the application of this rule?

For the following code I've attached the query plan.

import pandas as pd

# 'spark' is the active SparkSession (e.g. the one created by the pyspark shell)
df = pd.DataFrame({
    'a': list(range(2000)),
    'b': list(reversed(range(2000))),
})

df = spark.createDataFrame(df)

df.cache()
df.select('a').filter('a > 500').show()

(screenshot of the query plan)
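
In case the screenshot is hard to read, the same plan can also be printed as text (this assumes the same spark session and cached df as above):

df.select('a').filter('a > 500').explain()

# Spark 3.0+ also supports a formatted mode that numbers each operator,
# which makes the ColumnarToRow node easy to spot:
df.select('a').filter('a > 500').explain(mode='formatted')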

Upvotes: 16

Views: 4573

Answers (2)

mazaneicha

Reputation: 9427

The ColumnarToRow part of your WSCG (WholeStageCodegen) is actually a conversion of the pandas dataframe into a Spark DataFrame, rather than any indication of how Spark processes its own dataframes.

If we start with a "native" Spark df, the plan looks much different:

>>> a = range(2000)
>>> b = [i for i in reversed(range(2000))]
>>> df = spark.createDataFrame(zip(a, b), ["a", "b"])
>>> df.select('a').filter('a > 500').show()

(screenshot of the query plan for the "native" Spark DataFrame)

Besides, the link you were referring to says:

case class ColumnarToRowExec(child: SparkPlan) extends SparkPlan with UnaryExecNode with CodegenSupport with Product with Serializable

Provides a common executor to translate an RDD of ColumnarBatch into an RDD of InternalRow. This is inserted whenever such a transition is determined to be needed.

...which basically means a conversion of an external RDD (the pandas data in your case) into Spark's internal representation (an RDD of InternalRow).
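
If you want to compare the two plans as text rather than screenshots, here is a quick sketch (assuming an active spark session, as in the pyspark shell) that reproduces both cases from this question and answer:

import pandas as pd

# The question's DataFrame: created from a pandas DataFrame, then cached
pdf = pd.DataFrame({'a': list(range(2000)), 'b': list(reversed(range(2000)))})
df1 = spark.createDataFrame(pdf)
df1.cache()
df1.select('a').filter('a > 500').explain()

# The "native" Spark DataFrame built above
df2 = spark.createDataFrame(zip(range(2000), reversed(range(2000))), ['a', 'b'])
df2.select('a').filter('a > 500').explain()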

Upvotes: 2

Dennis Jaheruddin

Reputation: 21563

I have only read up on this briefly, but it seems this general logic holds:

A columnar format helps you select certain columns most efficiently. A row format helps you select certain rows most efficiently.

So when you want to select certain rows a lot (e.g. per country, per day, ...), having a row-based format with an index on the filter column often makes sense.

Here is also a reference showing the hints that can be defined on a query: https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
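
The column-selection side of this is easy to see in a Spark plan. Here is a small sketch, using Parquet as the columnar format and a made-up path, where the file scan reports only the selected column in its ReadSchema:

# Path is made up purely for illustration.
path = '/tmp/columnar_vs_row_demo'

demo_df = spark.createDataFrame(zip(range(2000), reversed(range(2000))), ['a', 'b'])
demo_df.write.mode('overwrite').parquet(path)

# The FileScan node in this plan reads only column 'a' (see ReadSchema)
# and reports the predicate on 'a' under PushedFilters.
spark.read.parquet(path).select('a').filter('a > 500').explain()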

Upvotes: 0
