yet_another_programmer

Reputation: 327

How to transform a Kafka Streams event and send it to another topic only if it could be transformed

I want to build a simple Kafka Streams application that tries to transform events based on some conditions. If an event can be transformed, the transformed event goes to a different topic. If it cannot be transformed, the original event is written back to the same topic so it can be retried later.

Let's say I have this:

case class Foo(a: String, b: String, c: Boolean)

def translate(value: String): Option[Foo] = {
  // ...
  // Returns an Option of Foo
}

So I would need to have something like this:

val builder: StreamsBuilder = new StreamsBuilder()

builder
  .stream(topic)
  .map[String, String]((key, value) => translate(value))
  // If translate(value) is Some(value) send the value to a topic
  // Otherwise, send the original value (without being transformed) to the same topic

I'm totally stuck on this. The closest I've come is to create a structure with a Boolean that tells me whether the event could be transformed, and then split it into different streams with .branch. For instance, something like this:

def translate(value: String): (Boolean, Option[Foo]) = {
  val eventTransformed = transform(value)
  eventTransformed match {
    case Some(transformed) => (true, Some(transformed))
    case None => (false, None)
  }
}

And then try to do something like this:

builder
  .stream(topic)
  .map[String, (Boolean, Option[Foo])]((key, value) => translate(value))
  .branch(
    (_, element) => element._1,
  )
  .foreach {
    // Send the "true" branch to one topic and, for the "false" branch, send the original message back to the original topic
  }

But, of course, at that point I no longer have the original event to send back to the topic.

I've thought about more complicated structures, but I always end up back at the same problem: branching the stream on a Some/None condition.
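One way around losing the original is to have the mapping step return both the untouched input and the translation attempt, and then branch on whether the attempt is present. A minimal sketch of such a value type in plain Java, with no Kafka dependency; `Foo` mirrors the question's case class, while `Attempt`, `Attempts`, and the CSV rule inside `translate` are hypothetical placeholders for the real transformation:

```java
import java.util.Optional;

// Hypothetical stand-in for the question's Scala case class Foo(a, b, c).
final class Foo {
    final String a;
    final String b;
    final boolean c;

    Foo(String a, String b, boolean c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }
}

// Pairs the untouched input with the result of the translation attempt,
// so the original is still available when the attempt fails.
final class Attempt {
    final String original;
    final Optional<Foo> translated;

    Attempt(String original, Optional<Foo> translated) {
        this.original = original;
        this.translated = translated;
    }

    boolean succeeded() {
        return translated.isPresent();
    }
}

final class Attempts {
    // Hypothetical translation rule (assumption): a "a,b,true"-style value with exactly three fields.
    static Optional<Foo> translate(String value) {
        String[] parts = value.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        return Optional.of(new Foo(parts[0], parts[1], Boolean.parseBoolean(parts[2])));
    }

    // What the mapping step would emit instead of a bare Option.
    static Attempt attempt(String value) {
        return new Attempt(value, translate(value));
    }
}
```

In a DSL pipeline this value could come out of something like `mapValues(Attempts::attempt)`; the successful branch would then map to `translated.get()` and the failed branch to `original`. That route also needs a Serde for the intermediate type, which is not shown here.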

Upvotes: 0

Views: 1179

Answers (1)

Matthias J. Sax

Reputation: 62350

Maybe use the Processor API. You have one Processor that does the translation: if the translation succeeds, you call context.forward(key, translatedValue, To.child("translated")); otherwise, you call context.forward(key, value, To.child("retry")).
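The routing decision inside such a processor is a plain if/else, and the original value is still in scope in the failure branch, which is exactly what the branch-based attempt in the question lost. A sketch of that logic in plain Java so it runs without Kafka; `Forwarder`, `TranslateStep`, and `RecordingForwarder` are hypothetical stand-ins, and in a real `Processor` the two calls would be `context.forward(key, result.get(), To.child("translated"))` and `context.forward(key, value, To.child("retry"))`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for ProcessorContext.forward(key, value, To.child(name)).
interface Forwarder {
    void forward(String key, String value, String childName);
}

final class TranslateStep {
    private final Function<String, Optional<String>> translate;
    private final Forwarder context;

    TranslateStep(Function<String, Optional<String>> translate, Forwarder context) {
        this.translate = translate;
        this.context = context;
    }

    // Mirrors the body of a process(key, value) method: exactly one forward per record.
    void process(String key, String value) {
        Optional<String> result = translate.apply(value);
        if (result.isPresent()) {
            context.forward(key, result.get(), "translated");
        } else {
            // The original value is still available here, so nothing is lost.
            context.forward(key, value, "retry");
        }
    }
}

// Records every forward as "child:key=value" so the routing can be inspected.
final class RecordingForwarder implements Forwarder {
    final List<String> log = new ArrayList<>();

    @Override
    public void forward(String key, String value, String childName) {
        log.add(childName + ":" + key + "=" + value);
    }
}
```

With a `translate` that upper-cases values starting with `ok` and rejects everything else, processing `("k1", "ok-1")` and then `("k2", "bad")` logs `translated:k1=OK-1` followed by `retry:k2=bad`.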

Then you wire the Topology together manually:

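import org.apache.kafka.streams.Topology;

Topology topology = new Topology();
topology.addSource("source", topic);
// TranslateProcessor is your own Processor; it forwards each record to exactly one of the two sinks below
topology.addProcessor("translator", () -> new TranslateProcessor(), "source");
topology.addSink("translated", resultTopic, "translator"); // translated events
topology.addSink("retry", topic, "translator");            // untranslatable events, back to the input topic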
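Because the "retry" sink writes to the same topic the "source" node reads from, a record that fails to translate is consumed again soon after. A small in-memory model of that loop in plain Java (no Kafka; `LoopbackTopology` and its fields are hypothetical) makes the behavior visible: a record that can never be translated keeps cycling, which is why this sketch caps the number of records it processes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// In-memory model: the "source" node reads inputTopic, the "translator" decides,
// and the two sinks append either to resultTopic or back to inputTopic.
final class LoopbackTopology {
    final Deque<String> inputTopic = new ArrayDeque<>();
    final List<String> resultTopic = new ArrayList<>();
    private final Function<String, Optional<String>> translate;

    LoopbackTopology(Function<String, Optional<String>> translate) {
        this.translate = translate;
    }

    // Processes at most maxRecords records and returns how many were handled.
    int run(int maxRecords) {
        int handled = 0;
        while (handled < maxRecords && !inputTopic.isEmpty()) {
            String value = inputTopic.pollFirst();
            Optional<String> result = translate.apply(value);
            if (result.isPresent()) {
                resultTopic.add(result.get()); // "translated" sink
            } else {
                inputTopic.addLast(value);     // "retry" sink: same topic as the source
            }
            handled++;
        }
        return handled;
    }
}
```

With a `translate` that rejects only `"bad"`, feeding `a`, `bad`, `b` and calling `run(10)` handles ten records: `a!` and `b!` end up in the result topic, while `bad` is still circulating in the input topic.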

Upvotes: 1
