Namigamee
Namigamee

Reputation: 13

@KafkaHandler receives fine, but doesn't send reply with @SendTo. What's going wrong?

So, I've got this class with a kafka listener on the class level. It has two handler methods and bothare supposed to send back a reply on a reply topic that is sent with in the headers of the message. We use AVRO schemas for the messages, with specific reading enabled. It looks like this:

@Component
@Slf4j
@KafkaListener(
        id = "kvk_listener",
        topics = {"${kafka.topic.request.search}", "${kafka.topic.request.validation}"})
public class KvkListener {
    private final CompanySearchService companySearchService;
    private final CompanyDetailsService companyDetailsService;

@Autowired
public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
    this.companySearchService = companySearchService;
    this.companyDetailsService = companyDetailsService;
}

@SendTo
@KafkaHandler
public SearchResults listenForSearchRequests(@Payload Search search) {
    log.info("received thingie");
    return companySearchService.search(search.getCompanyName())
            .toSearchDetails();
}

@SendTo
@KafkaHandler
public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
    log.info("received other thingie");
    return companyDetailsService.details(lookup.getCompanyId())
            .toCompanyDetails(lookup.getProductType());
}

Now, I am using a request-reply connector in another service, but the response never arrives. pretty much all configuration is through spring boot auto-configuration. My properties look like this:

spring.kafka.bootstrap-servers=localhost:29095
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=odp-company-kvk-service

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

spring.kafka.properties.specific.avro.reader=true

So here's the thing. Neither of the handlers will send their reply. They can receive fine, I can see it in the logs or debugger, but they don't reply. The weirdest part is that if I would switch the listener set-up into 2 separate listeners, the reply does get sent. Like so:

@Component
@Slf4j
public class KvkListener {
    private final CompanySearchService companySearchService;
    private final CompanyDetailsService companyDetailsService;

    @Autowired
    public KvkListener(CompanySearchService companySearchService, CompanyDetailsService companyDetailsService) {
        this.companySearchService = companySearchService;
        this.companyDetailsService = companyDetailsService;
    }

    @SendTo
    @KafkaListener(
            topics = "${kafka.topic.request.search}")
    public SearchResults listenForSearchRequests(@Payload Search search) {
        log.info("received thingie");
        return companySearchService.search(search.getCompanyName())
                .toSearchDetails();
    }

    @SendTo
    @KafkaListener(
            topics = "${kafka.topic.request.validation}")
    public CompanyDetails listenForValidationRequests(@Payload Lookup lookup) {
        log.info("received other thingie");
        return companyDetailsService.details(lookup.getCompanyId())
                .toCompanyDetails(lookup.getProductType());
    }
}

I was under the impression that the @KafkaHandler set-up would be pretty much identical. The difference in behavior is that if you'd somehow end up with a message that doesn't have the required schema, the handler one allows you to set a default listener for 'Object' that can catch it. This is what I'd like to see, so I'm hoping I can get it to work.

I have tried everything from completely overriding all the consumer/producer beans and explicitly setting reply templates. I am at my wits end as to why this will not do what I expect it to. Please StackOverflow, help me with your problem solving magic!

Upvotes: 0

Views: 1126

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

It's a bug, I opened an issue.

I found a better work around; note the !{..} not #{...}.

@Component
@KafkaListener(id = "so62569951", topics = "so62569951")
class Foo {

    @KafkaHandler
    @SendTo("!{source.headers.kafka_replyTopic}")
    public String upcase(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }

}

Upvotes: 1

Related Questions