Ángel Igualada
Ángel Igualada

Reputation: 891

Scala - How to filter KStream (Kafka Streams)

I'm new on Scala and I'm trying to filter a KStream[String, JsonNode] based on the second component fields.

As a example, the working Java code is this:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
...
import com.fasterxml.jackson.databind.JsonNode;
...
...
final KStream<String, JsonNode> source = streamsBuilder.stream(inputTopic,
                    Consumed.with(Serdes.String(), jsonSerde));


// filter and producer preprocessed
source.filter((k, v) -> v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
    .to(outputTopic, Produced.with(Serdes.String(), jsonSerde));

I have tried this:

import org.apache.kafka.streams.kstream.{Produced,Consumed,KStream};
import org.apache.kafka.streams.StreamsBuilder;
...
import com.fasterxml.jackson.databind.JsonNode;
...
var source:KStream[String, JsonNode] = streamsBuilder.stream(inputTopic, Consumed.`with`(Serdes.String(), jsonSerde));

source.filter({
  case (k:String ,v:JsonNode) => 
    (v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
  })
  .to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

In the above try i got:

Description Resource Path Location Type missing parameter type for expanded function The argument types of an anonymous function must be fully known. (SLS 8.5) Expected type was: org.apache.kafka.streams.kstream.Predicate[? >: String, ? >: com.fasterxml.jackson.databind.JsonNode]

I also tried this:

source.filter((_._2.get("total_cost").asDouble() > 0 && _._2.get("num_items").asInt() > 0))
  .to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

How could i filter this object in Scala? Thanks in advance.

Upvotes: 1

Views: 1123

Answers (1)

bottaio
bottaio

Reputation: 5093

org.apache.kafka.streams.kstream.Predicate comes from JavaAPI, in Scala 2.11 (I assume you use it) you have to implement interface explicitly, thus:

source.filter(new Predicate[String, JsonNode]() {
  override def test(k: String, v: JsonNode): Boolean = {
    v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0
  }
}).to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

should work.

More on SAMs (Single Abstract Methods) can be found here

Note that you don't have to use Java API - there is first-class Scala API.

Upvotes: 3

Related Questions