Reputation: 2305
Edit: the root cause was that the annotation referred to a property path which did not exist. The Consumer.java class now has this and it is working:
@ConditionalOnProperty(value = "aaronshaver.kafka.consumer-enabled", havingValue = "true")
I have a simple Kafka setup with a producer that is definitely producing. I confirmed this with this code:
SendMessageTask.java
ListenableFuture<SendResult<String, String>> listenableFuture = this.producer.sendMessage("INPUT_DATA", "IN_KEY", LocalDate.now().toString());
SendResult<String, String> result = listenableFuture.get();
logger.info(String.format("\nProduced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d\n", result.getRecordMetadata().topic(), result.getRecordMetadata().offset(), result.getRecordMetadata().partition(), result.getRecordMetadata().serializedValueSize()));
Messages show up in the console with the contents of the send result.
What I can't figure out is how the consumer's Kafka listener method gets called, or why it isn't being called. Here's the consumer (I tried a couple of different listener methods):
Consumer.java
@Service
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "INPUT_DATA", groupId = "fooGroup")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group fooGroup: " + message);
    }

    @KafkaListener(topics = {"INPUT_DATA"})
    public void consume(
            final @Payload String message,
            final @Header(KafkaHeaders.OFFSET) Integer offset,
            final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
            final Acknowledgment acknowledgment
    ) {
        logger.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s", ts, message, offset, key, partition, topic));
        acknowledgment.acknowledge();
    }
}
I tried this config file:
KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fooGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
I feel like the @KafkaListener annotated method(s) should be called but I may be misunderstanding how it works, or have something misconfigured.
For good measure, here's the file that has some Spring and Kafka properties set up:
main/resources/application.yml
aaronshaver:
  kafka:
    consumer-enabled: ${consumer-enabled:true}
spring:
  kafka:
    bootstrap-servers: ${kafka_bootstrap_servers:localhost:29092}
    properties:
      sasl:
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username=${kafka_username:'admin'} password=${kafka_password:'admin-secret'};
        mechanism: PLAIN
      security:
        protocol: SASL_PLAINTEXT
    consumer:
      auto-offset-reset: earliest
      group-id: aaronshaver
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1
      fetch-max-wait: 36000
      enable-auto-commit: false
      client-id: aaronshaver
    producer:
      client-id: aaronshaver
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 2
    jaas:
      enabled: true
    listener:
      poll-timeout: 1800000
      concurrency: 1
      ack-mode: manual_immediate
Any ideas? Thanks!
Upvotes: 0
Views: 356
Reputation: 121212
Your problem is here:
@ConditionalOnProperty(value = "example.kafka.consumer-enabled", havingValue = "true")
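There is simply no such configuration property. Your application.yml defines aaronshaver.kafka.consumer-enabled, not example.kafka.consumer-enabled, so the condition never matches and the Consumer bean is never created.
When I removed this condition, it started consuming records from the topic.
Please revise the configuration or the logic in your application.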
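To illustrate why a mismatched property name silently disables the listener, here is a minimal plain-Java sketch of the rule as I understand it. It is not Spring's actual implementation: the class name PropertyConditionSketch and the matches helper are hypothetical. It assumes the default behaviour where a missing property means "no match", and that havingValue is compared case-insensitively.

```java
import java.util.Map;

// Simplified, hypothetical model of a property-based bean condition.
// NOT Spring's code; it only mimics the behaviour described above.
class PropertyConditionSketch {

    // Returns true when a bean guarded by the condition would be registered.
    static boolean matches(Map<String, String> env, String name, String havingValue) {
        String actual = env.get(name);
        if (actual == null) {
            // Property is absent: assumed default is "do not match",
            // so the bean (and its @KafkaListener methods) never exists.
            return false;
        }
        return havingValue.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        // Mirrors the single custom property defined in application.yml.
        Map<String, String> env = Map.of("aaronshaver.kafka.consumer-enabled", "true");

        // Name used in the original annotation: not defined anywhere.
        System.out.println(matches(env, "example.kafka.consumer-enabled", "true"));     // false

        // Name that actually exists in the configuration.
        System.out.println(matches(env, "aaronshaver.kafka.consumer-enabled", "true")); // true
    }
}
```

Since no Consumer bean is registered in the first case, nothing fails at startup and nothing is logged; the listener methods simply never run.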
Upvotes: 1