elelias

Reputation: 4761

NullPointerException applying a function to spark RDD that works on non-RDD

I have a function that I want to apply to every row of a .csv file:

def convert(inString: Array[String]): String = {
  val country  = inString(0)
  val sellerId = inString(1)
  val itemID   = inString(2)
  try {
    val minidf = sqlContext.read.json(sc.makeRDD(inString(3) :: Nil))
      .withColumn("country", lit(country))
      .withColumn("seller_id", lit(sellerId))
      .withColumn("item_id", lit(itemID))
    minidf.toJSON.collect().mkString(",")
  } catch {
    case e: Exception =>
      println("AN EXCEPTION " + inString.mkString(","))
      "this is an exception " + e + "  " + inString.mkString(",")
  }
}

This function transforms an entry of the sort:

CA      112578240       132080411845    [{"id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}]

Where I have 4 columns, the 4th being a json blob, into

[{"country":"CA", "seller_id":"112578240", "item_id":"132080411845", "id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}]

which is the json object where the first 3 columns have been inserted into the fourth.

Now, this works:

val conv_string = sc.textFile(path_to_file).map(_.split('\t')).collect().map(x => convert(x))

or this:

val conv_string = sc.textFile(path_to_file).map(_.split('\t')).take(10).map(x => convert(x))

but this does not:

val conv_string = sc.textFile(path_to_file).map(_.split('\t')).map(x => convert(x))

The last one throws a java.lang.NullPointerException.

I included a try/catch clause to see where exactly this is failing, and it's failing for every single row.

What am I doing wrong here?

Upvotes: 0

Views: 416

Answers (1)

jamborta

Reputation: 5210

You cannot use sqlContext or sparkContext inside a Spark map, since those objects can only exist on the driver node; essentially, they are the ones in charge of distributing your tasks to the executors.

You could rewrite the JSON parsing bit in pure Scala using one of these libraries: https://manuel.bernhardt.io/2015/11/06/a-quick-tour-of-json-libraries-in-scala/
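For illustration, here is a minimal sketch of `convert` rewritten with plain string handling, so it contains no reference to `sqlContext` or `sc` and can safely run inside `map` on the executors. It assumes the fourth column is always a JSON array that starts with `[{`, and it splices the three extra fields in by string concatenation rather than via a DataFrame (a real implementation should use a proper JSON library such as one of those linked above):

```scala
object ConvertDemo {
  // Hypothetical driver-free rewrite of convert(): no SparkContext
  // or SQLContext is captured by the closure, so it can run on executors.
  def convert(inString: Array[String]): String = {
    val country  = inString(0)
    val sellerId = inString(1)
    val itemID   = inString(2)
    val blob     = inString(3) // assumed to look like [{"id":...,...}]
    // Fields to splice in right after the opening "[{"
    val extra =
      s""""country":"$country","seller_id":"$sellerId","item_id":"$itemID","""
    "[{" + extra + blob.stripPrefix("[{")
  }

  def main(args: Array[String]): Unit = {
    val row = Array("CA", "112578240", "132080411845",
      """[{"id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}]""")
    println(convert(row))
    // prints [{"country":"CA","seller_id":"112578240","item_id":"132080411845","id":"general_spam_policy","severity":"critical","timestamp":"2017-02-26T08:30:16Z"}]
  }
}
```

With a function like this, `sc.textFile(path_to_file).map(_.split('\t')).map(convert)` works without collecting to the driver first. String splicing is fragile (it does not handle empty arrays or escaping); swapping it for a JSON library keeps the same driver-free property while being robust.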

Upvotes: 2
