Marcus Henrique

Reputation: 782

Converting messages between Processor and Sink

I'm trying to create a sample app that sends a Person to a Processor for validation and then collects all validation errors in a Sink. I'm using Jackson for serialization.

But my Sink fails when it tries to convert the messages it receives from the Processor. It looks like the messages arrive with a contentType other than application/json. What should I do to get this working?

I'm using Spring Cloud Dalston.SR3.

Main:

@SpringBootApplication
public class PersonStreamApplication {

    @Bean
    public MessageConverter customMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(new ObjectMapper().registerModule(new VavrModule()));
        return converter;
    }

    public static void main(String[] args) {
        SpringApplication.run(PersonStreamApplication.class, args);
    }
}

Source:

@EnableBinding(PersonSource.Source.class)
public class PersonSource {

    @Bean
    @InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Person> getDate() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("directory", "/dir");
        return () -> createMessage(new Person("[email protected]", 28), new MessageHeaders(headers));
    }

    public interface Source {

        String SAMPLE = "sample-source";

        @Output(SAMPLE)
        MessageChannel sampleSource();
    }
}

Processor:

@RequiredArgsConstructor
@EnableBinding(Processor.class)
public class PersonProcessor {

    private final PersonValidator validator;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Either<String, Person> process(Person person) {
        return this.validator.validate(person).toEither();
    }
}

Sink:

@EnableBinding(PersonSink.Sink.class)
public class PersonSink {

    @Bean
    public IntegrationFlow log() {
        return IntegrationFlows.from(Sink.SAMPLE).log().get();
    }

    public interface Sink {

        String SAMPLE = "sample-sink";

        @Input(SAMPLE)
        SubscribableChannel sampleSink();
    }
}

application.yml:

spring:
  cloud:
    stream:
      bindings:
        sample-source:
          destination: testing.stream.input
        input:
          destination: testing.stream.input
        output:
          content-type: application/json
          destination: testing.stream.output
        sample-sink:
          destination: testing.stream.output

Upvotes: 1

Views: 478

Answers (1)

Artem Bilan

Reputation: 121482

Your problem is with:

@Bean
public IntegrationFlow log() {
    return IntegrationFlows.from(Sink.SAMPLE).log().get();
}

You have to use @StreamListener to gain Content-Type conversion: https://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_using_streamlistener_for_automatic_content_type_handling

As the documentation puts it:

The distinction between @StreamListener and a Spring Integration @ServiceActivator is seen when considering an inbound Message that has a String payload and a contentType header of application/json. In the case of @StreamListener, the MessageConverter mechanism will use the contentType header to parse the String payload into a Vote object.
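
For the Sink in the question, the change might look like the sketch below. It is untested and rests on a few assumptions: Person is the asker's class and lives in the same package, the payload type Either<String, Person> mirrors the Processor's return type, and the custom Jackson converter bean from the main class is actually picked up for the inbound conversion. The method body (a plain System.out.println) is only a placeholder for the original log() step.

```java
import io.vavr.control.Either;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(PersonSink.Sink.class)
public class PersonSink {

    // Unlike the IntegrationFlow version, @StreamListener asks the
    // MessageConverter mechanism to convert the payload to the declared
    // parameter type, guided by the contentType header.
    // Assumption: the VavrModule-enabled converter handles Either here.
    @StreamListener(Sink.SAMPLE)
    public void log(Either<String, Person> result) {
        // Placeholder for real handling; prints the converted payload.
        System.out.println("Received: " + result);
    }

    public interface Sink {

        String SAMPLE = "sample-sink";

        @Input(SAMPLE)
        SubscribableChannel sampleSink();
    }
}
```

If the conversion still fails, declaring the parameter as String first is a quick way to inspect the raw JSON that arrives before narrowing the type again.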

Upvotes: 1
