sansari
sansari

Reputation: 558

Spring Integration: Transform and route with header

I am building a Spring-based library which should consume and deliver messages to configured channels after transforming them to correct type. My library is configurable by list of pairs "streamToConsume: FinalChannelDestination"

streams:
    source: destinationChannel

I would like to have an IntegrationFlow like below:

IntegrationFlows
        .from(kinesisInboundChannelAdapter(amazonKinesis(), streamNames))
        .transform(new IssuanceTransformer())
        .route(router())
        .get();

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter(AwsHeaders.STREAM);
    consumerClientProperties.getKinesis().getStreams().forEach((k, v) ->
        router.setChannelMapping(k, v)
    );
    return router;
  }

Transforming events and afterwards deliver them to the channel mapped to stream in the configuration. How can I keep event header after transformation to be able send it to correct channel?

Thank you

Upvotes: 2

Views: 650

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121282

I believe your concern that after an IssuanceTransformer there is no a desired AwsHeaders.STREAM header anymore. When you develop a custom transformer you need to ensure that you transfer all the headers from the request message to the reply message: unlike many other components transformer doesn't modify a reply message from the POJO.

For this purpose you can use something like:

MessageBuilder.withPayload(myPayload).copyHeadersIfAbsent(requestMessage.getHeaders()).build();

Note: you can use a AwsHeaders.RECEIVED_STREAM because exactly this one is populated from the KinesisMessageDrivenChannelAdapter:

private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord) {
        messageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream())
                .setHeader(AwsHeaders.SHARD, this.shardOffset.getShard());

        if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
            messageBuilder.setHeader(AwsHeaders.CHECKPOINTER, this.checkpointer);
        }

Upvotes: 1

Related Questions