Bill

Reputation: 363

Checkpoint SQLContext NullPointerException issue

I am using checkpointing in my application, and when the application restarts after a failure, I get a NullPointerException on SQLContext.
I assume the application is unable to recover the SQLContext because of serialization/deserialization issues. Is SQLContext not serializable?

Here is my code:

    //DriverClass
    final JavaSparkContext javaSparkCtx = new JavaSparkContext(conf);
    final SQLContext sqlContext = new SQLContext(javaSparkCtx);

    JavaStreamingContextFactory javaStreamingContextFactory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() { // only executed on the first start, not on checkpoint recovery
            JavaStreamingContext jssc = new JavaStreamingContext(javaSparkCtx, Durations.minutes(1));
            jssc.checkpoint(CHECKPOINT_DIRECTORY);

            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "abc.xyz.localdomain:6667");
            //.... create the messages DStream from Kafka here
            JavaDStream<String> fullMsg = messages.map(new MapFunction());

            fullMsg.foreachRDD(new SomeClass(sqlContext));
            return jssc;
        }
    };
}

    //Closure Class
    public class SomeClass implements Serializable, Function<JavaRDD<String>, Void> {
        SQLContext sqlContext;

        public SomeClass(SQLContext sqlContext) {
            this.sqlContext = sqlContext;
        }

        public void doSomething() {
            this.sqlContext.createDataFrame(); // NullPointerException is thrown here
        }
        //.......
    }

Upvotes: 2

Views: 2211

Answers (1)

zsxwing

Reputation: 20826

SQLContext is serializable because Spark SQL needs to use it on the executor side internally. However, you should not serialize it into the Streaming checkpoint. Instead, get it from the RDD inside your function, like this: `SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());`
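
A minimal sketch of what `SomeClass` could look like under this approach, assuming the `foreachRDD` wiring from the question; the JSON read is just a placeholder for whatever the strings actually contain:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    // No SQLContext field, so nothing stale is restored from the checkpoint.
    public class SomeClass implements Function<JavaRDD<String>, Void> {
        @Override
        public Void call(JavaRDD<String> rdd) {
            // Look up (or lazily create) the singleton SQLContext from the
            // RDD's SparkContext instead of deserializing a captured one.
            SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
            DataFrame df = sqlContext.read().json(rdd); // placeholder: assumes the strings are JSON
            // ... work with df ...
            return null;
        }
    }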

See the Streaming programming guide for more details: http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#dataframe-and-sql-operations

Upvotes: 4
