Reputation: 5878
I am following a simple guide to using Kafka streams with Spring Boot (Spring guide).
I understand how messages come in and go out, and that I can do some processing in between, replacing @KafkaListener
and kafkaTemplate.send().
So I made a very basic class like this:
@EnableBinding(Processor.class)
public static class UppercaseTransformer {
    @StreamListener
    @Input(Processor.INPUT)
    public void receive(String input) {
        System.out.println(input);
    }
}
and then (and maybe this is my error), from a controller I do this:
template.send("my-topic","hello world");
I am using Spring Cloud Stream with a configuration like this:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: ${spring.application.name}
          consumer:
            concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
        output:
          destination: my-topic
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: false
          required-acks: all
          transaction:
            transaction-id-prefix: ${spring.application.name}-
            producer:
              configuration:
                retries: 3
        bindings:
          input:
            consumer:
              configuration:
                isolation.level: read_committed
              enable-dlq: true
              dlq-name: some-name
I also tried setting these for the consumer and listener:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
and every time I try to send a message I get this:
class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
I have no idea what is wrong, or why so much changes between the normal listener and this version. Any ideas?
Upvotes: 0
Views: 323
Reputation: 5924
I just created an application from start.spring.io, selecting "Cloud Stream" and "Kafka". I generated the project and added this to the main class (using the same configuration you provided above).
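import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So54408906Application {

    public static void main(String[] args) {
        SpringApplication.run(So54408906Application.class, args);
    }

    // Bind the listener directly to the "input" channel and print each payload.
    @StreamListener(Processor.INPUT)
    public void receive(String input) {
        System.out.println(input);
    }
}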
Then I ran the Kafka console producer script:
kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
The text typed into the script was logged on the application's console.
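On the error message in the question: [B is the JVM's internal name for byte[], so the exception says that something tried to cast a String payload to a byte array. The plain-Java sketch below only reproduces that kind of cast failure. CastDemo, serializeBytes and tryCast are made-up names for illustration, and it is an assumption (not something confirmed in this thread) that the real failure comes from a byte[]-only serializer on the producer side receiving the String passed to template.send().

```java
import java.nio.charset.StandardCharsets;

public class CastDemo {

    // Hypothetical stand-in for a serializer that only accepts byte[] payloads.
    static byte[] serializeBytes(Object payload) {
        return (byte[]) payload; // throws ClassCastException if payload is a String
    }

    // Returns the exception message if the cast fails, or null if it succeeds.
    static String tryCast(Object payload) {
        try {
            serializeBytes(payload);
            return null;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The runtime name of the byte[] class is "[B".
        System.out.println(byte[].class.getName());

        // A String payload fails the cast; the message mentions both String and [B.
        System.out.println(tryCast("hello world"));

        // A byte[] payload passes, so no message is returned.
        System.out.println(tryCast("hello world".getBytes(StandardCharsets.UTF_8)));
    }
}
```

If that assumption holds, it would also fit with the console producer working here, since the payload then never passes through a String-to-byte[] cast inside the application.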
Upvotes: 1