Zach Z

Reputation: 25

Add Batch capabilities to Integration flow

I currently have a Spring Integration flow that works fine (see link for diagram). I would like to add Spring Batch on top of my current configuration to allow for retry with exponential back-off, a circuit breaker, and persisting jobs to the database for restart.

The Integration flow consists of a Gateway that takes a Message<MyObj>, which is eventually routed to a Transformer that converts the Message<MyObj> to a Message<String>. The Aggregator then takes each Message<String> and eventually releases a concatenated Message<String> (using both a size release-strategy and a MessageGroupStoreReaper with a timeout). The concatenated String is then the payload of the File uploaded using an SFTP outbound-channel-adapter.

I have searched, read through the docs, and looked at tons of examples, but I can't figure out how to encapsulate the last step of the process in a Batch Job. I need the ability to retry uploading the String (as the payload of the File) if there is an SFTP connection issue or another Exception is thrown during the upload. I also want to be able to restart (using a database-backed JobRepository) in case of some failure, so I don't think using Retry Advice is sufficient.

Please explain and help me understand how to wire the pieces together and which ones to use (job-launching-gateway, MessageToJobRequest Transformer, ItemReader, ItemWriter?). I'm also unsure how to access each Message<String> and send it to the SFTP channel-adapter inside a Job, Step, or Tasklet.

Current flow: https://i.sstatic.net/GnurV.png

Upvotes: 1

Views: 766

Answers (1)

Artem Bilan

Reputation: 121237

First of all, let's look at how you can meet your requirements without Batch.

  1. <int-sftp:outbound-channel-adapter> has <request-handler-advice-chain>, where you can configure RequestHandlerRetryAdvice and RequestHandlerCircuitBreakerAdvice.

  2. To make the flow restartable, you can make the input channel of that adapter a persistent queue channel backed by a message-store.
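
The two points above might be wired roughly as follows. This is only a sketch: the bean ids (toSftpChannel, jdbcMessageStore, sftpSessionFactory, dataSource), the remote directory, and all numeric values are assumptions for illustration, and the namespace declarations are omitted. The transactional poller assumes a transaction manager bean with the default name transactionManager.

```xml
<!-- Queue channel backed by a JDBC message store, so pending messages survive a restart -->
<int:channel id="toSftpChannel">
    <int:queue message-store="jdbcMessageStore"/>
</int:channel>

<int-jdbc:message-store id="jdbcMessageStore" data-source="dataSource"/>

<int-sftp:outbound-channel-adapter id="sftpOutbound"
        channel="toSftpChannel"
        session-factory="sftpSessionFactory"
        remote-directory="/upload">
    <!-- Transactional poller: if the upload still fails, the message stays in the store -->
    <int:poller fixed-delay="1000">
        <int:transactional/>
    </int:poller>
    <int-sftp:request-handler-advice-chain>
        <!-- Advices are applied in the listed order; the first one is the outermost -->
        <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryTemplate" ref="retryTemplate"/>
        </bean>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <!-- Open the circuit after 3 consecutive failures, try again after 15 seconds -->
            <property name="threshold" value="3"/>
            <property name="halfOpenAfter" value="15000"/>
        </bean>
    </int-sftp:request-handler-advice-chain>
</int-sftp:outbound-channel-adapter>

<!-- Retry up to 4 attempts with exponential back-off: 1s, 2s, 4s (capped at 30s) -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4"/>
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000"/>
            <property name="multiplier" value="2.0"/>
            <property name="maxInterval" value="30000"/>
        </bean>
    </property>
</bean>
```

The order of the two advices is a design choice: with the retry advice first, every retry attempt passes through the circuit breaker; swapping them makes the breaker count only exhausted retry sequences.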

Now about Batch.

To start a Job from the Integration flow, you should write a MessageToJobRequest transformer that converts the Message into a JobLaunchRequest, and place a <batch-int:job-launching-gateway> after it. Here, of course, you can put your payload into the jobParameters.
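
As a rough illustration of that wiring: com.example.MessageToJobRequest is a hypothetical class you would write yourself, with a method that takes the Message<String> and returns an org.springframework.batch.integration.launch.JobLaunchRequest built from the Job and its JobParameters. The channel names and the uploadJob and jobLauncher bean ids are likewise assumptions.

```xml
<!-- Hypothetical transformer: turns the aggregated Message<String> into a JobLaunchRequest -->
<int:transformer input-channel="aggregatedChannel" output-channel="jobRequestChannel">
    <bean class="com.example.MessageToJobRequest">
        <property name="job" ref="uploadJob"/>
    </bean>
</int:transformer>

<!-- Launches the Job for each JobLaunchRequest and sends the JobExecution to the reply channel -->
<batch-int:job-launching-gateway request-channel="jobRequestChannel"
        reply-channel="jobExecutionChannel"
        job-launcher="jobLauncher"/>
```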

To send a message from a Job to some channel (e.g. to the SFTP adapter), you can use org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter.
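
A possible bean definition for that writer is sketched below; the requests and replies channel names and the timeout are assumptions. Keep in mind that this writer comes from the remote chunking support: it wraps each chunk of items in a ChunkRequest and expects ChunkResponse replies on the reply channel, so the consuming side has to unwrap the items before they reach the SFTP adapter.

```xml
<!-- Step-scoped writer that sends each chunk as a ChunkRequest message -->
<bean id="itemWriter"
        class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
        scope="step">
    <property name="messagingOperations" ref="messagingTemplate"/>
    <!-- Pollable channel on which ChunkResponse replies are received -->
    <property name="replyChannel" ref="replies"/>
</bean>

<!-- Template that sends the chunk requests to the (assumed) requests channel -->
<bean id="messagingTemplate" class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="requests"/>
    <property name="receiveTimeout" value="2000"/>
</bean>
```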

Read more here: http://docs.spring.io/spring-batch/reference/html/springBatchIntegration.html

Upvotes: 1