Reputation: 363
I am using checkpointing in my application, and when my application restarts after a failure, I get a NullPointerException on SQLContext.
I assume the application is not able to recover the SQLContext because of serialization/deserialization issues. Is SQLContext not serializable?
Here is my code:
//DriverClass
final JavaSparkContext javaSparkCtx = new JavaSparkContext(conf);
final SQLContext sqlContext = new SQLContext(javaSparkCtx);
JavaStreamingContextFactory javaStreamingContextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() { // only executed the first time
        JavaStreamingContext jssc = new JavaStreamingContext(javaSparkCtx, Durations.minutes(1));
        jssc.checkpoint(CHECKPOINT_DIRECTORY);
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "abc.xyz.localdomain:6667");
        //....
        JavaDStream<String> fullMsg = messages.map(new MapFunction());
        fullMsg.foreachRDD(new SomeClass(sqlContext));
        return jssc;
    }
};
//Closure Class
public class SomeClass implements Serializable, Function<JavaRDD<String>, Void> {
    SQLContext sqlContext;

    public SomeClass(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }

    public void doSomething() {
        this.sqlContext.createDataFrame(); // here is the NullPointerException
    }
    //.......
}
Upvotes: 2
Views: 2211
Reputation: 20826
SQLContext is Serializable because Spark SQL needs to use the SQLContext on the executor side internally. However, you should not serialize it into the Streaming checkpoint. Instead, get it from the RDD like this: SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
See the Streaming docs for more details: http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#dataframe-and-sql-operations
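For illustration, here is a minimal sketch of how the closure class from the question could be rewritten with this approach, assuming the Spark 1.6 Java API; the body of call() is only a placeholder, not your actual DataFrame logic.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SQLContext;

// Sketch: no SQLContext field, so nothing SQL-related ends up in the checkpoint
public class SomeClass implements Function<JavaRDD<String>, Void> {
    @Override
    public Void call(JavaRDD<String> rdd) {
        // getOrCreate reuses (or lazily creates) the SQLContext for this SparkContext,
        // so calling it on every batch is cheap and works after checkpoint recovery.
        SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
        // ... use sqlContext here, e.g. sqlContext.read().json(rdd) if the strings are JSON
        return null;
    }
}

In the driver you would then register it without passing the context, e.g. fullMsg.foreachRDD(new SomeClass()); since the class no longer holds an SQLContext reference, there is nothing that has to be serialized into, or recovered from, the Streaming checkpoint.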
Upvotes: 4