Reputation: 153
I have a Kafka consumer that works fine, but when I add a DeadLetterPublishingRecoverer to handle failed messages, I get a deprecation warning.
I would like to keep the code clean. Can someone advise on the issue below?
This is the warning shown in Eclipse:
The constructor DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object,? extends Object>, BiFunction<ConsumerRecord<?,?>,Exception,TopicPartition>) is deprecated
    @Slf4j
    @EnableKafka
    @Configuration
    public class AssignmentOperationKafkaConsumerConfig {

        @Value("${spring.kafka.offset-reset-policy}")
        private String offsetResetPolicy;

        @Value("${spring.kafka.group-id}")
        private String groupId;

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${spring.kafka.max-poll-interval}")
        private Integer maxPollInterval;

        @Value("${spring.kafka.max-poll-records}")
        private Integer maxPollRecords;

        @Value("${spring.kafka.session-timeout}")
        private Integer sessionTimeout;

        @Value("${spring.kafka.trusted-packages}")
        private String trustedPacakges;

        public AssignmentOperationKafkaConsumerConfig() {
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetPolicy);
            props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPacakges);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);
            return props;
        }

        @Bean
        public ConsumerFactory<String, Assignment> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                    new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Assignment.class, false)));
        }

        @Autowired
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Assignment> kafkaListenerContainerFactory(
                KafkaTemplate<String, Assignment> KafkaTemplate) {
            ConcurrentKafkaListenerContainerFactory<String, Assignment> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setSyncCommits(true);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            factory.setAckDiscarded(true);
            factory.getContainerProperties().setAuthorizationExceptionRetryInterval(Duration.ofMillis(20000));
            factory.setRecordFilterStrategy(recordFilterStrategy);
            DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(KafkaTemplate,
                    (r, e) -> new TopicPartition(r.topic() + "-DLQ", -1));
            SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(recoverer,
                    new FixedBackOff(0L, 3L));
            seekToCurrentErrorHandler.setCommitRecovered(true);
            factory.setErrorHandler(seekToCurrentErrorHandler);
            log.trace("after seekToCurrentErrorHandler filter ");
            return factory;
        }
    }
Am I doing anything wrong? Is there a way to clear the warning, or can it be safely ignored? Any advice would be appreciated.
Upvotes: 0
Views: 2282
Reputation: 1
I changed the method signature to the one below, and it works:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Assignment> kafkaListenerContainerFactory(KafkaOperations<String, Assignment> KafkaTemplate)
Upvotes: 0
Reputation: 121550
Just follow the recommendation in the deprecation notice:
* @deprecated in favor of {@link #DeadLetterPublishingRecoverer(KafkaOperations, BiFunction)}.
In your case, you need to explicitly cast that KafkaTemplate variable to KafkaOperations<String, Assignment>.
Alternatively, consider upgrading to the latest Spring Boot, which brings in the latest Spring for Apache Kafka, where that deprecated constructor has already been removed.
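To illustrate why the cast helps: Java chooses a constructor overload from the static type of the argument, so a variable declared as the template type binds to the more specific (deprecated) overload, while the same object viewed as the interface type binds to the replacement. The sketch below uses hypothetical stand-in types (`Operations`, `Template`, `Recoverer`) rather than the real Spring classes, so it runs without any dependencies; it only mirrors the two constructor shapes named in the warning and the Javadoc.

```java
import java.util.function.BiFunction;

public class OverloadDemo {

    // Hypothetical stand-in for KafkaOperations (not the real Spring interface).
    interface Operations<K, V> { }

    // Hypothetical stand-in for KafkaTemplate, which implements the interface above.
    static class Template<K, V> implements Operations<K, V> { }

    // Hypothetical stand-in for DeadLetterPublishingRecoverer with both overloads.
    static class Recoverer {
        final String chosen;

        @Deprecated
        Recoverer(Template<?, ?> template, BiFunction<String, Exception, String> resolver) {
            this.chosen = "deprecated Template overload";
        }

        Recoverer(Operations<?, ?> operations, BiFunction<String, Exception, String> resolver) {
            this.chosen = "Operations overload";
        }
    }

    // Argument has static type Template, so the more specific deprecated overload wins.
    @SuppressWarnings("deprecation")
    static String withoutCast() {
        Template<String, String> template = new Template<>();
        return new Recoverer(template, (topic, e) -> topic + "-DLQ").chosen;
    }

    // The cast changes the static type to Operations, so only the new overload applies.
    static String withCast() {
        Template<String, String> template = new Template<>();
        return new Recoverer((Operations<String, String>) template, (topic, e) -> topic + "-DLQ").chosen;
    }

    public static void main(String[] args) {
        System.out.println(withoutCast());
        System.out.println(withCast());
    }
}
```

Applied to the question's code, the equivalent change would be passing `(KafkaOperations<String, Assignment>) KafkaTemplate` as the first constructor argument. Declaring the bean method parameter as `KafkaOperations<String, Assignment>`, as in the other answer, has the same effect because the static type of the argument is then already the interface.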
Upvotes: 1