KarlR

Reputation: 1615

Kafka - scala - processing multiple messages

Is it possible to send an array of Strings with a Kafka Producer object? I want to take messages from 'topic1' (lines of text), split them into single words, and send those to another topic.

    object KafkaConsumer extends App {

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
      val log = actorSystem.log

      // PRODUCER config
      val producerSettings = ProducerSettings(
        actorSystem,
        new ByteArraySerializer,
        new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withProperty("auto.create.topics.enable", "true")

      // CONSUMER config
      val consumerSettings = ConsumerSettings(
        system = actorSystem,
        keyDeserializer = new ByteArrayDeserializer,
        valueDeserializer = new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("kafka-sample")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      // -----------------------------------------------------------------------//

      // ROUTE OF THE APP
      Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .map { msg =>
          println(s"topic1 -> topic2: $msg")
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
            msg.committableOffset)
        }
        .runWith(Producer.committableSink(producerSettings))
    }

Upvotes: 0

Views: 1796

Answers (2)

Emiliano Martinez

Reputation: 4133

The Akka Streams sample creates a simple stream that reads one message at a time, using a Sink that produces to Kafka and commits the offset of the consumed message. If you need to read one or more messages and produce as many messages as there are words in the consumed set, you should play more with the Akka Streams Graph API.

This example uses the Graph DSL: it builds one Source from Kafka and uses groupedWithin to read a batch of messages and extract the words they contain.

Two simple flows are created: one to obtain the last offset and another to collect the words. Then a Source stage is created that broadcasts each consumed Kafka message to both flows and zips the results into a tuple (CommittableOffset, Array[String]). The messages are produced in the runForeach function. Note that with Future.sequence the messages aren't produced in order.

Although the sample may look long, it compiles and works properly using "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
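For reference, a minimal build.sbt sketch declaring that dependency (the scalaVersion shown is an assumption; use whatever your project targets):

    // build.sbt (sketch)
    scalaVersion := "2.11.8" // assumption: any Scala version supported by akka-stream-kafka 0.14
    libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.14"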

import java.util.Properties

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.concurrent.duration._

object SplitSource extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings = ProducerSettings(actorSystem,
                                          new ByteArraySerializer,
                                          new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample4")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  implicit val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }

  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Wrap the Java client's blocking send(...).get() in a Scala Future
  private def publishToKafka(id: String, data: String) = {
    Future {
      kafkaProducer
        .send(new ProducerRecord("outTopic", id, data))
        .get()
    }
  }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // Emits downstream once 10 messages accumulate or 30 seconds pass
      .groupedWithin(10, 30 seconds)

  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in = getKafkaSource

    // Broadcast to two flows: one to obtain the last offset to commit
    // and another to return the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())
    val flowCommit = Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

    // Flow that creates the list of all words in all consumed messages
    val flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input => {
        input.map(_.record.value()).mkString(" ").split(" ")
      })

    // build the Stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> flowWords  ~> zipResult.in1

    SourceShape(zipResult.out)
  }

  Source.fromGraph(getStreamSource).runForeach { msgs =>
    {
      // Publish all words and, when all futures complete, commit the last Kafka offset
      val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

      // Produces in parallel! Use flatMap instead if ordering matters (see below).
      Future.sequence(futures).onComplete {
        case Success(_) =>
          // Once all futures are done, commit the offset of the last consumed message
          msgs._1.commitScaladsl()
        case Failure(e) =>
          // Without a Failure case the partial function would throw a MatchError
          log.error(e, "Publishing to Kafka failed; offset not committed")
      }
    }
  }

}
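As noted in the comments, Future.sequence publishes in parallel. If ordering matters, here is a minimal sketch of the flatMap alternative, reusing publishToKafka and the implicit executionContext from the sample above (the helper name publishInOrder is hypothetical):

    // Chain the publish futures sequentially instead of running them in parallel
    def publishInOrder(words: Seq[String]): Future[Unit] =
      words.foldLeft(Future.successful(())) { (acc, word) =>
        acc.flatMap(_ => publishToKafka("outTopic", word).map(_ => ()))
      }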

The Akka Streams API lets you create awesome processing pipelines.

Upvotes: 1

red1ynx

Reputation: 3775

You should use mapConcat before map, because it:

Transform each input element into an Iterable of output elements that is then flattened into the output stream.

The full addition will look like:

Subscriptions.topics("topic1"))
  .mapConcat { msg => msg.record.value().split(" ").toList }
  .map { ...
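
Putting it together, a minimal end-to-end sketch of that pipeline (topic names are the ones from the question; it uses plainSource/plainSink because after mapConcat the per-message committableOffset is gone, so this variant does not commit offsets explicitly):

    Consumer
      .plainSource(consumerSettings, Subscriptions.topics("topic1"))
      // one stream element per word
      .mapConcat(record => record.value().split(" ").toList)
      .map(word => new ProducerRecord[Array[Byte], String]("topic2", word))
      .runWith(Producer.plainSink(producerSettings))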

Upvotes: 0
