Reputation: 926
We have an application that consumes messages from Kafka and processes them. We are using Spring Boot 2.2.6.RELEASE and Spring Cloud Hoxton.SR4.
I am trying to receive a simple message:
{
"payload": {
"config": {
"credentials": {}
},
"id": "est-00001",
"merchantKey": "test-00001",
"name": "Test",
"version": 7,
"type": "PARTNER",
"vendorNumber": "14"
},
"metadata": {
"timestamp": -1,
"partition": 8,
"key": "test-00001",
"offset": 105,
"topic": "configure",
"headers": []
},
"key": "00001",
"messageType": "dent.set",
"id": "abf75248-6fb0-4b57-a92c-74d4d3143cc0",
"time": "2018-03-16T15:56Z"
}
And this is the model I am using to deserialise the message:
package com.commercetools.tuev.marketplace.merchant.model;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MerchantMessage {
@JsonProperty("id")
@JsonPropertyDescription("The message id, usually a random UUID")
private String id;
@JsonProperty("time")
@JsonPropertyDescription("ISO-8601 timestamp of the event")
private String time;
@JsonProperty("key")
@JsonPropertyDescription("The key has to always match the entity's id (product id)")
private String key;
@JsonProperty("messageType")
@JsonPropertyDescription("The Message Type")
private String messageType;
@JsonProperty("payload")
private Map<String, Object> payload;
@JsonProperty("metadata")
private Map<String, Object> metadata;
}
and all I get is the following exception:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
application.properties
spring.cloud.stream.bindings.merchantInput.destination=merchantTopic
spring.cloud.stream.bindings.merchantInput.group=consumerGroup
spring.cloud.stream.bindings.merchantInput.consumer.header-mode=none
spring.cloud.stream.bindings.merchantInput.content-type=application/json
spring.cloud.stream.kafka.bindings.merchantInput.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.merchantInput.consumer.partitioned=true
spring.cloud.stream.bindings.merchantInput.consumer.max-attempts=1
Function
@StreamListener(value = MerchantProcessor.INPUT)
public void manage(Flux<Message<MerchantMessage>> message) {
message
.map(Message::getPayload)
.doOnNext(System.out::println)
.subscribe(payload-> System.out.println("Consumed: "+payload));
}
If I remove the flux, everything just works fine:
@StreamListener(value = MerchantProcessor.INPUT)
public void manage(Message<MerchantMessage> message) {
Mono.just(message)
.map(Message::getPayload)
.doOnNext(System.out::println)
.subscribe(payload -> System.out.println("Consumed: " + payload));
}
Upvotes: 0
Views: 1042
Reputation: 6126
The @StreamListener annotation-based programming model has been deprecated for a while now. Over the past several years we have fully migrated to a functional programming model, which requires no such annotations.
You can simply change your code to:
@Bean
public Consumer<Flux<Message<MerchantMessage>>> message() {
    // The bean method name ("message") determines the binding name.
    return flux -> flux
        .map(Message::getPayload)
        .doOnNext(System.out::println)
        .subscribe(payload -> System.out.println("Consumed: " + payload));
}
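Your input binding is, by convention, named message-in-0 (function name, then -in-, then the input index). The Spring Cloud Stream documentation on the functional programming model covers this in more detail.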
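Since the binding name changes, the properties from the question would presumably need to be keyed on the new name as well. The fragment below is only a sketch of that rename, assuming the bean is called message as in the snippet above and that the topic, group, and consumer settings stay the same as in the question. The spring.cloud.function.definition line is an assumption on my part: it should only matter if the application defines more than one functional bean.

```properties
# Only needed when more than one Function/Consumer/Supplier bean exists (assumption).
spring.cloud.function.definition=message

# Same settings as in the question, rekeyed from merchantInput to message-in-0.
spring.cloud.stream.bindings.message-in-0.destination=merchantTopic
spring.cloud.stream.bindings.message-in-0.group=consumerGroup
spring.cloud.stream.bindings.message-in-0.consumer.header-mode=none
spring.cloud.stream.bindings.message-in-0.content-type=application/json
spring.cloud.stream.kafka.bindings.message-in-0.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.message-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.message-in-0.consumer.max-attempts=1
```

With this in place, the MerchantProcessor binding interface and the @StreamListener method should no longer be needed for this input.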
Upvotes: 2