Reputation: 13
So, I've got this class with a Kafka listener on the class level. It has two handler methods, and both are supposed to send back a reply on a reply topic that is sent within the headers of the message. We use Avro schemas for the messages, with specific reading enabled. It looks like this:
@Component
@Slf4j
@KafkaListener(
        id = "kvk_listener",
        topics = {"${kafka.topic.request.search}", "${kafka.topic.request.validation}"})
public class KvkListener {

    private final CompanySearchService companySearchService;
    private final CompanyDetailsService companyDetailsService;

    @Autowired
    public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
        this.companySearchService = companySearchService;
        this.companyDetailsService = companyDetailsService;
    }

    // Handles Search payloads; the return value is meant to go to the reply topic.
    @SendTo
    @KafkaHandler
    public SearchResults listenForSearchRequests(@Payload Search search) {
        log.info("received thingie");
        return companySearchService.search(search.getCompanyName())
                .toSearchDetails();
    }

    // Handles Lookup payloads; the return value is meant to go to the reply topic.
    @SendTo
    @KafkaHandler
    public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
        log.info("received other thingie");
        return companyDetailsService.details(lookup.getCompanyId())
                .toCompanyDetails(lookup.getProductType());
    }
}
Now, I am using a request-reply connector in another service, but the response never arrives. Pretty much all configuration is through Spring Boot auto-configuration. My properties look like this:
spring.kafka.bootstrap-servers=localhost:29095
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=odp-company-kvk-service
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.properties.specific.avro.reader=true
So here's the thing. Neither of the handlers sends its reply. They receive messages fine, as I can see in the logs and in the debugger, but they don't reply. The weirdest part is that if I switch the set-up to two separate method-level listeners, the reply does get sent. Like so:
@Component
@Slf4j
public class KvkListener {

    private final CompanySearchService companySearchService;
    private final CompanyDetailsService companyDetailsService;

    @Autowired
    public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
        this.companySearchService = companySearchService;
        this.companyDetailsService = companyDetailsService;
    }

    @SendTo
    @KafkaListener(
            topics = "${kafka.topic.request.search}")
    public SearchResults listenForSearchRequests(@Payload Search search) {
        log.info("received thingie");
        return companySearchService.search(search.getCompanyName())
                .toSearchDetails();
    }

    @SendTo
    @KafkaListener(
            topics = "${kafka.topic.request.validation}")
    public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
        log.info("received other thingie");
        return companyDetailsService.details(lookup.getCompanyId())
                .toCompanyDetails(lookup.getProductType());
    }
}
I was under the impression that the @KafkaHandler set-up would be pretty much identical. The difference in behavior is that if you somehow end up with a message that doesn't have the required schema, the handler set-up allows you to define a default handler for 'Object' that can catch it. This is what I'd like to have, so I'm hoping I can get it to work.
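To make the dispatch idea concrete, here is a minimal plain-Java sketch of routing by payload type with a catch-all fallback. It is only an analogy and not Spring Kafka's implementation: the class HandlerDispatchSketch, its register and dispatch methods, and the simplified Search and Lookup types are all hypothetical stand-ins. In Spring Kafka itself, the fallback would be a method annotated with @KafkaHandler(isDefault = true) that takes an Object.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch of payload-type dispatch with a catch-all fallback.
// Not Spring Kafka code; every name in here is hypothetical.
public class HandlerDispatchSketch {

    // Hypothetical stand-ins for the Avro-generated Search and Lookup classes.
    static final class Search {
        final String companyName;
        Search(String companyName) { this.companyName = companyName; }
    }

    static final class Lookup {
        final String companyId;
        Lookup(String companyId) { this.companyId = companyId; }
    }

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private final Function<Object, String> defaultHandler;

    HandlerDispatchSketch(Function<Object, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Register a handler for one payload type.
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Use the first handler whose type matches; otherwise fall back to the default.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }

    // Builds a dispatcher with two typed handlers and an Object fallback.
    static HandlerDispatchSketch example() {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch(
                payload -> "unhandled:" + payload.getClass().getSimpleName());
        dispatcher.register(Search.class, search -> "search:" + search.companyName);
        dispatcher.register(Lookup.class, lookup -> "lookup:" + lookup.companyId);
        return dispatcher;
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = example();
        System.out.println(dispatcher.dispatch(new Search("Acme")));      // search:Acme
        System.out.println(dispatcher.dispatch(new Lookup("12345678")));  // lookup:12345678
        System.out.println(dispatcher.dispatch("not an Avro record"));    // unhandled:String
    }
}
```

The point of the fallback is that a payload of an unexpected type still reaches some handler instead of failing the dispatch, which is the behavior I'm after with the class-level listener.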
I have tried everything, from completely overriding all the consumer/producer beans to explicitly setting reply templates. I am at my wits' end as to why this will not do what I expect. Please, StackOverflow, help me with your problem-solving magic!
Upvotes: 0
Views: 1126
Reputation: 174494
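It's a bug; I opened an issue.
I found a better workaround; note the !{...}, not #{...}. The !{...} form is evaluated at runtime for each request, so it can read the reply topic from the headers of the incoming message, whereas #{...} is evaluated once during application context initialization.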
@Component
@KafkaListener(id = "so62569951", topics = "so62569951")
class Foo {

    @KafkaHandler
    @SendTo("!{source.headers.kafka_replyTopic}")
    public String upcase(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }
}
Upvotes: 1