DennisLi

Reputation: 4154

How to make spark streaming commit in each batch when limiting Kafka batch size?

To limit the batch size when using Spark Streaming, I referenced this answer

There are about 50 million records backlogged (waiting to be consumed) in Kafka. The topic has 3 partitions.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
zhihu_comment   0          10906153        28668062        17761909        -               -               -
zhihu_comment   1          10972464        30271728        19299264        -               -               -
zhihu_comment   2          10906395        28662007        17755612        -               -               -

My consumer app:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public final class SparkConsumer {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    String brokers = "device1:9092,device2:9092,device3:9092";
    String groupId = "spark";
    String topics = "zhihu_comment";

    // Create context with a certain seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.backpressure.enabled", "true");
    sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("max.poll.records", "500");


    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    lines.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}

I have limited how much Spark Streaming consumes: I set maxRatePerPartition to 10000, which works out to 300,000 records per batch in my case.
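For clarity, the 300,000 figure follows from the settings above; a quick sketch of the arithmetic (partition count and batch interval taken from my setup):

long maxRatePerPartition = 10_000; // spark.streaming.kafka.maxRatePerPartition (records/sec per partition)
int partitions = 3;                // zhihu_comment has 3 partitions
long batchIntervalSec = 10;        // Durations.seconds(10)

long recordsPerBatch = maxRatePerPartition * partitions * batchIntervalSec;
System.out.println(recordsPerBatch); // prints 300000, matching the per-batch count in the log below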

The problem is that although Spark Streaming handles records at the configured limit, the current offset reported by Kafka is not the offset Spark Streaming is actually processing: Kafka's current offset jumps straight to the latest offset:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
zhihu_comment   0          28700537        28700676        139             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   1          30305102        30305224        122             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   2          28695033        28695146        113             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1

It appears that Spark Streaming does not commit the offsets batch by batch; it commits the latest offset once, right when it starts consuming!

Is there any way to make Spark Streaming commit the offsets with each batch?

Spark Streaming log, showing the number of records it consumed in each batch:

20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000

20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms

Upvotes: 1

Views: 1150

Answers (1)

Michael Heil

Reputation: 18495

You need to disable auto-commit:

kafkaParams.put("enable.auto.commit", false);

and instead commit the offsets yourself after processing each batch:

messages.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // do your transformations and an action on the rdd here, typically something like:
  rdd.foreachPartition(it -> {
    it.forEachRemaining(record -> {
      // process each ConsumerRecord
    });
  });

  // commit the offsets of this batch back to Kafka
  ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});

as described in the Spark + Kafka Integration Guide.
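If you want to check whether the commits actually succeed, commitAsync also accepts an OffsetCommitCallback as a second argument; a minimal sketch (the logging is purely illustrative):

((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
  if (exception != null) {
    // the commit for this batch failed; log it so any growing lag is explainable
    System.err.println("Offset commit failed: " + exception);
  } else {
    // offsets is a Map<TopicPartition, OffsetAndMetadata> of what was committed
    System.out.println("Committed offsets: " + offsets);
  }
});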

Note that CanCommitOffsets only exposes commitAsync; there is no synchronous variant, so if you need commitSync-like guarantees you have to track and store the offsets yourself.
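A rough sketch of that idea, loosely following the integration guide's "store offsets yourself" approach (the in-memory map below is only a stand-in for a real, durable store):

// illustrative only: an in-memory map stands in for a transactional store of your choice
Map<String, Long> offsetStore = new java.util.concurrent.ConcurrentHashMap<>();

messages.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // ... process the batch ...

  // "synchronous commit": persist the end offset of each range before the batch is considered done
  for (OffsetRange o : offsetRanges) {
    offsetStore.put(o.topic() + "-" + o.partition(), o.untilOffset());
  }
});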

Upvotes: 2
