Sid

Reputation: 77

Function mapped to RDD using rdd.map() called multiple times for some rows

I have a source dataframe with some records. I want to perform an operation on each row of this dataframe, so I used the rdd.map function. However, the logs recorded via accumulators show that the mapped function was called multiple times for some rows. As per my understanding of the documentation, it should be called only ONCE per row.

I tried replicating the issue in a small script and observed the same behavior. The script is shown below:

import os
import sys
os.environ['SPARK_HOME'] = "/usr/lib/spark/"
sys.path.append("/usr/lib/spark/python/")
from pyspark.sql import *
from pyspark.accumulators import AccumulatorParam


class StringAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue=""):
        return ""

    def addInPlace(self, s1, s2):
        return s1.strip() + " " + s2.strip()

def mapped_func(row, logging_acc):
    logging_acc += "Started map"
    logging_acc += str(row)
    return "test"

if __name__ == "__main__":
    spark_session = SparkSession.builder.enableHiveSupport().appName("rest-api").getOrCreate()
    sc = spark_session.sparkContext
    df = spark_session.sql("select col1, col2, col3, col4, col5, col6 from proj1_db.dw_table where col3='P1'")
    df.show()
    logging_acc = sc.accumulator("", StringAccumulatorParam())
    result_rdd = df.rdd.map(lambda row: Row(row, mapped_func(row, logging_acc)))
    result_rdd.toDF().show()
    print("logs: " + str(logging_acc.value))

Below is the relevant piece of output:

+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|
+----+----+----+----+----+----+
|   1|   1|  P1|   2|  10|  20|
|   3|   1|  P1|   1|  25|  25|
+----+----+----+----+----+----+

+--------------------+----+
|                  _1|  _2|
+--------------------+----+
|[1, 1, P1, 2, 10,...|test|
|[3, 1, P1, 1, 25,...|test|
+--------------------+----+

logs: Started map Row(col1=1, col2=1, col3=u'P1', col4=2, col5=10, col6=20) Started map Row(col1=1, col2=1, col3=u'P1', col4=2, col5=10, col6=20) Started map Row(col1=3, col2=1, col3=u'P1', col4=1, col5=25, col6=25)

The first table is the source dataframe and the second table is the resultant dataframe produced by the map call. As seen, the function is being called twice for the first row. Can anyone please help me understand what is happening and how we can make sure the mapped function is called only ONCE per row?

Upvotes: 1

Views: 1424

Answers (1)

user11302018

Reputation: 21

As per the documentation, it should be called once ONLY.

That's really not the case. Any transformation can be executed an arbitrary number of times (typically in case of failures, or to support secondary logic), and the documentation says explicitly that:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once

So, implicitly, accumulators used inside transformations (like map) can be updated multiple times per task.

In your case the multiple executions happen because you don't provide a schema when you convert the RDD to a DataFrame. In such a case Spark performs another scan over the data to infer the schema, so you should use

spark.createDataFrame(result_rdd, schema)

That, however, will only address this particular issue; the general point about transformations and accumulator behavior still stands.

Upvotes: 2
