Reputation: 105
I am creating a Spring Integration application where I need to dynamically instantiate IntegrationFlows. My flow looks like the following:
kafkaListener -> intermediateChannel -> httpOutboundGateway
where httpOutboundGateway is advised by an ExpressionEvaluatingRequestHandlerAdvice that routes errors to a persistent errorChannel. That errorChannel has a poller and an IntegrationFlow which sends the errors back to intermediateChannel.
The scheme works like a charm when all the flows and the errorChannel bean are created by Spring and autowired. However, when I try to instantiate all of them programmatically, only the happy path works (kafka -> intermediateChannel -> httpOutboundGateway), and errorChannel doesn't receive any data: the log line in doSend is not printed, and failed messages are not retried. No errors or warnings appear in the log on startup; it just looks like the advice is not working.
I've used some advice from here, and it helped to register the flows and get the happy path working. However, it looks like I am missing something when it comes to registering not the flows themselves, but their supporting beans.
I also checked in the debugger whether the corresponding errorChannel bean is wired into the applicationContext, and it is. It can be looked up by errorChannelName, and it is available at the moment the IntegrationFlow is registered.
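A check along these lines is one way to confirm that lookup outside the debugger. It is only a sketch: isErrorChannelRegistered is a hypothetical helper, and the type check is an assumption about what is worth verifying, since a bean of another channel type registered under the same name would also pass a plain name lookup.

```kotlin
import org.springframework.context.ApplicationContext
import org.springframework.messaging.PollableChannel

// Hypothetical helper: true only if a bean with this name exists
// and is a pollable channel, not some other type under the same name.
fun isErrorChannelRegistered(
    applicationContext: ApplicationContext,
    errorChannelName: String
): Boolean =
    applicationContext.containsBean(errorChannelName) &&
        applicationContext.getBean(errorChannelName) is PollableChannel
```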
Here's my code.
I create the error channel manually in @PostConstruct, where I also register it and the flows:
@PostConstruct
fun topology() {
    rules.forEach { rule ->
        val mainChannelName = rule.topicFrom + "-channel"
        val errorChannelName = rule.topicFrom + "-error-channel"
        val errorChannel = channelProducer.createPollableDatabaseChannel(errorChannelName)
        val topicToChannelFlow = integrationFlowProducer.fromTopicToChannel(rule.topicFrom, mainChannelName)
        val channelToEndpointFlow = integrationFlowProducer
            .fromChannelToEndpoint(mainChannelName, rule.endpointDetails, errorChannelName)
        val errorChannelToMainFlow = integrationFlowProducer.fromErrorChannelToMain(errorChannelName, mainChannelName)
        integrationFlowContext.registration(topicToChannelFlow).register()
        integrationFlowContext.registration(channelToEndpointFlow).addBean(errorChannel).register()
        integrationFlowContext.registration(errorChannelToMainFlow).register()
    }
}
Here's the implementation of the channelProducer.createPollableDatabaseChannel(errorChannelName) method:
fun createPollableDatabaseChannel(channelName: String): PollableChannel {
    val queueChannel = object : QueueChannel(MessageGroupQueue(jdbcStore, channelName)) {
        override fun doSend(message: Message<*>, timeout: Long): Boolean {
            logger.info("sending message to error channel: $message")
            return super.doSend(message, timeout)
        }

        // Nullable return type: the superclass returns null when no message arrives in time.
        override fun doReceive(timeout: Long): Message<*>? {
            val received = super.doReceive(timeout)
            logger.info("received message: $received")
            return received
        }
    }
    return queueChannel
}
And here are the integrationFlowProducer methods that were called above:
fun fromTopicToChannel(topicFrom: String, receiverChannel: String): IntegrationFlow {
    return IntegrationFlows
        .from(
            Kafka.messageDrivenChannelAdapter(
                consumerFactory,
                KafkaMessageDrivenChannelAdapter.ListenerMode.record, topicFrom
            )
                .configureListenerContainer { c ->
                    c.ackMode(ContainerProperties.AckMode.RECORD)
                }
        )
        .log()
        .channel(receiverChannel)
        .get()
}

fun fromChannelToEndpoint(
    channelFrom: String,
    endpointDetails: EndpointDetails,
    errorChannelName: String
): IntegrationFlow {
    if (endpointDetails is RestEndpointDetails) {
        return createRestIntegrationFlow(channelFrom, endpointDetails, errorChannelName)
    }
    throw UnsupportedOperationException("only rest endpoint supported")
}

fun fromErrorChannelToMain(errorChannelName: String, mainChannelName: String): IntegrationFlow {
    return IntegrationFlows.from(errorChannelName)
        .wireTap { f -> f.handle { t -> logger.info("Message read from error channel: " + t.payload.toString()) } }
        .transform<ErrorMessage, String> { extractPayloadFromErrorMessage(it) }
        .channel(mainChannelName)
        .get()
}

private fun createRestIntegrationFlow(
    channelFrom: String,
    endpointDetails: RestEndpointDetails,
    errorChannelName: String
): IntegrationFlow {
    return IntegrationFlows.from(channelFrom)
        .wireTap { f -> f.handle { t -> logger.info("Message read from main channel: " + t.payload.toString()) } }
        .handle<HttpRequestExecutingMessageHandler>(
            Http.outboundGateway(
                endpointDetails.url,
                sslRestTemplate
            )
                .httpMethod(HttpMethod.POST)
                .headerMapper(kafkaToHttpHeaderMapper)
                .expectedResponseType(String::class.java)
        ) { c -> c.advice(failureAdvice(errorChannelName)) }
        .nullChannel()
}

private fun extractPayloadFromErrorMessage(errorMessage: ErrorMessage) =
    (errorMessage.payload as EvaluatingException).failedMessage!!.payload as String

private fun failureAdvice(errorChannelName: String): Advice {
    val advice = ExpressionEvaluatingRequestHandlerAdvice()
    advice.setFailureChannelName(errorChannelName)
    return advice
}
The poller is an auto-initialized bean, created as follows:
@Bean(name = [PollerMetadata.DEFAULT_POLLER])
fun poller(
    deliveryTransactionInterceptor: TransactionInterceptor,
    deliveryThreadPoolTaskExecutor: TaskExecutor
): PollerMetadata {
    return Pollers.fixedDelay(30, TimeUnit.SECONDS)
        .advice(deliveryTransactionInterceptor)
        .taskExecutor(deliveryThreadPoolTaskExecutor)
        .get()
}
There are also some other persistence settings, but they are the same for the working (auto-initialized) code and for the current one. If they are needed, I can paste them here as well.
The version of Spring Integration is 5.1.4.RELEASE.
I would very much appreciate any help on the subject.
Reputation: 121542
Since you do everything on the fly, it looks like you have an ExpressionEvaluatingRequestHandlerAdvice instance for every single dynamic flow. I believe each of those has to be registered as a bean as well. The addBean() on the flow registration is the way to go. I understand that you would need to rework some of your code, but the advice does have to be a bean too.
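The rework might look roughly like the sketch below. It is an illustration under stated assumptions, not a confirmed fix: failureAdvice and registerEndpointFlow are hypothetical helpers, and registering the channel through the named addBean(name, bean) overload is an assumption, made so that the channel is available under the same name the advice is given. The flow passed in has to be built with the very same advice instance (for example via c.advice(advice)), otherwise the registered bean and the advice actually applied to the gateway would be two different objects.

```kotlin
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.context.IntegrationFlowContext
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
import org.springframework.messaging.PollableChannel

// Hypothetical helper: one advice instance per dynamic flow,
// created outside the flow definition so it can also be registered as a bean.
fun failureAdvice(errorChannelName: String): ExpressionEvaluatingRequestHandlerAdvice {
    val advice = ExpressionEvaluatingRequestHandlerAdvice()
    advice.setFailureChannelName(errorChannelName)
    return advice
}

// Hypothetical helper: registers the flow together with its supporting beans,
// so that the context can initialize the advice like any other bean.
fun registerEndpointFlow(
    integrationFlowContext: IntegrationFlowContext,
    channelToEndpointFlow: IntegrationFlow, // assumed to be built with the same `advice` instance
    errorChannelName: String,
    errorChannel: PollableChannel,
    advice: ExpressionEvaluatingRequestHandlerAdvice
) {
    integrationFlowContext.registration(channelToEndpointFlow)
        .addBean(errorChannelName, errorChannel) // assumption: explicit name matching the advice's channel name
        .addBean(advice)
        .register()
}
```

In the question's code this would mean creating the advice in topology() and passing it down to createRestIntegrationFlow instead of building it inside that method.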