G. Urikh
G. Urikh

Reputation: 105

Why is my dynamic IntegrationFlow not routing to error channel?

I am creating a Spring Integration application where I need to dynamically instantiate IntegrationFlows. My flow looks like the following:

kafkaListener -> intermediateChannel -> httpOutboundGateway, where httpOutboundGateway is advised by ExpressionEvaluatingRequestHandlerAdvice routing errors to persistent errorChannel. That errorChannel has a poller and an IntegrationFlow which returns the errors back in intermediateChannel.

The scheme works like a charm when all the flows and errorChannel bean are created by Spring and automatically autowired. However, when I am trying to instantiate all of them programmatically, only happy path works (kafka -> intermediateChannel -> httpOutboundGateway), but errorChannel doesn't receive data - the log string in doSend message is not printed, and the flow does not have errors retried. No errors or warnings occur in log on startup, it just looks like the advice is not working.

I've used some advises from here, and they helped to register the flows and to get happy path working. However, it looks like I am missing something when it comes up to registering not flows themselves, but supporting beans.

I also tried checking in debug whether a corresponding errorChannel bean is being wired into applicationContext, and it is indeed. It can be lookuped by errorChannelName and it is available on IntegrationFlow registration moment.

Here's my code.

I create a error channel manually in @PostConstruct, as well as registering it and the flows:

   @PostConstruct
   fun topology() {
       rules.forEach { rule ->
           val mainChannelName = rule.topicFrom + "-channel"
           val errorChannelName = rule.topicFrom + "-error-channel"
           val errorChannel = channelProducer.createPollableDatabaseChannel(errorChannelName)

           val topicToChannelFlow = integrationFlowProducer.fromTopicToChannel(rule.topicFrom, mainChannelName)
           val channelToEndpointFlow = integrationFlowProducer.
               fromChannelToEndpoint(mainChannelName, rule.endpointDetails, errorChannelName)
           val errorChannelToMainFlow = integrationFlowProducer.fromErrorChannelToMain(errorChannelName, mainChannelName)


           integrationFlowContext.registration(topicToChannelFlow).register()
           integrationFlowContext.registration(channelToEndpointFlow).addBean(errorChannel).register()
           integrationFlowContext.registration(errorChannelToMainFlow).register()
       }
   }

Here's the channelProducer.createPollableDatabaseChannel(errorChannelName) method implementation:

    fun createPollableDatabaseChannel(channelName: String): PollableChannel {
        val queueChannel = object: QueueChannel(MessageGroupQueue(jdbcStore, channelName)) {
            override fun doSend(message: Message<*>, timeout: Long): Boolean {
                logger.info("sending message to error channel: $message")
                return super.doSend(message, timeout)
            }

            override fun doReceive(timeout: Long): Message<*> {
                val received = super.doReceive(timeout)
                logger.info("received message: $received")
                return received
            }
        }
        return queueChannel
    }

And here are the methods which were called above in integrationFlowProducer:

    fun fromTopicToChannel(topicFrom: String, receiverChannel: String): IntegrationFlow {
        return IntegrationFlows
            .from(
                Kafka.messageDrivenChannelAdapter(
                    consumerFactory,
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, topicFrom
                )
                    .configureListenerContainer { c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                    }
            )
            .log()
            .channel(receiverChannel)
            .get()
    }

    fun fromChannelToEndpoint(
        channelFrom: String,
        endpointDetails: EndpointDetails,
        errorChannelName: String
    ): IntegrationFlow {

        if (endpointDetails is RestEndpointDetails) {
            return createRestIntegrationFlow(channelFrom, endpointDetails, errorChannelName)
        }

        throw UnsupportedOperationException("only rest endpoint supported")
    }

    fun fromErrorChannelToMain(errorChannelName: String, mainChannelName: String): IntegrationFlow {

        return IntegrationFlows.from(errorChannelName)
            .wireTap {f -> f.handle {t -> logger.info("Message read from error channel: " + t.payload.toString())}}
            .transform<ErrorMessage, String> { extractPayloadFromErrorMessage(it) }
            .channel(mainChannelName)
            .get()
    }

    private fun createRestIntegrationFlow(
        channelFrom: String,
        endpointDetails: RestEndpointDetails,
        errorChannelName: String
    ): IntegrationFlow {
        return IntegrationFlows.from(channelFrom)
            .wireTap { f -> f.handle { t -> logger.info("Message read from main channel: " + t.payload.toString()) } }
            .handle<HttpRequestExecutingMessageHandler>(
                Http.outboundGateway(
                    endpointDetails.url,
                    sslRestTemplate
                )
                    .httpMethod(HttpMethod.POST)
                    .headerMapper(kafkaToHttpHeaderMapper)
                    .expectedResponseType(String::class.java)


            ) { c -> c.advice(failureAdvice(errorChannelName)) }
            .nullChannel()
    }

    private fun extractPayloadFromErrorMessage(errorMessage: ErrorMessage) =
        (errorMessage.payload as EvaluatingException).failedMessage!!.payload as String

    private fun failureAdvice(errorChannelName: String): Advice {
        val advice = ExpressionEvaluatingRequestHandlerAdvice()
        advice.setFailureChannelName(errorChannelName)
        return advice
    }

The poller is created as follows, and that's an auto-initialized bean:

    @Bean(name = [PollerMetadata.DEFAULT_POLLER])
    fun poller(deliveryTransactionInterceptor: TransactionInterceptor,
               deliveryThreadPoolTaskExecutor: TaskExecutor): PollerMetadata {
        return Pollers.fixedDelay(30, TimeUnit.SECONDS)
            .advice(deliveryTransactionInterceptor)
            .taskExecutor(deliveryThreadPoolTaskExecutor)
            .get()
    }

There are also some other persistence settings, but they remained the same for working (auto-initialized) code and for current one. If they are needed, I can also paste them here.

The version of Spring Integration is 5.1.4.RELEASE.

I would very much appreciate any help on the subject.

Upvotes: 1

Views: 388

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121542

Since you do everything on the fly, it looks like you have an ExpressionEvaluatingRequestHandlerAdvice instances for every single dynamic flow. I believe that one has to be registered as bean as well. That addBean() on the flow registration is the way to go. I understand that you would need to rework some of your code, but it indeed must be as a bean as well.

Upvotes: 1

Related Questions