Reputation: 558
I am building a Spring-based library which should consume and deliver messages to configured channels after transforming them to correct type. My library is configurable by list of pairs "streamToConsume: FinalChannelDestination"
streams:
source: destinationChannel
I would like to have an IntegrationFlow
like below:
IntegrationFlows
.from(kinesisInboundChannelAdapter(amazonKinesis(), streamNames))
.transform(new IssuanceTransformer())
.route(router())
.get();
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter(AwsHeaders.STREAM);
consumerClientProperties.getKinesis().getStreams().forEach((k, v) ->
router.setChannelMapping(k, v)
);
return router;
}
Transforming events and afterwards deliver them to the channel mapped to stream in the configuration. How can I keep event header after transformation to be able send it to correct channel?
Thank you
Upvotes: 2
Views: 650
Reputation: 121282
I believe your concern that after an IssuanceTransformer
there is no a desired AwsHeaders.STREAM
header anymore. When you develop a custom transformer you need to ensure that you transfer all the headers from the request message to the reply message: unlike many other components transformer doesn't modify a reply message from the POJO.
For this purpose you can use something like:
MessageBuilder.withPayload(myPayload).copyHeadersIfAbsent(requestMessage.getHeaders()).build();
Note: you can use a AwsHeaders.RECEIVED_STREAM
because exactly this one is populated from the KinesisMessageDrivenChannelAdapter
:
private void performSend(AbstractIntegrationMessageBuilder<?> messageBuilder, Object rawRecord) {
messageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, this.shardOffset.getStream())
.setHeader(AwsHeaders.SHARD, this.shardOffset.getShard());
if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
messageBuilder.setHeader(AwsHeaders.CHECKPOINTER, this.checkpointer);
}
Upvotes: 1