david
david

Reputation: 153

Why DeadLetterPublishingRecoverer is throwing deprecated warning in springboot?

I have a Kafka consumer and it works fine. but when I add a DeadLetterPublishingRecoverer to handle the error message it throws an deprecated warning.

I need a clean code Can some one advice on below issue

This is the warning message that is thrown in the eclipse

The constructor DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object,? extends Object>, BiFunction<ConsumerRecord<?,?>,Exception,TopicPartition>) is deprecated
@Slf4j
@EnableKafka
@Configuration
public class AssignmentOperationKafkaConsumerConfig {
    @Value("${spring.kafka.offset-reset-policy}")
    private String offsetResetPolicy;
    @Value("${spring.kafka.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.max-poll-interval}")
    private Integer maxPollInterval;

    @Value("${spring.kafka.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.session-timeout}")
    private Integer sessionTimeout;
    @Value("${spring.kafka.trusted-packages}")
    private String trustedPacakges;

    public AssignmentOperationKafkaConsumerConfig() {
    }

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetPolicy);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPacakges);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);

        return props;
    }

    @Bean
    public ConsumerFactory<String, Assignment> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Assignment.class, false)));
    }

    @Autowired
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Assignment> kafkaListenerContainerFactory(
            KafkaTemplate<String, Assignment> KafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Assignment> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAuthorizationExceptionRetryInterval(Duration.ofMillis(20000));
        factory.setRecordFilterStrategy(recordFilterStrategy);

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(KafkaTemplate,
                (r, e) ->
                        new TopicPartition(r.topic() + "-DLQ", -1)
        );
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(recoverer,
                new FixedBackOff(0L, 3L));
        seekToCurrentErrorHandler.setCommitRecovered(true);

        factory.setErrorHandler(seekToCurrentErrorHandler);
        log.trace("after seekToCurrentErrorHandler filter ");

        return factory;
    }

}

Am i doing anything wrong? is there a way to clear the warning message or can we ignore that Can someone advice please ?

Upvotes: 0

Views: 2282

Answers (2)

Eswar Janjanam
Eswar Janjanam

Reputation: 1

I changed the method signature to below and it is working

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Assignment> kafkaListenerContainerFactory(KafkaOperations<String, Assignment> KafkaTemplate)

Upvotes: 0

Artem Bilan
Artem Bilan

Reputation: 121550

Just follow its recommendations!

* @deprecated in favor of {@link #DeadLetterPublishingRecoverer(KafkaOperations, BiFunction)}.

In your case you need explicitly cast that KafkaTemplate variable to KafkaOperations<String, Assignment>.

Or just consider to upgrade to the latest Spring Boot, which would bring you the latest Spring for Apache Kafka with already removed that deprecated ctor!

Upvotes: 1

Related Questions