Pradeep
Pradeep

Reputation: 1975

ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate

I am using kafka to publish both async and sync messages to the broker .One listener would listen to the topic and respond for both sync and async calls. I am using same request topic for both the templates .. When using fire and forget(Async) I don't see any issues since listener would listen to the messages randomly from topic.When using synchronous call I am getting timeout exception.

  1. Do I need to maintain multiple listeners for different templates ?
  2. With same topic for both synchronous and async operations would there be any issues?

KafkaConfig.java

//Template for synchornous call

@Bean 
public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate (
    ProducerFactory<String, Model> pf,
    ConcurrentMessageListenerContainer<String, Model> repliesContainer)
{
    ReplyingKafkaTemplate<String, Model, Model> replyTemplate =
        new ReplyingKafkaTemplate<>(pf, repliesContainer);
    replyTemplate.setSharedReplyTopic(true);
    return replyTemplate;
}

@Bean //register ConcurrentMessageListenerContainer bean
public ConcurrentMessageListenerContainer<String, Model> repliesContainer (
    ConcurrentKafkaListenerContainerFactory<String, Model> containerFactory)
{
    ConcurrentMessageListenerContainer<String, Model> repliesContainer =
        containerFactory.createContainer("responseTopic");
    repliesContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
    repliesContainer.setAutoStartup(false);
    return repliesContainer;
}

//Template for asynchronous call

@Bean

@Qualifier("kafkaTemplate")
public KafkaTemplate<String, Model> kafkaTemplate (
    ProducerFactory<String, Model> pf,
    ConcurrentKafkaListenerContainerFactory<String, Model> factory)
{
    KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(pf);
    factory.setReplyTemplate(kafkaTemplate);
    return kafkaTemplate;
}

Here is service class

@Service
public class KafkaService
{
    @Autowired
    private ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate;
    @Autowired
    private KafkaTemplate<String, Model> kafkaTemplate;
    @Autowired
    private KafkaConfig config;
    public Object sendAndReceive (Model model)
    {

        ProducerRecord<String, Model> producerRecord =
            new ProducerRecord("requestTopic", model);
        producerRecord.headers()
            .add(
                new RecordHeader(KafkaHeaders.REPLY_TOPIC, "replyTopic"));
        RequestReplyFuture<String, Model, Model> replyFuture =
            replyingKafkaTemplate.sendAndReceive(producerRecord, Duration.ofSeconds(timeout));
        ConsumerRecord<String, Model> consumerRecord =
            replyFuture.get(timeout, TimeUnit.SECONDS);
        return consumerRecord.value();

    }
    public ResponseEntity<Object> send (final Model model)
    {

        final ProducerRecord<String, Model> producerRecord =
            new ProducerRecord("requestTopic", model);
        final ListenableFuture<SendResult<String, Model>> future =
            kafkaTemplate.send(producerRecord);
        final SendResult<String, Model> sendResult = future.get(timeout, TimeUnit.SECONDS);
        return new ResponseEntity<>(sendResult, HttpStatus.ACCEPTED);
    }

}

Here is the listener class.

@Slf4j
@Service
public class MessageListener
{
    @KafkaListener(groupId = "${group.id}", topics = "requestTopic", errorHandler = "customKafkaListenerErrorHandler",containerFactory = "customKafkaListenerContainerFactory")
    @SendTo
    public Model consumer (Model model)
    {
        switch (model.getType()) {
        case "async":
            System.out.println("Async messages are retrieved");
        case "sync":
            System.out.println("Sync messages are retrieved");
            return model;
        }
        return model;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> customKafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
    {
        ConcurrentKafkaListenerContainerFactory<Object, Object>
        concurrentKafkaListenerContainerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.
        setConsumerFactory(kafkaConsumerFactory);
       concurrentKafkaListenerContainerFactory.getContainerProperties()
       .setAckMode(ContainerProperties.AckMode.RECORD);
       concurrentKafkaListenerContainerFactory.
       setCommonErrorHandler(errorHandler());
       configurer.configure(concurrentKafkaListenerContainerFactory, kafkaConsumerFactory);
    concurrentKafkaListenerContainerFactory.setReplyTemplate(kafkaTemplate);
    return concurrentKafkaListenerContainerFactory;
    }
}

application.properties

spring.kafka.consumer.enable-auto-commit=false

spring.kafka.consumer.auto-offset-reset=earliest

Debug Logs:

2022-09-15 15:48:07.771 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=com.sample.Model@37a32ae0, headers={kafka_offset=239, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59ff0b21, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=requestTopic, kafka_receivedTimestamp=1663282080306, kafka_groupId=consumer_group_new22}]]
2022-09-15 15:48:07.774 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [com.sample.Model@37a32ae0] - generating response message for it
2022-09-15 15:48:07.780 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : No replyTopic to handle the reply: com.sample.Model@37a32ae0
2022-09-15 15:50:54.760 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=com.sample.Model@3f766126, headers={kafka_offset=240, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59ff0b21, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=requestTopic, kafka_receivedTimestamp=1663282254296, kafka_groupId=consumer_group_new22}]]
2022-09-15 15:50:54.760 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [com.sample.Model@3f766126] - generating response message for it
2022-09-15 15:50:54.761 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : No replyTopic to handle the reply: com.sample.Model@3f766126
2022-09-15 15:51:44.482 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=com.sample.Model@56c68983, headers={kafka_offset=241, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59ff0b21, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=requestTopic, kafka_receivedTimestamp=1663282304204, kafka_groupId=consumer_group_new22}]]
2022-09-15 15:51:44.483 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [com.sample.Model@56c68983] - generating response message for it
2022-09-15 15:51:44.483 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : No replyTopic to handle the reply: com.sample.Model@56c68983
2022-09-15 15:52:03.237 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=com.sample.Model@6682bf3c, headers={kafka_offset=242, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59ff0b21, kafka_correlationId=[B@65f4dd3b, kafka_timestampType=CREATE_TIME, kafka_replyTopic=[B@79cca97, kafka_receivedPartitionId=0, kafka_receivedTopic=requestTopic, kafka_receivedTimestamp=1663282322947, kafka_groupId=consumer_group_new22}]]
2022-09-15 15:52:03.237 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [com.sample.Model@6682bf3c] - generating response message for it
2022-09-15 15:52:42.585 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=com.sample.Model@78a4382d, headers={kafka_offset=243, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59ff0b21, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=requestTopic, kafka_receivedTimestamp=1663282362320, kafka_groupId=consumer_group_new22}]]
2022-09-15 15:52:42.585 DEBUG 35380 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [com.sample.Model@78a4382d] - generating response message for it

Upvotes: 0

Views: 862

Answers (1)

Gary Russell
Gary Russell

Reputation: 174719

This works exactly as I expected...

@SpringBootApplication
public class So73657031Application {

    public static void main(String[] args) {
        SpringApplication.run(So73657031Application.class, args);
    }

    @Bean
    ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            KafkaTemplate<String, String> template) {

        factory.setReplyTemplate(template);
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so73657031-replies");
        container.getContainerProperties().setGroupId("so73657031-replies");
        return new ReplyingKafkaTemplate<>(pf, container);
    }

    @Bean
    KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    NewTopic topic1() {
        return TopicBuilder.name("so73657031").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so73657031-replies").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rTemplate,
            KafkaTemplate<String, String> template) {

        return args -> {
            RequestReplyFuture<String, String, String> future =
                    rTemplate.sendAndReceive(new ProducerRecord<String, String>("so73657031", 0, null, "test"),
                            Duration.ofSeconds(30));
            System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
            System.out.println(future.get(30, TimeUnit.SECONDS).value());
            ListenableFuture<SendResult<String, String>> future2 = template.send("so73657031", "oneWay");
            System.out.println(future2.get(10, TimeUnit.SECONDS).getRecordMetadata());
        };
    }

}

@Component
class Listener {


    @KafkaListener(id = "so73657031", topics = "so73657031")
    @SendTo
    String listen(String in) {
        System.out.println(in);
        return in.toUpperCase();
    }

}
logging.level.root=warn
logging.level.org.springframework.kafka.listener.adapter=debug
so73657031-0@2
2022-09-15 15:36:34.496 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=test, headers={kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1582e8e4, kafka_correlationId=[B@2a266829, kafka_timestampType=CREATE_TIME, kafka_deliveryAttempt=1, kafka_replyTopic=[B@3dad3e81, kafka_receivedPartitionId=0, kafka_receivedTopic=so73657031, kafka_receivedTimestamp=1663270594381, kafka_groupId=so73657031}]]
test
2022-09-15 15:36:34.499 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [TEST] - generating response message for it
TEST
so73657031-0@3
2022-09-15 15:36:34.519 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=oneWay, headers={kafka_offset=3, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1582e8e4, kafka_timestampType=CREATE_TIME, kafka_deliveryAttempt=1, kafka_receivedPartitionId=0, kafka_receivedTopic=so73657031, kafka_receivedTimestamp=1663270594514, kafka_groupId=so73657031}]]
oneWay
2022-09-15 15:36:34.519 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [ONEWAY] - generating response message for it
2022-09-15 15:36:34.519 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : No replyTopic to handle the reply: ONEWAY

Upvotes: 1

Related Questions