Reputation: 4154
To limit the batch size when using Spark Streaming, I referenced this answer.
There are about 50 million records waiting to be consumed in Kafka. The topic has 3 partitions:
TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
zhihu_comment  0          10906153        28668062        17761909  -            -     -
zhihu_comment  1          10972464        30271728        19299264  -            -     -
zhihu_comment  2          10906395        28662007        17755612  -            -     -
My consumer app:
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public final class SparkConsumer {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        String brokers = "device1:9092,device2:9092,device3:9092";
        String groupId = "spark";
        String topics = "zhihu_comment";

        // Create streaming context with a 10-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
        sparkConf.set("spark.streaming.backpressure.enabled", "true");
        sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("max.poll.records", "500");

        // Create direct Kafka stream with brokers and topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        // Extract the values and print the record count of each batch
        JavaDStream<String> lines = messages.map(ConsumerRecord::value);
        lines.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
I have limited how much Spark Streaming consumes: in my case I set maxRatePerPartition to 10000, which means each batch contains 300,000 records.
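With 3 partitions and the 10-second batch interval configured above, that limit works out as follows:

10,000 records/s per partition × 3 partitions × 10 s per batch = 300,000 records per batch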
The problem is that although Spark Streaming handles records within this limit, the current offset shown by Kafka is not the offset Spark Streaming is actually processing: Kafka's current offset jumps straight to the latest (log-end) offset:
TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST            CLIENT-ID
zhihu_comment  0          28700537        28700676        139  consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a  /192.168.0.102  consumer-1
zhihu_comment  1          30305102        30305224        122  consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a  /192.168.0.102  consumer-1
zhihu_comment  2          28695033        28695146        113  consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a  /192.168.0.102  consumer-1
It appears that Spark Streaming does not commit the offsets batch by batch; it commits the latest offset once, right when it starts consuming!
Is there any way to make Spark Streaming commit the offsets with each batch?
Spark Streaming log, showing the number of records consumed in each batch:
20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000
20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms
Upvotes: 1
Views: 1150
Reputation: 18495
You need to disable automatic offset committing:

kafkaParams.put("enable.auto.commit", false);

and instead commit the offsets yourself in each batch:
messages.foreachRDD(rdd -> {
    // grab the offset ranges of this batch before doing anything else
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // do some transformations and an action on the rdd here, typically like:
    rdd.foreachPartition(it -> {
        it.forEachRemaining(record -> {
            // process each ConsumerRecord
        });
    });

    // commit this batch's offsets back to Kafka
    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});
This pattern is described in the Spark + Kafka Integration Guide. You could also use commitSync for synchronous commits.
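If you want confirmation that each batch's offsets really were committed, commitAsync also accepts an org.apache.kafka.clients.consumer.OffsetCommitCallback as a second argument. A minimal sketch that would replace the plain commitAsync call inside foreachRDD above (the logging is only illustrative, not part of the answer above):

// Optional: pass an OffsetCommitCallback to see whether the commit of each
// batch's offsets succeeded (illustrative logging only)
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
    if (exception != null) {
        System.err.println("Offset commit failed: " + exception);
    } else {
        System.out.println("Committed offsets: " + offsets);
    }
});

With enable.auto.commit set to false and the offsets committed at the end of every batch, the current offset in the consumer group output above should advance batch by batch instead of jumping straight to the log-end offset; since commitAsync is asynchronous, the reported offset may trail the most recently processed batch slightly.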
Upvotes: 2