Reputation: 93
My message handler for publishing messages to the Kinesis stream is as follows:
@Bean
public MessageHandler kinesisMessageHandler(final AmazonKinesisAsync amazonKinesis,
        @Qualifier("successChannel") MessageChannel successChannel,
        @Qualifier("errorChannel") MessageChannel errorChannel) {
    KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
    // Publish asynchronously; results arrive later on the channels below.
    kinesisMessageHandler.setSync(false);
    kinesisMessageHandler.setOutputChannel(successChannel);
    kinesisMessageHandler.setFailureChannel(errorChannel);
    return kinesisMessageHandler;
}
@Bean(name = "errorChannel")
public MessageChannel errorChannel() {
    return MessageChannels.direct().get();
}
@Bean(name = "successChannel")
public MessageChannel successChannel() {
    return MessageChannels.direct().get();
}
The sync flag is set to false (setSync(false)) so that messages are processed asynchronously. I have also created separate IntegrationFlows to receive and process the Kinesis responses from the success and error channels:
@Bean
public IntegrationFlow successMessageIntegrationFlow(MessageChannel successChannel,
        MessageChannel inboundKinesisMessageChannel,
        MessageReceiverServiceActivator kinesisMessageReceiverServiceActivator) {
    return IntegrationFlows.from(successChannel)
            .channel(inboundKinesisMessageChannel)
            .handle(kinesisMessageReceiverServiceActivator, "receiveMessage")
            .get();
}
@Bean
public IntegrationFlow errorMessageIntegrationFlow(MessageChannel errorChannel,
        MessageChannel inboundKinesisErrorChannel,
        MessageReceiverServiceActivator kinesisErrorReceiverServiceActivator) {
    return IntegrationFlows.from(errorChannel)
            .channel(inboundKinesisErrorChannel)
            .handle(kinesisErrorReceiverServiceActivator, "receiveMessage")
            .get();
}
I would like to know whether you see any issues with using a DirectChannel to receive the success and error responses from Kinesis and processing them in an IntegrationFlow. As far as I know, with a DirectChannel the producer blocks during send until the consumer finishes its work and returns control to the caller. Is it correct to assume that here the producer runs on a separate thread pool owned by the AmazonKinesisAsyncClient, and that the producer will not wait for the IntegrationFlow to process the messages? Let me know if I need to implement it differently.
Upvotes: 1
Views: 109
Reputation: 121552
Your assumption about blocking is correct: with a direct channel, control does not come back to the producing thread until the downstream flow has finished. So, if you have a limited number of threads in that Kinesis client, you need to be sure that you free them as soon as possible. You might consider sending those callbacks to a queue channel instead. They are asynchronous anyway, but that way they won't hold up the Kinesis client threads.
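To make the threading difference concrete, here is a minimal plain-Java sketch that uses no Spring or AWS code at all. It only imitates the two hand-off styles: a direct hand-off that runs the handler on the sending thread, and a queued hand-off where a separate poller thread runs it. The class name, the method names and the thread names `kinesis-callback` and `poller-thread` are all made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class HandOffDemo {

    // Stand-in for the downstream handler: reports the thread it ran on.
    static final Function<String, String> HANDLER =
            payload -> Thread.currentThread().getName();

    // Direct hand-off: the handler runs inline on the sending thread.
    static String sendDirect(String payload) {
        return HANDLER.apply(payload);
    }

    // Queued hand-off: the sender only enqueues; a poller thread runs the handler.
    static String sendQueued(String payload) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService poller =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "poller-thread"));
        try {
            Future<String> handledOn = poller.submit(() -> HANDLER.apply(queue.take()));
            queue.put(payload); // unbounded queue, so the sender does not wait here
            // Waiting on the future is only for the demo, to read back the thread name.
            return handledOn.get(5, TimeUnit.SECONDS);
        } finally {
            poller.shutdownNow();
        }
    }

    // Runs a send on a thread named like an async client callback thread (hypothetical name).
    static String onCallbackThread(Callable<String> send) throws Exception {
        ExecutorService callbacks =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "kinesis-callback"));
        try {
            return callbacks.submit(send).get(5, TimeUnit.SECONDS);
        } finally {
            callbacks.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("direct handled on: " + onCallbackThread(() -> sendDirect("ok")));
        System.out.println("queued handled on: " + onCallbackThread(() -> sendQueued("ok")));
    }
}
```

In the direct case the handler runs on the callback thread itself, so that thread stays busy for as long as the handler takes. In the queued case the callback thread only enqueues the payload and the poller thread does the work.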
You still have a flaw in your flows: .channel(inboundKinesisMessageChannel). That means the same channel sits in the middle of two different flows. And if it is a direct one, then you end up with round-robin distribution. I would just remove it altogether.
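As a rough sketch of how the success side might look with a queue channel and without the intermediate channel, assuming the Spring Integration 5.x Java DSL that the question already uses: the configuration class name and the 100 ms poll interval are arbitrary choices for illustration, and MessageReceiverServiceActivator is the question's own class, so this is a configuration fragment rather than a standalone program. The error flow would follow the same pattern.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.MessageChannel;

@Configuration
public class KinesisCallbackFlowConfig { // hypothetical class name

    // Queue channel: the Kinesis callback thread only enqueues the result and returns.
    @Bean(name = "successChannel")
    public MessageChannel successChannel() {
        return MessageChannels.queue().get();
    }

    // No .channel(...) in the middle; a poller drains the queue on its own thread.
    @Bean
    public IntegrationFlow successMessageIntegrationFlow(
            @Qualifier("successChannel") MessageChannel successChannel,
            MessageReceiverServiceActivator kinesisMessageReceiverServiceActivator) {
        return IntegrationFlows.from(successChannel)
                .handle(kinesisMessageReceiverServiceActivator, "receiveMessage",
                        e -> e.poller(Pollers.fixedDelay(100))) // interval is an assumption
                .get();
    }
}
```

A consumer of a queue channel needs a poller, which is why one is configured on the handle step here; the right interval depends on how quickly the responses need to be processed.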
Upvotes: 1