I am using Spring Batch remote partitioning. Below is my configuration:
<task:executor id="taskExecutor" pool-size="50" />
<rabbit:template id="computeAmqpTemplate"
connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
reply-timeout="${compute.partition.timeout}">
</rabbit:template>
<int:channel id="computeOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="computeInboundStagingChannel" />
<amqp:outbound-gateway request-channel="computeOutboundChannel"
reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="computeMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="computeOutboundChannel"
p:receiveTimeout="${compute.partition.timeout}" />
<beans:bean id="computePartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="computeStep" p:gridSize="${compute.grid.size}"
p:messagingOperations-ref="computeMessagingTemplate" />
<int:aggregator ref="computePartitionHandler"
send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
input-channel="computeInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
request-channel="computeInboundChannel"
reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<int:channel id="computeInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />
<int:channel id="computeOutboundStagingChannel" />
<beans:bean id="computePartitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
scope="step" />
<beans:bean id="computeFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader"
p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
scope="step" />
<beans:bean id="computeItemWriter"
class="com.st.batch.foundation.writers.ComputeItemWriter"
p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
p:batchId="#{jobParameters[batch_id]}" scope="step" />
<step id="computeStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="computeFileItemReader" writer="computeItemWriter"
commit-interval="${compute.commit.interval}" />
</tasklet>
</step>
<flow id="computeFlow">
<step id="computeStep.master">
<partition partitioner="computePartitioner"
handler="computePartitionHandler" />
</step>
</flow>
<job id="computeJob" restartable="true">
<flow id="computeJob.computeFlow" parent="computeFlow" />
</job>
The problem is that I pass a file pattern to MultiResourcePartitioner, which creates one partition per matching file. But the directory is created at run time, and only conditionally.
I want this step to complete successfully if the directory does not exist (no input files are available) and proceed to the next step.
Right now, the job just hangs and doesn't do anything. It neither marks the step as successful nor throws an exception, so it doesn't even fail. It just sits idle at this step.
Is there any way to handle this? I just want this step to be considered successful and proceed to the next step.
Update:
I just tested it with a partitioned step that runs locally instead of remotely: if the files don't exist, the step is marked as completed by default. So the problem is not with MultiResourcePartitioner; it only occurs when the partitioned step runs on a remote server with the configuration above.
I guess it's the aggregation logic that keeps waiting for a response even though no step execution messages were sent? Is it due to the default SequenceSizeReleaseStrategy, which relies on the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header, so that with no messages at all the aggregator never sees a SEQUENCE_SIZE?
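To see why the partitioner itself is not the culprit, here is a plain-Java approximation of a "one partition per matching file" partitioner. It is only a model, not Spring Batch's MultiResourcePartitioner: the class name FilePartitionModel is hypothetical, and it returns partition names (partition0, partition1, ...) mapped to file URIs. The point it illustrates is that a missing directory simply yields zero partitions rather than an exception.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a "one partition per matching file" partitioner.
// It is NOT Spring Batch's MultiResourcePartitioner, only an illustration of its outcome.
class FilePartitionModel {

    // Returns partition name -> file URI; an empty map when the directory is missing.
    static Map<String, String> partition(Path dir, String glob) {
        Map<String, String> partitions = new LinkedHashMap<>();
        if (!Files.isDirectory(dir)) {
            return partitions; // no directory: zero partitions, and no exception
        }
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path file : stream) {
                files.add(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(files); // predictable partition numbering
        for (int i = 0; i < files.size(); i++) {
            // The real partitioner puts the file URL in each partition's context under "fileName",
            // which is what #{stepExecutionContext[fileName]} reads in computeFileItemReader.
            partitions.put("partition" + i, files.get(i).toUri().toString());
        }
        return partitions;
    }
}
```

With zero partitions, a local partition handler has nothing to wait for, which fits the observation that the local variant completes normally.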
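// Excerpt from Spring Batch's MessageChannelPartitionHandler; fields such as stepName,
// gridSize, replyChannel, messagingGateway and logger are omitted
@MessageEndpoint
public class MessageChannelPartitionHandler implements PartitionHandler {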
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
StepExecution masterStepExecution) throws Exception {
Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);
int count = 0;
if (replyChannel == null) {
replyChannel = new QueueChannel();
}//end if
for (StepExecution stepExecution : split) {
Message<StepExecutionRequest> request = createMessage(count++, split.size(), new StepExecutionRequest(
stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Sending request: " + request);
}
messagingGateway.send(request);
}
Message<Collection<StepExecution>> message = messagingGateway.receive(replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Received replies: " + message);
}
Collection<StepExecution> result = message.getPayload();
return result;
}
private Message<StepExecutionRequest> createMessage(int sequenceNumber, int sequenceSize,
StepExecutionRequest stepExecutionRequest, PollableChannel replyChannel) {
return MessageBuilder.withPayload(stepExecutionRequest).setSequenceNumber(sequenceNumber)
.setSequenceSize(sequenceSize)
.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
.setReplyChannel(replyChannel)
.build();
}
}
If there are no step execution requests (i.e. the split size is 0), the body of the for loop never runs, so no messages are sent, but after the loop the handler still waits to receive a reply. What could be the solution?
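The hang can be reproduced with nothing but java.util.concurrent. In this hypothetical model (PartitionRoundTripModel is an illustrative name, not a Spring class), a BlockingQueue stands in for the reply QueueChannel, and a tiny "aggregator" releases one batch only after it has seen sequenceSize replies, mimicking a sequence-size release strategy. A timed poll replaces the blocking receive so the model terminates: it returns null exactly where the real handler would keep waiting.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the remote-partition round trip (no Spring types involved).
class PartitionRoundTripModel {

    // Stands in for the reply QueueChannel the handler receives from.
    private final BlockingQueue<Collection<Integer>> replyChannel = new LinkedBlockingQueue<>();
    private final List<Integer> aggregated = new ArrayList<>();

    // Called once per worker reply; releases a batch only when all expected replies arrived.
    private synchronized void aggregate(int stepId, int sequenceSize) {
        aggregated.add(stepId);
        if (aggregated.size() == sequenceSize) {
            replyChannel.add(new ArrayList<>(aggregated));
        }
    }

    // Mirrors the unguarded handle(): one request per split entry, then wait for the aggregate.
    // Workers "reply" synchronously here; in the real setup they reply over AMQP.
    Collection<Integer> handle(List<Integer> split, long timeoutMillis) throws InterruptedException {
        for (int stepId : split) {
            aggregate(stepId, split.size());
        }
        // With an empty split, aggregate() is never called, so nothing is ever released.
        return replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

With three split entries the model returns all three replies; with an empty split the poll times out and returns null, because no message ever carried a sequence size to release on.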
I raised an issue on the Spring Batch Jira bug tracker: https://jira.spring.io/browse/BATCH-2283.
It's an issue with MessageChannelPartitionHandler, which doesn't handle this scenario as of now. The temporary fix is to override MessageChannelPartitionHandler.handle, check the size of the split set, and return null when it is empty, as proposed in the ticket above.
This is how the method should look; it worked for me.
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
StepExecution masterStepExecution) throws Exception {
Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);
if (split.isEmpty()) {
// No partitions (e.g. no input files): nothing will be sent, so don't wait for replies
return null;
}
int count = 0;
if (replyChannel == null) {
replyChannel = new QueueChannel();
}//end if
for (StepExecution stepExecution : split) {
Message<StepExecutionRequest> request = createMessage(count++, split.size(), new StepExecutionRequest(
stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Sending request: " + request);
}
messagingGateway.send(request);
}
if (logger.isDebugEnabled()) {
logger.debug("Sent " + split.size() + " requests, waiting for replies");
}
Message<Collection<StepExecution>> message = messagingGateway.receive(replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Received replies: " + message);
}
Collection<StepExecution> result = message.getPayload();
return result;
}
Use a decider to check whether the directory exists and return CONTINUE or SKIP (or any other meaningful status you want).
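A minimal sketch of the decision logic, kept free of Spring types so it can run on its own. The class name InputDirectoryDecision and the CONTINUE/SKIP strings are assumptions taken from the suggestion above. In Spring Batch this check would sit inside a JobExecutionDecider's decide(JobExecution, StepExecution) method and be returned as new FlowExecutionStatus(status); that wiring is shown only as a comment, and baseDir there is a hypothetical field holding the value of spring.tmp.batch.dir.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical decision logic for a "does the input directory have files?" decider.
class InputDirectoryDecision {

    static final String CONTINUE = "CONTINUE";
    static final String SKIP = "SKIP";

    // CONTINUE if batchDir exists and holds at least one file matching glob, SKIP otherwise.
    static String decide(Path batchDir, String glob) {
        if (!Files.isDirectory(batchDir)) {
            return SKIP; // directory was never created for this batch
        }
        try (DirectoryStream<Path> files = Files.newDirectoryStream(batchDir, glob)) {
            return files.iterator().hasNext() ? CONTINUE : SKIP;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sketch of the Spring Batch wiring (not compiled here; baseDir is hypothetical):
    // public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
    //     String batchId = jobExecution.getJobParameters().getString("batch_id");
    //     Path dir = Paths.get(baseDir, batchId, "shares_rics");
    //     return new FlowExecutionStatus(InputDirectoryDecision.decide(dir, "shares_rics_*.txt"));
    // }
}
```

Checking for a matching file, not just the directory, also covers the case where the directory exists but is empty, which would otherwise reach the partitioned step with zero partitions and hang the same way. In the job definition, the decision's CONTINUE outcome would lead to computeStep.master and SKIP to whatever step comes next.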