Reputation: 558
I have a producer with the following configuration for content-type:
spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          content-type: application/json
On the consumer side I use Spring Integration (KinesisMessageDrivenChannelAdapter) to route events to different channels. When I receive a Message in a listener class like the one below:
@ServiceActivator(inputChannel = "channelA")
void handleMessage(Message<?> msg) {
    objectMapper.readValue(msg.getPayload(), MyEvent.class);
}
the unmarshalling to MyEvent fails. In the stack trace I can see that the content-type is part of the payload, and the payload has still not been deserialized from JSON to a POJO.
I am wondering how I can deserialize the message before doing any other transformation. I can't find any method for setting a MessageConverter on the adapter.
I appreciate your help.
Thank you
Upvotes: 1
Views: 431
Reputation: 121177
It sounds like your producer is Spring Cloud Stream, but the consumer is just a plain KinesisMessageDrivenChannelAdapter. It is not clear why you would not use a Spring Cloud Stream consumer as well, but anyway...
Your problem is that the SCSt producer serializes the message headers together with the payload into the Kinesis record body, because AWS Kinesis doesn't support headers per se.
If you really are not interested in the headers on the consumer side, you can disable header embedding on the producer side:
spring:
  cloud:
    stream:
      bindings:
        eventOut:
          destination: lab_csi
          producer:
            headerMode: none
Otherwise you don't have a choice on the plain KinesisMessageDrivenChannelAdapter side other than to use EmbeddedHeaderUtils manually.
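To illustrate what "manually" involves, here is a minimal, dependency-free sketch of splitting embedded headers from the payload. It is not the Spring implementation: the class EmbeddedHeaderSketch and its methods are hypothetical, and the byte layout it assumes (a 0xff marker, a one-byte header count, then per header a one-byte name length, the name, a four-byte value length and a JSON value, followed by the payload) should be verified against the EmbeddedHeaderUtils source for your Spring Cloud Stream version. In a real consumer you would most likely call EmbeddedHeaderUtils.extractHeaders(...) rather than parse the bytes yourself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of Spring Cloud Stream. */
class EmbeddedHeaderSketch {

    /** Raw JSON header values plus the remaining payload bytes. */
    static final class Parsed {
        final Map<String, String> headers = new LinkedHashMap<>();
        byte[] payload;
    }

    /** Builds a record in the ASSUMED layout; used here only to create sample input. */
    static byte[] embed(Map<String, String> jsonHeaders, byte[] payload) {
        int size = 2 + payload.length;
        for (Map.Entry<String, String> e : jsonHeaders.entrySet()) {
            size += 1 + e.getKey().getBytes(StandardCharsets.UTF_8).length
                  + 4 + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put((byte) 0xff);               // marker byte (assumption)
        buf.put((byte) jsonHeaders.size()); // number of embedded headers
        for (Map.Entry<String, String> e : jsonHeaders.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8);
            buf.put((byte) name.length).put(name);
            buf.putInt(value.length).put(value);
        }
        buf.put(payload);
        return buf.array();
    }

    /** Splits a record into headers and payload; records without the marker pass through unchanged. */
    static Parsed extract(byte[] record) {
        Parsed parsed = new Parsed();
        if (record.length < 2 || (record[0] & 0xff) != 0xff) {
            parsed.payload = record;
            return parsed;
        }
        ByteBuffer buf = ByteBuffer.wrap(record);
        buf.get();                          // skip the marker
        int count = buf.get() & 0xff;
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xff];
            buf.get(name);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            parsed.headers.put(new String(name, StandardCharsets.UTF_8),
                               new String(value, StandardCharsets.UTF_8));
        }
        parsed.payload = new byte[buf.remaining()];
        buf.get(parsed.payload);            // whatever is left is the original payload
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "\"application/json\""); // header values are JSON-encoded
        byte[] record = embed(headers, "{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        Parsed parsed = extract(record);
        System.out.println(parsed.headers);
        System.out.println(new String(parsed.payload, StandardCharsets.UTF_8));
    }
}
```

Once the headers are stripped, the remaining payload bytes are the plain JSON document, which can then be passed to objectMapper.readValue(payload, MyEvent.class) in the service activator.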
Upvotes: 2