Once all the messages have been handled, I need to shut down the Kafka consumer until the next call. Obviously, I can't use what I wrote below as is. Any help would be greatly appreciated.
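val consumerDrainingControl = Consumer
  // Assumption: the source call was lost from the snippet; consumerSettings and topic are placeholders defined elsewhere
  .committableSource(consumerSettings, Subscriptions.topics(topic))
  .mapAsync(1) { msg =>
    // Process the message, then pass its offset on to be committed
    Future.successful(msg.committableOffset)
  }
  .via(Committer.flow(committerDefaults.withMaxBatch(1)))
  .toMat(Sink.ignore)(DrainingControl.apply)
  .run()

// Stops the source, waits for in-flight messages to finish, then shuts the consumer down
consumerDrainingControl.drainAndShutdown()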