Is it possible to send an array of Strings with a Kafka Producer? I want to take messages (lines of text) from 'topic1', split them into single words, and send the words to another topic.
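import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

object KafkaConsumer extends App {
  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  // (auto.create.topics.enable is a broker-side setting; the producer client does not use it)
  val producerSettings = ProducerSettings(
    actorSystem,
    new ByteArraySerializer,
    new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings = ConsumerSettings(
    system = actorSystem,
    keyDeserializer = new ByteArrayDeserializer,
    valueDeserializer = new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("kafka-sample")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // -----------------------------------------------------------------------//
  // ROUTE OF THE APP
  // Forwards every line from topic1 to topic2 unchanged and commits the
  // consumed offset once the record has been produced
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(
        new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
        msg.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))
}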
The Akka Streams sample in the question creates a simple stream that reads one message at a time and uses a Sink that produces it to Kafka and commits the offset of the consumed message. If you need to read one or more messages and produce as many messages as there are words in the consumed set, you should dig further into the Akka Streams Graph API.
This example uses the Graph DSL: it builds one Source from Kafka and uses groupedWithin to read a batch of messages and extract the words they contain.
Two simple flows are created: one gets the last offset of the batch and the other gets the words. A Source stage then broadcasts each consumed batch from Kafka to both flows and zips the results into a tuple (CommittableOffset, Array[String]). The words are produced in the runForeach function, and the last offset is committed only after all of them have been sent. Note that with Future.sequence the words aren't produced in order, because all the sends are started at once.
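To make the data flow of the graph easier to follow, here is a stdlib-only model of what the two flows and the Zip stage compute for one batch. Consumed and BatchSplit are hypothetical names, and a plain Long offset stands in for CommittableOffset, so this is a sketch of the logic under those assumptions, not Alpakka Kafka code.

```scala
// Stdlib-only model of the two flows fed by the Broadcast stage.
// Consumed is a hypothetical stand-in for CommittableMessage: an offset
// (instead of CommittableOffset) plus the line of text in the record.
final case class Consumed(offset: Long, line: String)

object BatchSplit {
  // Mirrors flowCommit: only the offset of the last message in the batch
  def lastOffset(batch: Seq[Consumed]): Long = batch.last.offset

  // Mirrors the words flow: every word of every line in the batch.
  // Empty strings caused by repeated spaces are dropped here,
  // whereas the mkString(" ").split(" ") version would keep them.
  def words(batch: Seq[Consumed]): Seq[String] =
    batch.flatMap(_.line.split(" ").filter(_.nonEmpty))

  // Mirrors the Zip stage: both results for the same batch, offset first
  def zipBatch(batch: Seq[Consumed]): (Long, Seq[String]) =
    (lastOffset(batch), words(batch))

  def main(args: Array[String]): Unit = {
    val batch = Seq(Consumed(41L, "hello kafka"), Consumed(42L, "akka  streams"))
    println(zipBatch(batch)) // (42,List(hello, kafka, akka, streams))
  }
}
```

Only the last offset of each batch is kept because committing it also marks every earlier message of the same partition as consumed.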
Although the full sample below looks long, it compiles and works properly with "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
import java.util.Properties

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object SplitSource extends App {
  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  // (auto.create.topics.enable is a broker-side setting; the producer client does not use it)
  val producerSettings = ProducerSettings(actorSystem,
                                          new ByteArraySerializer,
                                          new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample4")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Config for the plain Kafka producer that publishes the individual words
  val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }

  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Wrap the blocking Java send(...).get() in a Scala Future
  private def publishToKafka(id: String, data: String) =
    Future {
      kafkaProducer
        .send(new ProducerRecord("outTopic", id, data))
        .get()
    }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // Emits a batch when 10 messages have arrived or 30 seconds have passed, whichever comes first
      .groupedWithin(10, 30.seconds)

  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in = getKafkaSource

    // Broadcast to two flows: one obtains the last offset to commit,
    // the other returns the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())

    val flowCommit =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

    // Flow that creates the list of all words in all consumed messages
    val flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map { input =>
        input.map(_.record.value()).mkString(" ").split(" ")
      }

    // Build the stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> flowWords  ~> zipResult.in1

    SourceShape(zipResult.out)
  }

  Source.fromGraph(getStreamSource).runForeach { case (lastOffset, words) =>
    // Publish all words; when all futures complete, commit the last Kafka offset
    val futures = words.map(publishToKafka("outTopic", _)).toList
    // Produces in parallel! Chain the sends with flatMap to keep them in order
    Future.sequence(futures).onComplete {
      case Success(_) =>
        // Once all futures are done, commit the last consumed message
        lastOffset.commitScaladsl()
      case Failure(e) =>
        log.error(e, "Failed to publish words; offset not committed")
    }
  }
}
The Akka Streams API lets you create awesome processing pipelines.
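The code comment suggests chaining the sends with flatMap to keep them in order. A minimal sketch of that idea, assuming the publish function returns a Future that completes once the send is done (as publishToKafka does, since it calls .get()): fold over the words and start each send only after the previous Future has completed. OrderedPublish, publishInOrder, OrderedPublishDemo and the fakePublish stand-in are hypothetical names, not part of any library.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OrderedPublish {
  // Starts each publish only after the previous Future has completed, so
  // items are sent in sequence order (unlike Future.sequence over Futures
  // that were all created, and therefore started, up front).
  def publishInOrder[A](items: Seq[String])(publish: String => Future[A])(
      implicit ec: ExecutionContext): Future[List[A]] =
    items
      .foldLeft(Future.successful(List.empty[A])) { (acc, item) =>
        acc.flatMap(done => publish(item).map(_ :: done))
      }
      .map(_.reverse) // results were prepended, restore input order
}

object OrderedPublishDemo {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Runs the sketch with a hypothetical stand-in for publishToKafka that
  // records the order in which the words are actually sent.
  def run(words: Seq[String]): (List[String], List[String]) = {
    val sent = ListBuffer.empty[String]
    def fakePublish(word: String): Future[String] = Future {
      sent.synchronized { sent += word }
      word
    }
    val results =
      Await.result(OrderedPublish.publishInOrder[String](words)(fakePublish), 5.seconds)
    (results, sent.synchronized(sent.toList))
  }

  def main(args: Array[String]): Unit = println(run(Seq("a", "b", "c")))
}
```

To use this in the stream, pass the word array and publishToKafka("outTopic", _) to publishInOrder instead of mapping the words to Futures first, since creating the Futures is what starts every send at once. The trade-off is throughput: the sends now run strictly one after another, whereas Future.sequence lets them overlap.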
You should use mapConcat before map, because, as the Akka Streams documentation puts it, mapConcat will "Transform each input element into an Iterable of output elements that is then flattened into the output stream."
With the extra line added, the route would look like this:
Subscriptions.topics("topic1"))
  .mapConcat { msg => msg.record.value().split(" ").toList }
  .map { ...
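After mapConcat, each stream element is a single String, so the question's .map can no longer read msg.record.value or msg.committableOffset. One way to keep committing is to let mapConcat emit each word together with the offset of the line it came from. The sketch below models that step with plain Scala collections, since List.flatMap has the same one-to-many-then-flatten shape as mapConcat; Line and WordsWithOffset are hypothetical names, and a Long stands in for CommittableOffset.

```scala
// Hypothetical stand-in for a consumed Kafka message: the record's offset
// (in place of CommittableOffset) and its line of text
final case class Line(offset: Long, text: String)

object WordsWithOffset {
  // Same shape as mapConcat: each line becomes a List of elements that is
  // flattened into the output. Every word is paired with its line's offset
  // so a later map stage can still attach the offset to the ProducerMessage.
  def split(lines: List[Line]): List[(String, Long)] =
    lines.flatMap(line => line.text.split(" ").toList.map(word => (word, line.offset)))

  // In the Akka stream, the equivalent stages would be:
  //   .mapConcat { msg => msg.record.value().split(" ").toList.map(w => (w, msg.committableOffset)) }
  //   .map { case (w, offset) =>
  //     ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", w), offset) }

  def main(args: Array[String]): Unit =
    split(List(Line(7L, "one two"), Line(8L, "three"))).foreach(println)
}
```

A caveat with this approach: Producer.commitableSink commits the pass-through offset after each produced message, so a line's offset is likely committed as soon as its first word has been sent. If the app stops partway through a line, the remaining words of that line may not be re-read.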