Harmeet Singh Taara

Reputation: 6611

Apache Kafka: KafkaProducerActor throws ask timeout exception

I am using the Cake Solutions Akka client for Scala and Kafka. I create a KafkaProducerActor and send it a message using the ask pattern, so that I get a Future back and can perform further operations on the result, but every time I get an ask timeout exception. Below is my code:

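import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import cakesolutions.kafka.KafkaProducer
import cakesolutions.kafka.akka.{KafkaProducerActor, ProducerRecords}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringSerializer

// Assumed: onComplete needs an implicit ExecutionContext; the global one is used here.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

class SimpleAkkaProducer (config: Config, system: ActorSystem) {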

  private val producerConf = KafkaProducer.
    Conf(config,
      keySerializer = new StringSerializer,
      valueSerializer = new StringSerializer)

  val actorRef = system.actorOf(KafkaProducerActor.props(producerConf))

  def sendMessageWayOne(record: ProducerRecords[String, String]) = {
    actorRef ! record
  }

  def sendMessageWayTwo(record: ProducerRecords[String, String]) = {
    implicit val timeout = Timeout(100.seconds)
    val future = (actorRef ? record).mapTo[String]
    future onComplete  {
      case Success(data) => println(s" >>>>>>>>>>>> ${data}")
      case Failure(ex) => ex.printStackTrace()
    }
  }
}

object SimpleAkkaProducer {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("KafkaProducerActor")
    val config = ConfigFactory.defaultApplication()
    val simpleAkkaProducer = new SimpleAkkaProducer(config, system)

    val topic = config.getString("akka.topic")
    val messageOne = ProducerRecords.fromKeyValues[String, String](topic,
      Seq((Some("Topics"), "First Message")), None, None)

    simpleAkkaProducer.sendMessageWayOne(messageOne)
    simpleAkkaProducer.sendMessageWayTwo(messageOne)
  }
}

The exception is as follows:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords".
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:745)

Upvotes: 1

Views: 582

Answers (1)

Jaakko Pallari

Reputation: 1681

The producer actor replies to the sender only if the successResponse and failureResponse values in the ProducerRecords are set to something other than None. The successResponse value is sent back to the sender when the Kafka write succeeds, and the failureResponse value is sent back when the Kafka write fails. In your code both are None, so the actor never replies and the ask times out.

Example:

val record = ProducerRecords.fromKeyValues[String, String](
  topic = topic,
  keyValues = Seq((Some("Topics"), "First Message")),
  successResponse = Some("success"),
  failureResponse = Some("failure")
)

val future = (actorRef ? record).mapTo[String]
future onComplete  {
  case Success("success") => println("Send succeeded!")
  case Success("failure") => println("Send failed!")
  case Success(data) => println(s"Send result: $data")
  case Failure(ex) => ex.printStackTrace()
}
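
To see why an ask without response values times out, here is a rough model of the reply logic in plain Scala, without Akka or Kafka. FakeRecords, FakeProducer and AskModel are hypothetical stand-ins made up for this sketch and are not part of the library; a Promise that is never completed plays the role of the ask that never gets a reply.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for ProducerRecords: only the two response fields matter here.
final case class FakeRecords(successResponse: Option[Any], failureResponse: Option[Any])

// Hypothetical stand-in for the producer actor's reply logic (an assumption, not library code):
// the caller's promise is completed only when a response value is configured.
object FakeProducer {
  def ask(records: FakeRecords, writeSucceeds: Boolean = true): Future[Any] = {
    val reply = Promise[Any]()
    val response = if (writeSucceeds) records.successResponse else records.failureResponse
    // With None there is nothing to send back, so the promise is never completed.
    response.foreach(value => reply.success(value))
    reply.future
  }
}

object AskModel {
  // Waits briefly for a reply; a Failure wrapping a TimeoutException
  // corresponds to the AskTimeoutException in the question.
  def awaitReply(reply: Future[Any]): Try[Any] = Try(Await.result(reply, 200.millis))
}
```

With FakeRecords(None, None) the call to AskModel.awaitReply ends in a Failure, which is the situation in the question. With Some("success") and Some("failure") the caller gets one of the two values back, depending on whether the write succeeded.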

Upvotes: 2
