A. Gisbert

Reputation: 163

How to send a KeyValue list to Kafka?

I am trying to send a List[KeyValue] to a topic in a Kafka Streams app, but the stream expects a single KeyValue per record. How can I send the individual KeyValues to the stream instead of the whole list?

class MainTopology {

  val createTopology = (sourceTopic: String, sinkTopic: String) => {
    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .map[String, String]((key, value) => toFormattedEvents(key, value))
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event))
  }

}

The second map is not compiling due to this.

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

I updated my code to use flatMap, but now it throws a different error:

    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .flatMap(
      (key, value) => toFormattedEvents(key, value)
    )
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event)).toIterable.asJava
  }

Error:(15, 8) inferred type arguments [?1,?0] do not conform to method flatMap's type parameter bounds [KR,VR]
      .flatMap(
Error:(16, 20) type mismatch;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[String,Option[org.minsait.streams.model.JsonMessage],Iterable[org.apache.kafka.streams.KeyValue[String,String]]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: Option[org.minsait.streams.model.JsonMessage], _ <: Iterable[_ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]]
      (key, value) => toFormattedEvents(key, value)

Upvotes: 2

Views: 1542

Answers (1)

miguno

Reputation: 15067

Take a look at flatMap() and flatMapValues() to replace the second map(). https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/KStream.html

For example:

.flatMap[String, Long]((key, value) => Seq(("foo", 1L), ("bar", 2L)))
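
The one-liner above uses the tuple-based syntax of the Kafka Streams Scala DSL. As a fuller sketch of how it could fit the topology in the question, assuming the kafka-streams-scala dependency for Kafka 2.x is on the classpath (the import path for the implicit Serdes differs in newer releases), something like the following might work. The formatEvents helper here is a hypothetical stand-in for the one in the question, and the stream is typed as String/String for simplicity.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

object FlatMapSketch {

  // Hypothetical stand-in for the question's formatEvents():
  // splits one comma-separated payload into several event strings.
  def formatEvents(value: String): List[String] =
    value.split(",").map(_.trim).filter(_.nonEmpty).toList

  def createTopology(sourceTopic: String, sinkTopic: String): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String](sourceTopic)
      // Each input record yields zero or more output records,
      // all carrying the key of the input record.
      .flatMap[String, String]((key, value) =>
        formatEvents(value).map(event => (key, event)))
      .to(sinkTopic)
    builder.build()
  }
}
```

Because the key is unchanged in this sketch, flatMapValues() would also do the job and avoids marking the stream for repartitioning; flatMap() is shown only because it mirrors the code in the question.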

If you do intend to keep using map() for some reason, then see below.

The second map is not compiling due to this.

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

The corresponding code, for reference:

.map[String, String]((key, value) => toFormattedEvents(key, value))

What you need is something like:

.map[String, List[KeyValue[KR, VR]]]((key, value) => toFormattedEvents(key, value))

where KR and VR are the key and value types, respectively, as returned by toFormattedEvents() (the actual return types are not clear from your question). For this to work you must also have a Serde for the List[KeyValue[KR, VR]] type, because each output record then carries the entire list as its value.

Let me illustrate this with a few different types so it's easier to understand which part of the method call refers to which type. Assume we want the map output to have a key type of Integer and a value type of List[KeyValue[String, Long]]:

.map[Integer, List[KeyValue[String, Long]]]((key, value) => (5, List(KeyValue.pair("foo", 1L), KeyValue.pair("bar", 2L))))

Note that this example assigns the same key and value to every mapped record, but that is not the point of the example.
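
To make the difference between the two approaches concrete without involving Kafka at all, here is a minimal sketch on plain Scala collections. The records list and the events helper are hypothetical stand-ins for the stream's records and for toFormattedEvents(); the semantics of map versus flatMap are the same idea as in the DSL.

```scala
object MapVsFlatMap {

  // Hypothetical input: (key, payload) pairs standing in for Kafka records.
  val records: List[(String, String)] = List(("k1", "a,b"), ("k2", "c"))

  // Hypothetical stand-in for toFormattedEvents(): one payload, many events.
  def events(payload: String): List[String] = payload.split(",").toList

  // map: still exactly one output per input; the value is the whole list.
  val mapped: List[(String, List[String])] =
    records.map { case (key, payload) => (key, events(payload)) }

  // flatMap: one output per event, each keeping the original key.
  val flattened: List[(String, String)] =
    records.flatMap { case (key, payload) => events(payload).map(e => (key, e)) }
}
```

With map, the sink topic would receive two records whose values are lists (hence the need for a list Serde); with flatMap, it would receive three records with plain String values.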

Upvotes: 2
