Thisara

Reputation: 179

Spark Streaming with checkpoints re-reads messages after a restart

I'm trying to implement a fault-tolerant Spark Streaming application with Kafka. When I restart the application, it re-reads the messages that were already consumed before the restart, so my calculations go wrong. Please help me resolve this issue.

Here is the code written in Java.

public static JavaStreamingContext createContextFunc() {

    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints();

    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    JavaStreamingContext streamingContext =  app.getStreamingContext(checkpointDir);

    // All stream creation and transformations must be defined inside this factory
    // so they can be recovered from the checkpoint on restart
    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext);

    return streamingContext;
}


public static void main(String[] args) throws InterruptedException {

    ApplicationConf conf = new ApplicationConf();
    String checkpointDir = conf.getCheckpointDirectory();

    // Recover the streaming context from the checkpoint directory if it exists,
    // otherwise build a fresh one with createContextFunc()
    Function0<JavaStreamingContext> createContextFunc = () -> createContextFunc();
    JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc);

    streamingContext.start();
    streamingContext.awaitTermination();

}

public JavaStreamingContext getStreamingContext(String checkpointDir) {

    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String master = conf.getMaster();
    int duration = conf.getDuration();

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
    streamingContext.checkpoint(checkpointDir);

    return streamingContext;
}

public SparkSession getSession() {

    ApplicationConf conf = new ApplicationConf();
    String appName = conf.getAppName();
    String hiveConf = conf.getHiveConf();
    String thriftConf =  conf.getThriftConf();
    int shufflePartitions = conf.getShuffle();

    SparkSession spark = SparkSession
            .builder()
            .appName(appName)
            .config("spark.sql.warehouse.dir", hiveConf)
            .config("hive.metastore.uris", thriftConf)
            .enableHiveSupport()
            .getOrCreate();

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions);
    return spark;

}


public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) {

    KafkaConfig kafkaConfig = new KafkaConfig();
    Set<String> topicsSet = kafkaConfig.getTopicSet();
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams();

    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value);

    return logdata;
}
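
The KafkaConfig class is not shown here; the real values live in the linked repository. Purely as a rough sketch of what getKafkaParams() might return (all values below are placeholders, not the project's actual configuration), a direct stream is usually wired up with parameters along these lines:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical example of KafkaConfig.getKafkaParams(); broker address, group id and
// offset settings are placeholders
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "transactions-summary");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);   // rely on checkpointed offsets rather than Kafka auto-commit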

Here is the link to the GitHub project: https://github.com/ThisaST/Spark-Fault-Tolerance

Upvotes: 1

Views: 286

Answers (1)

Thisara

Reputation: 179

I overcame the issue by adding the following configuration to the code.

sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
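
In context, the setting sits next to the other SparkConf options in getStreamingContext from the question (a sketch of that same setup, not new code):

SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
// A graceful shutdown lets the in-flight batch finish and the checkpoint be written
// before the context stops, so a restart does not re-process that batch
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");

JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
streamingContext.checkpoint(checkpointDir);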

Upvotes: 1
