Reputation: 43
I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.
No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic
It would stem from the code below
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}
But happens only intemittently
I have also set the shared replyTopic to false as below and attempted to force a longer timeout
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;
My Container is as below
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setIdleEventInterval(10000L);
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
Upvotes: 4
Views: 5507
Reputation: 2194
Issue fixed using on the consumer the attribute @Header (KafkaHeaders.CORRELATION_ID)
@KafkaListener(topics = "${kafka.topic.model}")
@SendTo("replymodeltopic")
@Override
public Model receive(ConsumerRecord<String, model> record, @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
record.headers().add(KafkaHeaders.CORRELATION_ID, correlation);
return record.value();
}
on my configuration i have
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.tunnel.group}")
private String tunnelGroup;
@Value("${kafka.topic.json.reply}")
private String jsonTopicReply;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, tunnelGroup);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ConsumerFactory<String, Model> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Model.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Model> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(replyTemplate());
return factory;
}
@Bean
public ProducerFactory<String, Model> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Model> replyTemplate() {
KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(jsonTopicReply);
return kafkaTemplate;
}
}
Upvotes: 1
Reputation: 174729
If it's intermittent, it's most likely the reply took too long to arrive. The message seems quite clear
perhaps timed out, or using a shared reply topic
Each client side instance must use it's own reply topic or dedicated partition.
EDIT
You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:
Upvotes: 5