Reputation: 687
When my producer has finished streaming all of its messages to Kafka, and after Flink has finished processing them, I want to be able to terminate the Flink job so it doesn't keep running, and also so I can know when Flink has finished processing all the data. I also cannot use batch processing as I need Flink to run in parallel to my Kafka stream.
Usually, Flink uses the `isEndOfStream` method in a `DeserializationSchema` class to decide whether it should end early (returning `true` from the method automatically ends the job). However, when using Kafka as a source with Flink, the new `KafkaSource` class has deprecated `isEndOfStream` in deserializers and no longer checks it to decide whether the stream should end. Is there any other way to terminate a Flink job early?
Upvotes: 0
Views: 853
Reputation: 43454
The mechanism provided by the `KafkaSource` for operating on bounded streams is to use `setBounded` or `setUnbounded` with the builder, as in

```java
KafkaSource<String> source = KafkaSource
    .<String>builder()
    .setBootstrapServers(...)
    .setGroupId(...)
    .setTopics(...)
    .setDeserializer(...) // or setValueOnlyDeserializer
    .setStartingOffsets(...)
    .setBounded(...) // or setUnbounded
    .build();
```
`setBounded` indicates that the source should be stopped once it has consumed all of the data up through the specified offsets.

`setUnbounded` can be used instead to indicate that while the source should not read any data past the specified offsets, it should remain running. This allows the source to participate in checkpointing when running in STREAMING mode.
If you know upfront how much you want to read, this works fine. I've used `setBounded` with a specific timestamp, e.g.,

```java
.setBounded(
    OffsetsInitializer.timestamp(
        Instant.parse("2021-10-31T23:59:59.999Z").toEpochMilli()))
```

and also like this

```java
.setBounded(OffsetsInitializer.latest())
```
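For completeness, here's a minimal sketch of wiring a bounded `KafkaSource` into a streaming job (the bootstrap address, group id, and topic name are placeholders, not values from the question). With `setBounded`, `env.execute()` returns once the source has consumed up to the specified offsets, so the job terminates on its own:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092") // placeholder
            .setGroupId("my-group")                // placeholder
            .setTopics("my-topic")                 // placeholder
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.earliest())
            // stop once the offsets that are current at startup are consumed
            .setBounded(OffsetsInitializer.latest())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
           .print();

        // with a bounded source, execute() returns when all data is processed
        env.execute("bounded-kafka-job");
    }
}
```

Note that `OffsetsInitializer.latest()` captures the end offsets at the moment the job starts, so messages produced after that point will not be read.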
Upvotes: 1