Reputation: 307
String prefix = "B";
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-word-pattern");
StreamsBuilder streamsBuilder = new StreamsBuilder();
//source
KStream<String, String> stream = streamsBuilder.stream(SOURCE_TEST_TOPIC);
//word processor
KStream<String, String> wordProcessor = stream.flatMapValues(s -> Arrays.asList(s.split(",")));
//match
KStream<String, String> matchProcessor =
    wordProcessor.filter((key, value) -> value.toUpperCase().startsWith(prefix));
matchProcessor.to(WORD_TOPIC);
Topology topology = streamsBuilder.build();
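// no try-with-resources here: it would close the stream as soon as start() returns
KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
// register the shutdown hook before starting so the stream is closed on exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.err.println("Stream is closing...");
    kafkaStreams.close();
}));
System.err.println("Stream is starting...");
kafkaStreams.start();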
When I run this stream, the following exception is raised:
Exception in thread "main" java.lang.NoSuchFieldError: TRACE
    at org.apache.kafka.streams.StreamsConfig.<clinit>(StreamsConfig.java:766)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:693)
    at kafkacustom.streams.KafkaStreamsExample.main(KafkaStreamsExample.java:42)
How can I fix it?
Upvotes: 2
Views: 2184
Reputation: 362
This is caused by incompatible library versions. Check that your org.apache.kafka:kafka-clients and org.apache.kafka:kafka-streams versions match.
In my specific case, org.apache.kafka:kafka-clients:6.0.0-ccs (the Confluent build) does not have the TRACE enum value in RecordingLevel, an inner class of Sensor, while I was using the open-source org.apache.kafka:kafka-streams:3.1.1, which expects it. So I just switched kafka-streams to the Confluent version 6.0.0-ccs as well, and it works.
To investigate on your side, click the StreamsConfig frame in the console stack trace (line 794 in my case), then open the Sensor class it references and check whether its inner class RecordingLevel has the TRACE enum value.
In the "official" (open-source) repo, this enum value is still present.
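If you would rather check this from code than by browsing sources, here is a minimal sketch that prints which jar each class is actually loaded from and whether RecordingLevel has TRACE at runtime. The class name KafkaJarCheck and its helper methods are made up for illustration; the only Kafka names it relies on are the StreamsConfig and Sensor.RecordingLevel classes from the stack trace. It loads StreamsConfig without initializing it, so it should not hit the same NoSuchFieldError.

```java
import java.security.CodeSource;

public class KafkaJarCheck {

    // Returns the jar (or directory) a class was loaded from, if the JVM knows it.
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        return source == null ? "(unknown location)" : String.valueOf(source.getLocation());
    }

    // Returns true if enumClass is an enum that declares a constant with this name.
    static boolean hasEnumConstant(Class<?> enumClass, String name) {
        Object[] constants = enumClass.getEnumConstants(); // null when not an enum
        if (constants == null) {
            return false;
        }
        for (Object constant : constants) {
            if (((Enum<?>) constant).name().equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ClassLoader loader = KafkaJarCheck.class.getClassLoader();
        try {
            // initialize=false: StreamsConfig's static initializer is what throws,
            // so load the class without running it.
            Class<?> streamsConfig =
                Class.forName("org.apache.kafka.streams.StreamsConfig", false, loader);
            Class<?> recordingLevel =
                Class.forName("org.apache.kafka.common.metrics.Sensor$RecordingLevel", false, loader);

            System.out.println("kafka-streams loaded from: " + locationOf(streamsConfig));
            System.out.println("kafka-clients loaded from: " + locationOf(recordingLevel));
            System.out.println("RecordingLevel has TRACE:  " + hasEnumConstant(recordingLevel, "TRACE"));
        } catch (ClassNotFoundException e) {
            System.out.println("Kafka classes are not on the classpath: " + e.getMessage());
        }
    }
}
```

Run it with the same classpath as your application. If the last line prints false, the kafka-clients jar on the classpath is the one that needs to be aligned with kafka-streams.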
Upvotes: 2