Kiran

Terminate a Flink job when using a Kafka Source

When my producer has finished streaming all of its messages to Kafka, and Flink has finished processing them, I want to terminate the Flink job so that it doesn't keep running, and so that I know when Flink has finished processing all the data. I can't use batch processing, because I need Flink to run concurrently with the producer that is writing to Kafka.

Normally, Flink checks the isEndOfStream method of a DeserializationSchema to decide whether to end early (returning true from that method ends the job). However, when Kafka is the source, the new KafkaSource class has deprecated isEndOfStream in deserializers and no longer checks it to decide whether the stream should end. Is there any other way to terminate a Flink job early?
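For reference, the sentinel pattern described above can be sketched without Flink at all. This is a stdlib-only model, not Flink code: EndAwareSchema stands in for Flink's DeserializationSchema, and SentinelStringSchema, LegacyStyleConsumer, and the "__END__" marker are hypothetical names. It assumes the producer sends the marker as its final message and that the reader stops at the first marker without emitting it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's DeserializationSchema: only the two
// methods relevant to end-of-stream handling are modeled.
interface EndAwareSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
}

// A string schema that treats a hypothetical "__END__" marker as end of stream.
class SentinelStringSchema implements EndAwareSchema<String> {
    static final String END_MARKER = "__END__"; // assumption: the producer sends this last

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return END_MARKER.equals(nextElement);
    }
}

// Toy reader loop: emits records until the schema reports end of stream.
// In this model the marker itself is not emitted.
class LegacyStyleConsumer {
    static <T> List<T> consume(List<byte[]> log, EndAwareSchema<T> schema) {
        List<T> emitted = new ArrayList<>();
        for (byte[] raw : log) {
            T value = schema.deserialize(raw);
            if (schema.isEndOfStream(value)) {
                break; // stop reading; anything after the marker is ignored
            }
            emitted.add(value);
        }
        return emitted;
    }
}
```

With several parallel readers, each reader would have to see its own marker (for example, one per Kafka partition) before the whole job could finish. With KafkaSource this method is simply not consulted, which is why the answer relies on stopping offsets instead.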

Answers (1)

David Anderson

The mechanism provided by the KafkaSource for operating on bounded streams is to use setBounded or setUnbounded with the builder, as in

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...) // or setValueOnlyDeserializer
        .setStartingOffsets(...)
        .setBounded(...) // or setUnbounded
        .build();

setBounded indicates that the source should stop once it has consumed all of the data up to the specified stopping offsets.

setUnbounded can be used instead to indicate that while the source should not read any data past the specified offsets, it should remain running. This allows the source to participate in checkpointing if running in STREAMING mode.
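The difference between the two builder methods can be illustrated with a toy model of a single partition. This is plain Java, not the Flink API: PartitionReaderModel and its enums are hypothetical, and the model treats the stopping offset as exclusive (the record at that offset is not read), which is an assumption about how KafkaSource interprets it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one Kafka partition read up to a stopping offset.
// Not Flink code: only the bounded/unbounded stop semantics are modeled.
class PartitionReaderModel {
    enum Mode { BOUNDED, UNBOUNDED }
    enum State { RUNNING, FINISHED }

    final List<String> emitted = new ArrayList<>();
    State state = State.RUNNING;

    // Reads records in [startOffset, stoppingOffset); the stopping offset is exclusive.
    void run(List<String> log, int startOffset, int stoppingOffset, Mode mode) {
        if (stoppingOffset > log.size()) {
            // A real reader would wait for more data to arrive; this model does not.
            throw new IllegalArgumentException("stopping offset beyond end of log");
        }
        for (int offset = startOffset; offset < stoppingOffset; offset++) {
            emitted.add(log.get(offset));
        }
        // BOUNDED: the reader finishes, so the job is able to terminate.
        // UNBOUNDED: nothing more is read, but the reader stays alive
        // (in a real STREAMING job it could keep taking part in checkpoints).
        state = (mode == Mode.BOUNDED) ? State.FINISHED : State.RUNNING;
    }
}
```

Both modes read exactly the same records; only the final state differs, and that state is what decides whether the job can end on its own.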

If you know upfront how much you want to read, this works fine. I've used setBounded with a specific timestamp, e.g.,

  .setBounded(
    OffsetsInitializer.timestamp(
      Instant.parse("2021-10-31T23:59:59.999Z").toEpochMilli()))
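As a rough model of how a timestamp bound turns into an offset, the sketch below picks, within one partition, the first offset whose record timestamp is at or after the target, similar in spirit to Kafka's offsetsForTimes lookup. TimestampBoundModel is a hypothetical name; the fallback to the end of the log when no record qualifies is an assumption, as is treating the resulting offset as exclusive. It also computes the epoch-millisecond value produced by the Instant expression above.

```java
import java.time.Instant;
import java.util.List;

// Toy model: resolve a timestamp to a stopping offset in a single partition.
// Assumes record timestamps increase with offset. Not Flink or Kafka code.
class TimestampBoundModel {
    static long stoppingOffsetFor(List<Long> recordTimestamps, long targetMillis) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= targetMillis) {
                return offset; // exclusive bound: this record is not read
            }
        }
        // No record at or after the target: fall back to the end of the log
        // (assumed fallback behavior).
        return recordTimestamps.size();
    }

    // The same bound as in the snippet above, as epoch milliseconds.
    static long boundMillis() {
        return Instant.parse("2021-10-31T23:59:59.999Z").toEpochMilli();
    }
}
```

Under these assumptions, a record stamped exactly at 23:59:59.999 would not be read, so the bound covers everything strictly before that instant.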

and also like this

  .setBounded(OffsetsInitializer.latest())
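One consequence that matters for the original question: assuming OffsetsInitializer.latest() is resolved once, when the source starts up (the behavior modeled here), messages the producer writes after that point lie beyond the bound. The toy model below is plain Java, and LatestSnapshotModel is a hypothetical name; it captures the end offset once and then reads only up to it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a partition whose producer keeps writing after the job starts.
// The "latest" bound is captured once; later records are never read.
class LatestSnapshotModel {
    final List<String> log = new ArrayList<>();

    void produce(String record) {
        log.add(record);
    }

    // End offset right now: one past the last record written so far.
    int currentEndOffset() {
        return log.size();
    }

    // Reads records in [0, stoppingOffset), copying them out of the log.
    List<String> readUpTo(int stoppingOffset) {
        return new ArrayList<>(log.subList(0, stoppingOffset));
    }
}
```

If the producer is still running when the job starts, a bound like this stops short of its final messages, which is why the approach fits best when the amount of data to read is known up front.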
