Alex

Reputation: 17

How to configure an outbound channel adapter with a synchronization factory in Spring Boot

What would be the equivalent of the following outbound channel adapter configuration in Spring Boot? Assume that the messageChannel, taskExecutor, and synchronizationFactory beans are already defined.

    <int:outbound-channel-adapter id="outboundChannelAdapter" channel="messageChannel" ref="handler" method="handle">
        <int:poller task-executor="taskExecutor" fixed-delay="500" receive-timeout="500" max-messages-per-poll="10">
            <int:transactional synchronization-factory="synchronizationFactory" isolation="READ_COMMITTED"/>
        </int:poller>
    </int:outbound-channel-adapter>

The @ServiceActivator with the @Poller annotation doesn't seem to have an option for a transaction synchronization factory. PollerMetadata has an option for it, but I'm not sure how to connect that instance to the @ServiceActivator.

The synchronization factory is needed in this case because it's a DB-based channel with multiple threads reading from it.

Upvotes: 1

Views: 477

Answers (1)

Artem Bilan

Reputation: 121542

Something like this:

    @ServiceActivator(inputChannel = "messageChannel", poller = @Poller("myPollerMetadata"))
    public void handle(Message<?> message) { // or whatever your service method signature is
        ...
    }

    @Bean
    PollerMetadata myPollerMetadata(Executor taskExecutor, TransactionSynchronizationFactory synchronizationFactory) {
        PollerMetadata poller = new PollerMetadata();
        poller.setTransactionSynchronizationFactory(synchronizationFactory);
        poller.setMaxMessagesPerPoll(10);
        poller.setReceiveTimeout(500);
        poller.setTaskExecutor(taskExecutor);
        poller.setTrigger(new PeriodicTrigger(500)); // fixed delay by default, like fixed-delay="500"
        return poller;
    }
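
Note that `<int:transactional>` in the XML also puts a transaction interceptor (with `isolation="READ_COMMITTED"`) around each poll, and a `PollerMetadata` that only sets the synchronization factory does not do that. A minimal sketch of adding it through the poller's advice chain, assuming a `PlatformTransactionManager` bean is available in the context; the class name `PollerConfig` and the bean name `transactionalPollerMetadata` are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

import org.aopalliance.aop.Advice;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Isolation;

@Configuration
public class PollerConfig { // hypothetical configuration class

    @Bean
    PollerMetadata transactionalPollerMetadata(Executor taskExecutor,
            TransactionSynchronizationFactory synchronizationFactory,
            PlatformTransactionManager transactionManager) { // assumed to exist in the context

        // Transaction advice applied around each poll, mirroring isolation="READ_COMMITTED"
        List<Advice> adviceChain = new ArrayList<>();
        adviceChain.add(new TransactionInterceptorBuilder()
                .transactionManager(transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .build());

        PollerMetadata poller = new PollerMetadata();
        poller.setAdviceChain(adviceChain);
        poller.setTransactionSynchronizationFactory(synchronizationFactory);
        poller.setMaxMessagesPerPoll(10);
        poller.setReceiveTimeout(500);
        poller.setTaskExecutor(taskExecutor);
        poller.setTrigger(new PeriodicTrigger(500));
        return poller;
    }
}
```

The service activator would then point at this bean with `@Poller("transactionalPollerMetadata")`.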

You may also consider learning the Spring Integration Java DSL: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl. The same config would look like this:

    @Bean
    IntegrationFlow myHandlerFlow(Executor taskExecutor, TransactionSynchronizationFactory synchronizationFactory) {
        // "handler" is the bean referenced by ref="handler" in the XML config
        return IntegrationFlows.from("messageChannel")
                .handle(handler, "handle",
                        c -> c.poller(p -> p
                                .fixedDelay(500)
                                .transactionSynchronizationFactory(synchronizationFactory)
                                .taskExecutor(taskExecutor)
                                .receiveTimeout(500)
                                .maxMessagesPerPoll(10)))
                .get();
    }

Upvotes: 1
