Reputation: 397
I have an integration flow bound to a Spring Cloud Stream input channel from Kafka.
Messages then go to a publish-subscribe channel backed by a single-thread executor.
From there they go to one subscriber that handles them. Handling can take some time.
As far as I understand, handling is done on the single-thread executor, and the Cloud Stream thread is freed to pick up another message from Kafka.
What happens if a new message arrives while the handling thread is still busy? Will the Cloud Stream thread wait, or will the message be discarded? If it waits, for how long? Is there a default timeout value?
My guess is that we are losing some messages in this scenario, as I can see messages in Kafka but no corresponding updates in the DB ...
But most of the messages are processed as expected.
public interface PointChannelProcessor {

    @Input("point-channel")
    MessageChannel pointChannel();
}

@Bean
public MessageChannel routeByKeyMessageChannel() {
    return MessageChannels.publishSubscribe(Executors.newSingleThreadExecutor()).get();
}

@Bean
public IntegrationFlow retrievePointListFromKafka() {
    return IntegrationFlows.from(pointChannelProcessor.pointChannel())
            .transform(new JsonToObjectTransformer(PointKafkaImport.class))
            .channel(ROUTE_BY_KEY_MESSAGE_CHANNEL)
            .get();
}

@Bean
public IntegrationFlow routePointsByKey() {
    return IntegrationFlows.from(ROUTE_BY_KEY_MESSAGE_CHANNEL)
            .enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, failedPointsChannel(), true))
            .route((GenericHandler<PointKafkaImport>) (payload, headers) -> {
                // some routing logic
                if (...) {
                    return ONE_CHANNEL;
                } else if (...) {
                    return NULL_CHANNEL;
                } else {
                    return ANOTHER_CHANNEL;
                }
            })
            .get();
}

// Other direct channels from routing which handle the payload
@Bean
public IntegrationFlow savePoint() {
    return IntegrationFlows.from(ONE_CHANNEL)
            .handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
            .get();
}

@Bean
public IntegrationFlow updatePoint() {
    return IntegrationFlows.from(ANOTHER_CHANNEL)
            .handle((GenericHandler<PointKafkaImport>) (payload, headers) -> ...)
            .get();
}
Upvotes: 0
Views: 370
Reputation: 3270
You need to be clearer about your problem, but as I understand it, please make sure you know the difference between publish-subscribe
and producer-consumer
. In Kafka, your code is responsible for picking messages up from the topic; messages are not pushed to your code. So your code must NOT be too busy to pick up a message from the topic.
Furthermore, there are many strategies to ensure that you read and process the data from the topic and nothing is lost. As a consumer, you just read the message and advance the offset; if processing fails, you can read the message again. Messages are removed according to the retention policy you set when configuring Kafka.
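As a sketch of the "read the message again if processing fails" strategy in the questioner's Spring Cloud Stream setup, the consumer binding exposes retry and dead-letter settings (the binding name `point-channel` comes from the question; the DLQ topic name here is a made-up example):

```properties
# Retry a failed message this many times before giving up
spring.cloud.stream.bindings.point-channel.consumer.maxAttempts=3
# Kafka binder: after retries are exhausted, publish the message to a dead-letter topic
spring.cloud.stream.kafka.bindings.point-channel.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.point-channel.consumer.dlqName=point-channel-dlq
```

With a DLQ enabled, messages that fail processing end up on a separate topic instead of silently disappearing, which also makes it easier to see whether messages are actually being lost.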
Upvotes: 1