Subhakant Priyadarsan
Subhakant Priyadarsan

Reputation: 336

Run a Alpakka Kafka Consumer on Demand in Scala

When all the messages have been handled, I need to shut off the Kafka consumer until the next call. I obviously won't be able to use what I wrote. Any assistance would be greatly valued.

  val consumerDrainingControl = Consumer
    .mapAsync(1) { msg =>
      // Process
      Future.successful(msg.committableOffset)
    }
    .via(Committer.flow(committerDefaults.withMaxBatch(1)))
    .toMat(Sink.ignore)(DrainingControl.apply)
    .run()
  consumerDrainingControl.drainAndShutdown()

Upvotes: 0

Views: 109

Answers (0)

Related Questions