Reputation: 377
I am new to Apache Kafka. I have tried to find a solution for this issue, but failed. The producer code works fine and data is stored in the Kafka topic. I am using JsonSerializer as the value serializer in my Kafka producer configuration.
NormalizedEvent is a simple POJO used in both the producer and the consumer.
My Producer Code:
public void saveMessage(final IMMessage message) {
    for (NormalizedEvent event : message.getNormalizedEvents()) {
        event.setServiceId(message.getServiceId());
        ProducerRecord<String, NormalizedEvent> producerRecord = buildProducerRecord(null, event, TOPIC);
        ListenableFuture<SendResult<String, NormalizedEvent>> listenableFuture = kafkaTemplate.send(producerRecord);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, NormalizedEvent>>() {
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(null, event, ex);
            }

            @Override
            public void onSuccess(SendResult<String, NormalizedEvent> result) {
                properties.put("partitionId", result.getRecordMetadata().partition());
                properties.put("offsetId", result.getRecordMetadata().offset());
                handleSuccess(null, event, result);
                imMessageDBService.setDBProperties(event, properties);
            }
        });
    }
}
private ProducerRecord<String, NormalizedEvent> buildProducerRecord(String key, NormalizedEvent value, String topic) {
    return new ProducerRecord<>(topic, key, value);
}
My Consumer Code:
@Bean
public ConsumerFactory<String, NormalizedEvent> userConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(NormalizedEvent.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> userKafkaListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}
Error message:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) [spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) [spring-kafka-2.7.0.jar:2.7.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition SAP-S_4-HANA-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) [spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) [spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.0.jar:2.7.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Upvotes: 1
Views: 2612
Reputation: 174484
Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
It looks like the event class is in different packages in the producer and the consumer.
By default, the deserializer uses the type info in headers; the generic argument is used as a fallback only if there are no type headers.
There are two solutions. On the deserializer:
/**
 * Set to false to ignore type information in headers and use the configured
 * target type instead.
 * Only applies if the preconfigured type mapper is used.
 * Default true.
 * @param useTypeHeaders false to ignore type headers.
 * @since 2.2.8
 */
public void setUseTypeHeaders(boolean useTypeHeaders) {
    if (!this.typeMapperExplicitlySet) {
        this.useTypeHeaders = useTypeHeaders;
        setUpTypePrecedence(Collections.emptyMap());
    }
}
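Applied to your consumer factory, the first option might look like this (a sketch; it assumes NormalizedEvent here is the consumer-side class):

```java
// Sketch: configure the value deserializer to ignore the producer's type
// headers, so records are always deserialized into the consumer's own
// NormalizedEvent class regardless of the class name in the headers.
JsonDeserializer<NormalizedEvent> valueDeserializer =
        new JsonDeserializer<>(NormalizedEvent.class);
valueDeserializer.setUseTypeHeaders(false);   // ignore type info in headers
valueDeserializer.addTrustedPackages("*");    // or list specific packages

return new DefaultKafkaConsumerFactory<>(config,
        new StringDeserializer(), valueDeserializer);
```

Note that properties such as `JsonDeserializer.TRUSTED_PACKAGES` in the config map are not applied when you pass a deserializer instance to the factory, so configure the instance directly as shown.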
or on the serializer:
/**
 * Set to false to disable adding type info headers.
 * @param addTypeInfo true to add headers.
 * @since 2.1
 */
public void setAddTypeInfo(boolean addTypeInfo) {
    this.addTypeInfo = addTypeInfo;
}
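On the producer side, the equivalent (again a sketch) is to stop the JsonSerializer from writing type headers at all, either programmatically or via its configuration property:

```java
// Sketch: build a JsonSerializer that does not add type-info headers, so the
// consumer falls back to its configured target type (NormalizedEvent).
JsonSerializer<NormalizedEvent> valueSerializer = new JsonSerializer<>();
valueSerializer.setAddTypeInfo(false);

// Equivalent property-based configuration on the producer factory:
// producerConfig.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
```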
See the documentation and Javadocs.
Upvotes: 1