Kenneth Muhia

Reputation: 43

No pending reply: ConsumerRecord

I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.

No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic

It appears to come from the following code in ReplyingKafkaTemplate:

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}

But it happens only intermittently.

I have also set sharedReplyTopic to false, as shown below, and tried to force a longer reply timeout:

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;

My container factory is configured as follows:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

Upvotes: 4

Views: 5507

Answers (2)

Tiago Medici

Reputation: 2194

I fixed the issue by adding a @Header(KafkaHeaders.CORRELATION_ID) parameter to the consumer's listener method:

@KafkaListener(topics = "${kafka.topic.model}")
@SendTo("replymodeltopic")
@Override
public Model receive(ConsumerRecord<String, Model> record, @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    record.headers().add(KafkaHeaders.CORRELATION_ID, correlation);
    return record.value();
}

My configuration is:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.tunnel.group}")
    private String tunnelGroup;

    @Value("${kafka.topic.json.reply}")
    private String jsonTopicReply;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, tunnelGroup);

        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Model> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Model.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Model> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(replyTemplate());
        return factory;
    }

    @Bean
    public ProducerFactory<String, Model> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Model> replyTemplate() {
        KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(jsonTopicReply);
        return kafkaTemplate;
    }

}

Upvotes: 1

Gary Russell

Reputation: 174729

If it's intermittent, the reply most likely took too long to arrive. The message seems quite clear:

perhaps timed out, or using a shared reply topic

Each client-side instance must use its own reply topic or a dedicated partition.
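As a rough sketch of what a per-instance reply topic could look like, the configuration below builds the reply container from a topic name that includes an instance identifier. The property app.instance-id, the topic naming scheme, the group id, and the class name ReplyConfig are all assumptions made for illustration; the topic has to exist already (or be auto-created by the broker).

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
public class ReplyConfig {

    // Hypothetical property: a value that is unique to each running client instance.
    @Value("${app.instance-id}")
    private String instanceId;

    @Bean
    public ConcurrentMessageListenerContainer<String, Object> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        // Assumed naming scheme: one reply topic per client instance.
        ConcurrentMessageListenerContainer<String, Object> container =
                factory.createContainer("request-reply-topic-" + instanceId);
        // A per-instance group id so instances do not share reply partitions.
        container.getContainerProperties().setGroupId("replies-" + instanceId);
        // The template starts the container, so it should not start on its own.
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyingTemplate(
            ProducerFactory<String, Object> pf,
            ConcurrentMessageListenerContainer<String, Object> replyContainer) {
        ReplyingKafkaTemplate<String, Object, Object> template =
                new ReplyingKafkaTemplate<>(pf, replyContainer);
        // Replies on this topic belong only to this instance.
        template.setSharedReplyTopic(false);
        return template;
    }
}
```

With this arrangement, a reply with an unknown correlation id on the instance's own topic points to a timeout or a duplicate reply rather than to another instance's traffic.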

EDIT

You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:

  1. The request timed out (in which case there will be a corresponding WARN log).
  2. The template is stop()ped (in which case this.futures is cleared).
  3. An already processed reply is redelivered for some reason (shouldn't happen).
  4. The reply is received before the key is added to this.futures (can't happen since it's inserted before send()ing the record).
  5. The server side sends 2 or more replies for the same request.
  6. Some other application is sending data to the same reply topic.

If you can reproduce it with DEBUG logging, it would help because then we log the correlation key on the send as well.
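To make the mechanism concrete, here is a simplified, plain-Java model of the pending-reply bookkeeping. It is not the actual ReplyingKafkaTemplate code: the class PendingReplies and its methods are hypothetical, and string keys stand in for the byte-array correlation ids. It only illustrates why a timed-out, duplicate, or foreign reply finds no entry in the map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/** Hypothetical, simplified stand-in for the template's map of pending replies. */
public class PendingReplies {

    private final Map<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    /** Registers the correlation id; this happens before the request is sent. */
    public CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(correlationId, future);
        return future;
    }

    /** Models a reply timeout: the entry is removed and the future fails. */
    public void timeOut(String correlationId) {
        CompletableFuture<String> future = futures.remove(correlationId);
        if (future != null) {
            future.completeExceptionally(new TimeoutException("Reply timed out"));
        }
    }

    /** Returns false when nothing is pending for this id (the "No pending reply" case). */
    public boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = futures.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies pending = new PendingReplies();

        pending.register("req-1");
        System.out.println(pending.onReply("req-1", "first"));   // true: normal reply
        System.out.println(pending.onReply("req-1", "second"));  // false: duplicate reply (case 5)

        pending.register("req-2");
        pending.timeOut("req-2");
        System.out.println(pending.onReply("req-2", "late"));    // false: arrived after timeout (case 1)

        System.out.println(pending.onReply("other", "foreign")); // false: never registered (case 6)
    }
}
```

In every false case the reply itself was delivered correctly; the only thing missing is a matching pending entry, which is why the correlation key logged at DEBUG on the send side is useful for telling the cases apart.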

Upvotes: 5
