Trayambak Kumar

Reputation: 143

ConsumerAwareMessageListener throwing IllegalArgumentException

My use case needs access to the Consumer object, so I am implementing ConsumerAwareMessageListener:

@Slf4j
@Service
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MyMessageListener extends AbstractConsumerSeekAware implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        // Pause the partition this record was read from.
        consumer.pause(Collections.singleton(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())));
    }

}

But every time I send a message to the topic, I get an IllegalArgumentException. I suspect it is related to the Consumer object.

This is how I create my listener container:

@Autowired
private MyMessageListener myMessageListener;

public KafkaMessageListenerContainer<String, String> createContainer(String topic) {
    ContainerProperties containerProperties = new ContainerProperties(topic);
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
    KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    listenerContainer.getContainerProperties().setMessageListener(myMessageListener);
    listenerContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
    listenerContainer.setAutoStartup(false);
    return listenerContainer;
}

private Map<String, Object> consumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.constants.getKafkaBootstrapAddress());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return props;
}

Why is this happening?

EDIT 1: Here is the stack trace.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is java.lang.IllegalArgumentException: Unrecognized Type: [null]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2361) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2346) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2220) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2134) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2016) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.0.jar:2.7.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

Upvotes: 1

Views: 425

Answers (1)

Gary Russell

Reputation: 174729

What version are you using? Can you provide a small app that reproduces it? I just tested it, first with Boot's auto-configured container factory and then with your code, and both work fine for me (latest Boot, 2.4.5).

@SpringBootApplication
public class So67301285Application {

    public static void main(String[] args) {
        SpringApplication.run(So67301285Application.class, args);
    }

    // First test: container created from Boot's auto-configured container factory.
    @Bean
    ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            ConsumerAwareMessageListener<String, String> listener) {

        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so67301285");
        container.getContainerProperties().setMessageListener(listener);
        container.getContainerProperties().setGroupId("so67301285");
        return container;
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67301285").partitions(1).replicas(1).build();
    }

    // Second test: container built manually, following the code in the question.
    @Bean
    public KafkaMessageListenerContainer<String, String> createContainer(Listener listener) {
        ContainerProperties containerProperties = new ContainerProperties("so67301285");
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
        KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(
                consumerFactory, containerProperties);
        listenerContainer.getContainerProperties().setMessageListener(listener);
        listenerContainer.getContainerProperties().setGroupId("so67301285-1");
        listenerContainer.setAutoStartup(true);
        return listenerContainer;
    }

    private Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

}

@Component
class Listener extends AbstractConsumerSeekAware
        implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
        System.out.println(data.value() + "@" + data.offset() + "from group: " + KafkaUtils.getConsumerGroupId());
    }

}

Result:

foo@10from group: so67301285
foo@10from group: so67301285-1

Upvotes: 2
