Reputation: 782
I'm trying to create a sample app that sends a person to process and then collect all errors and I'm using Jackson to serialize.
But my Sink fails whet it tries to convert messages received from Processor. Looks like it is receiving messages with a different contentType instead of application/json. What should I do to get this working?
I'm using Spring Cloud Dalston.SR3
Main:
@SpringBootApplication
public class PersonStreamApplication {
@Bean
public MessageConverter customMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule()));
return converter;
}
public static void main(String[] args) {
SpringApplication.run(PersonStreamApplication.class, args);
}
}
Source:
@EnableBinding(PersonSource.Source.class)
public class PersonSource {
@Bean
@InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<Person> getDate() {
Map<String, Object> headers = new HashMap<>();
headers.put("directory", "/dir");
return () -> createMessage(new Person("[email protected]", 28), new MessageHeaders(headers));
}
public interface Source {
String SAMPLE = "sample-source";
@Output(SAMPLE)
MessageChannel sampleSource();
}
}
Processor:
@RequiredArgsConstructor
@EnableBinding(Processor.class)
public class PersonProcessor {
private final PersonValidator validator;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Either<String, Person> process(Person person) {
return this.validator.validate(person).toEither();
}
}
Sink:
@EnableBinding(PersonSink.Sink.class)
public class PersonSink {
@Bean
public IntegrationFlow log() {
return IntegrationFlows.from(Sink.SAMPLE).log().get();
}
public interface Sink {
String SAMPLE = "sample-sink";
@Input(SAMPLE)
SubscribableChannel sampleSink();
}
}
application.yml
spring:
cloud:
stream:
bindings:
sample-source:
destination: testing.stream.input
input:
destination: testing.stream.input
output:
content-type: application/json
destination: testing.stream.output
sample-sink:
destination: testing.stream.output
Upvotes: 1
Views: 478
Reputation: 121482
Your problem is with:
@Bean
public IntegrationFlow log() {
return IntegrationFlows.from(Sink.SAMPLE).log().get();
}
You have to use @StreamListener
to get a gain a Content-Type
conversion: https://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_using_streamlistener_for_automatic_content_type_handling
The distinction between
@StreamListener
and a Spring Integration@ServiceActivator
is seen when considering an inbound Message that has a String payload and acontentType
header ofapplication/json
. In the case of@StreamListener
, theMessageConverter
mechanism will use thecontentType
header to parse theString
payload into aVote
object.
Upvotes: 1