Reputation: 2012
To show what I mean, I made a simple project.
Dependencies:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
This is all of the Kafka configuration in code:
@Configuration
public class KafkaSerializationConfig implements SmartInitializingSingleton {

    private final KafkaProperties kafkaProperties;

    public KafkaSerializationConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties());
        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(new NewTopic("demo", 2, (short) 2));
        client.createTopics(newTopics);
        client.close();
    }

    private static ObjectMapper mapper = new ObjectMapper()
            .registerModules(new Jdk8Module(), new JavaTimeModule());

    public static class KafkaSerializer extends JsonSerializer<Object> {
        public KafkaSerializer() {
            super(mapper);
        }
    }

    public static class KafkaDeserializer extends JsonDeserializer<Object> {
        public KafkaDeserializer() {
            super(mapper);
        }
    }
}
application.yml file:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: sample-cutter
      auto-offset-reset: earliest
      properties.spring.json.trusted.packages: "*"
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaSerializer
The app starts and works. But let's try to use it. Imagine we have two services, a server and a client, so we have two classes with the same fields:
DTO for the client:
public class GettingUser {

    private String firstName;
    private String lastName;

    public GettingUser() {
    }

    public GettingUser(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}
DTO for the server:
public class SendingUser {

    private String firstName;
    private String lastName;

    public SendingUser() {
    }

    public SendingUser(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}
And finally, let's try to use it:
@RestController
public class SpringSerializationDemoController {

    private final KafkaTemplate<Long, SendingUser> template;
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public SpringSerializationDemoController(KafkaTemplate<Long, SendingUser> template) {
        this.template = template;
    }

    @GetMapping("/start-demo")
    public String startDemo() {
        SendingUser user = new SendingUser("John", "Smith");
        template.send("demo", user);
        return "OK";
    }

    @KafkaListener(topics = "demo")
    public void consumeCutSample(GettingUser user) {
        logger.info("Got user: {}", user);
    }
}
I get an exception showing that my app can't convert SendingUser to GettingUser.
Exception:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.springkafkasimpledemo.controller.SpringSerializationDemoController.consumeCutSample(com.example.springkafkasimpledemo.domain.GettingUser)]
Bean [com.example.springkafkasimpledemo.controller.SpringSerializationDemoController@53601a9c]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, 
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2037) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2025) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
... 13 common frames omitted
But why does it do this? My ObjectMapper doesn't write any type information into the payload, as I can see in the console:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic demo
{"firstName":"John","lastName":"Smith"}
I can't understand why spring-kafka tries to cast SendingUser to GettingUser.
Upvotes: 2
Views: 2498
Reputation: 174554
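If you are using the Spring JSON (de)serializer on both sides, you need to configure the type mapping - see https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#json-serde
By default the serializer puts the source class name in the record headers, not in the JSON payload, and the deserializer uses that header to choose the target type. That is why the console shows plain JSON while the listener is handed a SendingUser.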
Map the source class to a token on the sending side and map the token to the required class on the receiving side.
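The mapping described above could be sketched as follows. This is only an illustration that builds the extra client properties as plain maps: the token "user" is a hypothetical name, the class names are taken from the exception in the question, and spring.json.type.mapping is the property key used by the Spring JSON serializer and deserializer.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the token "user" is a made-up name for this example.
class TypeMappingProps {

    static final String TYPE_MAPPING = "spring.json.type.mapping";

    // Sending side: the serializer writes the token "user" into the type header
    // instead of the SendingUser class name.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(TYPE_MAPPING, "user:com.example.springkafkasimpledemo.domain.SendingUser");
        return props;
    }

    // Receiving side: the deserializer resolves the same token to GettingUser.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(TYPE_MAPPING, "user:com.example.springkafkasimpledemo.domain.GettingUser");
        return props;
    }
}
```

In the question's setup these entries would presumably go under spring.kafka.producer.properties and spring.kafka.consumer.properties in application.yml, the same way spring.json.trusted.packages is already set there.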
Or you can disable the use of headers in the deserializer and configure a default type. See setUseTypeHeaders().
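As a rough sketch of this second option, the consumer could be given the two properties below. It assumes the deserializer is configured through properties (as in the question's application.yml) rather than by calling the setter directly, and the target class name is taken from the exception in the question.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: consumer-side properties that ignore the type headers.
class DefaultTypeProps {

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Property form of setUseTypeHeaders(false): do not read the type headers.
        props.put("spring.json.use.type.headers", false);
        // Deserialize every record value into this class instead.
        props.put("spring.json.value.default.type",
                "com.example.springkafkasimpledemo.domain.GettingUser");
        return props;
    }
}
```

With this approach the sending side needs no changes, but every value on the topic is read as the one default type.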
EDIT
For more sophisticated types, e.g. generics, you should configure the deserializer to call a method that returns a JavaType.
e.g. for a List<Foo>:
public static JavaType returnType(String topic, byte[] data, Headers headers) {
    return TypeFactory.defaultInstance()
            .constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.Application.returnType
Use a TypeReference or constructParametricType for your Event<SourceFile>. See "Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double".
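Generic types such as Event<SourceFile> are erased at runtime, so a plain Class cannot describe them; that is why a TypeReference or constructParametricType is needed. With Jackson the parametric form would be along the lines of TypeFactory.defaultInstance().constructParametricType(Event.class, SourceFile.class). The sketch below deliberately does not use Jackson: it is a standard-library illustration of the idea behind a type reference, where an anonymous subclass keeps the full generic type. TypeToken is a made-up name, and Event and SourceFile are empty hypothetical stand-ins for the real classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for the Event<SourceFile> types mentioned above.
class SourceFile { }
class Event<T> { }

// Minimal "type token": an anonymous subclass records its generic superclass,
// so the type argument survives erasure and can be read back via reflection.
abstract class TypeToken<T> {

    private final Type type;

    protected TypeToken() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeTokenDemo {

    static Type eventOfSourceFile() {
        // The trailing { } creates the anonymous subclass that carries Event<SourceFile>.
        return new TypeToken<Event<SourceFile>>() { }.getType();
    }
}
```

The returned Type is a ParameterizedType whose raw type is Event and whose single argument is SourceFile, which is the information a JSON deserializer needs to build the nested object instead of a LinkedHashMap.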
Upvotes: 1