manmeet

Reputation: 340

map().reduce() raises SPARK-5063 error in a Spark job but runs fine in the interactive shell

Overall problem: build a schema from a CSV metadata file and apply it to the data file. I have an RDD with one column and I want to join its values into a single string. I am using the code below, which works fine in the PySpark interactive shell but fails when run as a Spark job.

schema = metadata.map(lambda l: l).reduce(lambda l, m: l+ "," + m)

So the output should look like 'id,name,age'. But when I execute the job, I get this error:

Exception: It appears that you are attempting to broadcast an RDD or reference
an RDD from an action or transformation. RDD transformations and actions can
only be invoked by the driver, not inside of other transformations; for
example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the
values transformation and count action cannot be performed inside of the
rdd1.map transformation. For more information, see SPARK-5063.

Here is the complete Spark job:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# create configuration for this job
conf = SparkConf().setAppName('DFtest')

# create spark context for the job
sc = SparkContext(conf=conf)

# create a sqlContext for Data Frame operations
sqlContext = SQLContext(sc)

metadata_input = "file:/home/mapr/metadata/loans_metadata.csv"
data_input = "/user/bedrock/data/loans_incr/loans-2016-02-24-06-00-00-2016-02-25-06-00-00-864ca30f-097f-4234-87bc-7f1a7d57aa7e.csv"

metadata = sc.textFile(metadata_input)

header = metadata.filter(lambda l: "Technical Name" in l)

metadata = metadata.filter(lambda l: l != header)

metadata = metadata.map(lambda l: l.split(",")[0])

schema = metadata.map(lambda l: l).reduce(lambda l, m: l+ "," + m)

fields = [StructField(field_name, StringType(), True) for field_name in
schema.split(",")]

finalSchema = StructType(fields)

data = sc.textFile(data_input)

df = sqlContext.createDataFrame(data, finalSchema)

df.show()

sc.stop()

I looked at other posts on this error, but could not understand how this is a nested map. I understand, to some extent, that this can be solved by "broadcasting", but I am not sure how. Please advise.

Upvotes: 0

Views: 2357

Answers (1)

zero323

Reputation: 330443

The problem is here:

header = metadata.filter(lambda l: "Technical Name" in l)

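header is an RDD, not a local object, so the later metadata.filter(lambda l: l != header) references an RDD from inside a transformation, which is exactly what SPARK-5063 forbids. Fetch the value to the driver first: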

header = metadata.filter(lambda l: "Technical Name" in l).first()

although this:

metadata.filter(lambda l: "Technical Name" not in l)

should have the same effect if you expect only one "Technical Name" occurrence.
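
Putting the second variant together with the schema step from the question, a minimal sketch could look like the block below. The helper names (is_header, technical_name, build_schema) are hypothetical and not part of the original job, and the sketch assumes the header row is the only line containing "Technical Name". It also uses collect() rather than reduce() plus split(","), which is a swapped-in simplification that assumes the metadata file is small enough to fit on the driver.

```python
# Sketch only: the helper names below are hypothetical, and the header marker
# is taken from the question. Nothing passed to filter/map touches an RDD.
HEADER_MARKER = "Technical Name"


def is_header(line):
    """True for the metadata header row (assumed to contain the marker)."""
    return HEADER_MARKER in line


def technical_name(line):
    """Return the first comma-separated column of a metadata row."""
    return line.split(",")[0]


def build_schema(metadata):
    """Build a StructType from an RDD of metadata lines.

    The functions handed to filter() and map() only work on plain strings,
    never on another RDD, so SPARK-5063 is not triggered.
    """
    # Imported inside the function so the pure helpers above can be used
    # (and tested) on a machine without Spark installed.
    from pyspark.sql.types import StructField, StringType, StructType

    names = (metadata
             .filter(lambda l: not is_header(l))
             .map(technical_name)
             .collect())  # brings the (assumed small) list of names to the driver
    return StructType([StructField(n, StringType(), True) for n in names])
```

In the job from the question this would be called as finalSchema = build_schema(sc.textFile(metadata_input)). Keeping the row logic in plain functions also means it can be checked on an ordinary Python list before running anything on the cluster.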

Upvotes: 2
