Reputation: 143
I have a use case where I want to use the Consumer object, therefore, I am using this
@Slf4j
@Service
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MyMessageListener extends AbstractConsumerSeekAware implements ConsumerAwareMessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
consumer.pause(Collections.singleton(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())));
}
}
But every time I send a message to the topic, I get an IllegalArgumentException, I think this is happening because of the Consumer object.
This is my listener container -
@Autowired
private MyMessageListener myMessageListener;
public KafkaMessageListenerContainer<String, String> createContainer(String topic) {
ContainerProperties containerProperties = new ContainerProperties(topic);
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
listenerContainer.getContainerProperties().setMessageListener(myMessageListener);
listenerContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
listenerContainer.setAutoStartup(false);
return listenerContainer;
}
private Map<String, Object> consumerProperties(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.constants.getKafkaBootstrapAddress());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
Why is this happening?
EDIT 1
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is java.lang.IllegalArgumentException: Unrecognized Type: [null]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2361) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2346) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2220) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2134) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2016) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.0.jar:2.7.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Upvotes: 1
Views: 425
Reputation: 174729
What version are you using? Can you provide a small app that reproduces it? I just tested it, first using Boot's auto-configured container factory, and then with your code, and both work fine for me (latest Boot 2.4.5).
@SpringBootApplication
public class So67301285Application {
public static void main(String[] args) {
SpringApplication.run(So67301285Application.class, args);
}
@Bean
ConcurrentMessageListenerContainer<String, String> container(
ConcurrentKafkaListenerContainerFactory<String, String> factory,
ConsumerAwareMessageListener<String, String> listener) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so67301285");
container.getContainerProperties().setMessageListener(listener);
container.getContainerProperties().setGroupId("so67301285");
return container;
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so67301285").partitions(1).replicas(1).build();
}
@Bean
public KafkaMessageListenerContainer<String, String> createContainer(Listener listener) {
ContainerProperties containerProperties = new ContainerProperties("so67301285");
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(
consumerFactory, containerProperties);
listenerContainer.getContainerProperties().setMessageListener(listener);
listenerContainer.getContainerProperties().setGroupId("so67301285-1");
listenerContainer.setAutoStartup(true);
return listenerContainer;
}
private Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
}
@Component
class Listener extends AbstractConsumerSeekAware
implements ConsumerAwareMessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
System.out.println(data.value() + "@" + data.offset() + "from group: " + KafkaUtils.getConsumerGroupId());
}
}
Result:
foo@10from group: so67301285
foo@10from group: so67301285-1
Upvotes: 2