Mike Snare

Spring Batch Integration Remote Chunking error - Message contained wrong job instance id [25] should have been [24]

I'm running into this bug (more info here), which appears to mean that for multi-threaded batches using remote chunking you can't use a common response channel. I'm not sure how to proceed. Surely there's a way to get this working, because without it I can't see much benefit to remote chunking.

Here's my Java DSL config that creates a JobLaunchRequest for each new polling job id:

@Bean
IntegrationFlow newPollingJobsAdapter(JobLaunchingGateway jobLaunchingGateway) {
    // Start by polling the DB for new PollingJobs according to the polling rate
    return IntegrationFlows.from(jdbcPollingChannelAdapter(),
            c -> c.poller(Pollers.fixedRate(10000)
                    // Do the polling on one of 10 threads.
                    .taskExecutor(Executors.newFixedThreadPool(10))
                    // pull out up to 100 new ids for each poll.
                    .maxMessagesPerPoll(100)))
            .log(LoggingHandler.Level.WARN)
            // The polling adapter above returns a list of ids.  Split them out into
            // individual ids
            .split()
            // Now push each one onto a separate thread for batch processing.
            .channel(MessageChannels.executor(Executors.newFixedThreadPool(10)))
            .log(LoggingHandler.Level.WARN)
            // Transform each one into a JobLaunchRequest
            .<Long, JobLaunchRequest>transform(id -> {
                logger.warn("Creating job for ID {}", id);
                JobParametersBuilder builder = new JobParametersBuilder()
                        .addLong("polling-job-id", id, true);
                return new JobLaunchRequest(job, builder.toJobParameters());
            })
            .handle(jobLaunchingGateway)
            // TODO: Notify somebody?  No idea yet
            .<JobExecution>handle(exec -> System.out.println("GOT EXECUTION: " + exec))
            .get();
}

Nothing in here is particularly special; there are no odd configs that I'm aware of.

The job itself is pretty straightforward, too:

/**
 * This is the definition of the entire batch process that runs polling.
 * @return the polling job definition
 */
@Bean
Job pollingJobJob() {
    return jobBuilderFactory.get("pollingJobJob")
            .incrementer(new RunIdIncrementer())
            // Ship it down to the slaves for actual processing
            .start(remoteChunkingStep())
            // Now mark it as complete
            .next(markCompleteStep())
            .build();
}

/**
 * Sends the job to a remote slave via an ActiveMQ-backed JMS queue.
 */
@Bean
TaskletStep remoteChunkingStep() {
    return stepBuilderFactory.get("polling-job-step-remote-chunking")
            .<Long, String>chunk(20)
            .reader(runningPollingJobItemReader)
            .processor(toJsonProcessor())
            .writer(chunkWriter)
            .build();
}

/**
 * This step just marks the PollerJob as Complete.
 */
@Bean
Step markCompleteStep() {
    return stepBuilderFactory.get("polling-job-step-mark-complete")
            // We want each PollerJob instance to be a separate job in batch, and the
            // reader is using the id passed in via job params to grab the one we want,
            // so we don't need a large chunk size.  One at a time is fine.
            .<Long, Long>chunk(1)
            .reader(runningPollingJobItemReader)
            .processor(new PassThroughItemProcessor<Long>())
            .writer(this.completeStatusWriter)
            .build();
}

Here's the chunk writer config:

/**
 * This is part of the bridge between Spring Batch and Spring Integration.  Nothing special or weird is going
 * on, so see the RemoteChunkHandlerFactoryBean for a description.
 */
@Bean
RemoteChunkHandlerFactoryBean<PollerJob> remoteChunkHandlerFactoryBean() {
    RemoteChunkHandlerFactoryBean<PollerJob> factory = new RemoteChunkHandlerFactoryBean<>();
    factory.setChunkWriter(chunkWriter);
    factory.setStep(remoteChunkingStep());
    return factory;
}

/**
 * This is the writer that will actually send the chunk to the slaves.  Note that it also configures the
 * internal channel on which replies are expected.
 */
@Bean
@StepScope
ChunkMessageChannelItemWriter<String> chunkWriter() {
    ChunkMessageChannelItemWriter<String> writer = new ChunkMessageChannelItemWriter<>();
    writer.setMessagingOperations(batchMessagingTemplate());
    writer.setReplyChannel(batchResponseChannel());
    writer.setThrottleLimit(1000);
    return writer;
}

The problem seems to be that the last section sets up the ChunkMessageChannelItemWriter so that its replyChannel is the same channel used by all of the writers, even though each writer is step-scoped. It seems I need to add a replyChannel header to one of the messages, but I'm not sure where in the chain to do that, or how to process it (or whether I need to at all).
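To make the failure mode concrete, here is a minimal plain-Java sketch (no Spring involved; the class and method names are hypothetical). It assumes one shared BlockingQueue standing in for the singleton batchResponseChannel, with each reply reduced to its job instance id. Whichever writer polls first takes whatever reply is at the head of the queue, so the writer for job 24 can receive job 25's reply, which is the mismatch reported in the error message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: plain Java, no Spring. One BlockingQueue stands in for the
// singleton reply channel that every step-scoped ChunkMessageChannelItemWriter shares.
class SharedReplyChannelDemo {

    /** Returns the job instance id that the writer for job 24 actually receives. */
    static long whatWriter24Receives() {
        // Each reply is reduced to the job instance id it belongs to.
        BlockingQueue<Long> sharedReplies = new LinkedBlockingQueue<>();

        // Chunks for jobs 24 and 25 are both in flight; the worker for job 25 answers first.
        sharedReplies.offer(25L);
        sharedReplies.offer(24L);

        // The writer for job 24 polls the shared queue and takes whatever is at the head.
        return sharedReplies.poll();
    }

    public static void main(String[] args) {
        long expected = 24L;
        long received = whatWriter24Receives();
        if (received != expected) {
            // Mimics the wording of the error from the question title.
            System.out.println("Message contained wrong job instance id [" + received
                    + "] should have been [" + expected + "]");
        }
    }
}
```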

Also, this is being sent to the slaves via JMS/ActiveMQ, and I'd like to avoid creating a huge number of nearly identical queues on ActiveMQ just to support this.

What are my options?

Answers (1)

Gary Russell

Given that you are using a shared JMS infrastructure, you will need a router to get the responses back to the correct chunk writer.

If you use prototype scope on the batchResponseChannel() @Bean, you'll get a unique channel for each writer.
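A minimal plain-Java sketch of that routing idea, under stated assumptions: there is no Spring here, a BlockingQueue stands in for each writer's prototype-scoped QueueChannel, a reply is reduced to its job id, and JobIdReplyRouter is a hypothetical name. Each writer registers its private queue under its job id before sending, and every reply coming off the one shared reply queue is delivered by job id, so arrival order no longer matters.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical plain-Java analogue of "per-writer reply channel + router keyed by job id".
class JobIdReplyRouter {

    // jobId -> that writer's private reply queue (stand-in for a prototype QueueChannel).
    private final Map<Long, BlockingQueue<Long>> registry = new ConcurrentHashMap<>();

    /** Request path: make sure this job has a private reply queue before sending. */
    BlockingQueue<Long> register(long jobId) {
        return registry.computeIfAbsent(jobId, id -> new LinkedBlockingQueue<>());
    }

    /** Reply path: deliver a reply from the shared queue to the writer that owns the job id. */
    void route(long replyJobId) {
        BlockingQueue<Long> target = registry.get(replyJobId);
        if (target == null) {
            throw new IllegalStateException("No reply channel registered for job " + replyJobId);
        }
        target.offer(replyJobId);
    }

    public static void main(String[] args) {
        JobIdReplyRouter router = new JobIdReplyRouter();
        BlockingQueue<Long> writer24 = router.register(24L);
        BlockingQueue<Long> writer25 = router.register(25L);

        // Replies arrive on the shared queue out of order but are routed by job id.
        router.route(25L);
        router.route(24L);

        System.out.println("writer24 got " + writer24.poll()); // writer24 got 24
        System.out.println("writer25 got " + writer25.poll()); // writer25 got 25
    }
}
```

Because the demultiplexing happens in-process, the broker still only needs one request queue and one reply queue no matter how many jobs run concurrently.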

I don't have time to figure out how to set up a chunked batch job, so the following simulates your environment (a non-singleton bean that needs a unique reply channel for each instance). Hopefully it's self-explanatory.

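import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.integration.chunk.ChunkRequest;
import org.springframework.batch.integration.chunk.ChunkResponse;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication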
public class So44806067Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So44806067Application.class, args);
        SomeNonSingletonNeedingDistinctRequestAndReplyChannels chunker1 = context
                .getBean(SomeNonSingletonNeedingDistinctRequestAndReplyChannels.class);
        SomeNonSingletonNeedingDistinctRequestAndReplyChannels chunker2 = context
                .getBean(SomeNonSingletonNeedingDistinctRequestAndReplyChannels.class);
        if (chunker1.equals(chunker2)) {
            throw new IllegalStateException("Expected different instances");
        }
        chunker1.sendSome();
        chunker2.sendSome();
        ChunkResponse results = chunker1.getResults();
        if (results == null) {
            throw new IllegalStateException("No results1");
        }
        if (results.getJobId() != 1L) {
            throw new IllegalStateException("Incorrect routing1");
        }
        results = chunker2.getResults();
        if (results == null) {
            throw new IllegalStateException("No results2");
        }
        if (results.getJobId() != 2L) {
            throw new IllegalStateException("Incorrect routing2");
        }
        context.close();
    }

    @Bean
    public Map<Long, PollableChannel> registry() {
        // TODO: should clean up entry for jobId when job completes.
        return new ConcurrentHashMap<>();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public SomeNonSingletonNeedingDistinctRequestAndReplyChannels chunker() {
        MessagingTemplate template = template();
        final PollableChannel replyChannel = replyChannel();
        SomeNonSingletonNeedingDistinctRequestAndReplyChannels bean =
                new SomeNonSingletonNeedingDistinctRequestAndReplyChannels(template, replyChannel);
        AbstractSubscribableChannel requestChannel = (AbstractSubscribableChannel) template.getDefaultDestination();
        requestChannel.addInterceptor(new ChannelInterceptorAdapter() {

            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                registry().putIfAbsent(((ChunkRequest<?>) message.getPayload()).getJobId(), replyChannel);
                return message;
            }

        });
        BridgeHandler bridge = bridge();
        requestChannel.subscribe(bridge);
        return bean;
    }


    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MessagingTemplate template() {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setDefaultChannel(requestChannel());
        return messagingTemplate;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public DirectChannel requestChannel() {
        return new DirectChannel();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public PollableChannel replyChannel() {
        return new QueueChannel();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public BridgeHandler bridge() {
        BridgeHandler bridgeHandler = new BridgeHandler();
        bridgeHandler.setOutputChannel(outboundChannel());
        return bridgeHandler;
    }

    @Bean
    public DirectChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel masterReplyChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "outboundChannel")
    public void simulateJmsChannelAdapterPair(ChunkRequest<?> request) {
        masterReplyChannel()
                .send(new GenericMessage<>(new ChunkResponse(request.getSequence(), request.getJobId(), null)));
    }

    @Router(inputChannel = "masterReplyChannel")
    public MessageChannel route(ChunkResponse reply) {
        // TODO: error checking - missing reply channel for jobId
        return registry().get(reply.getJobId());
    }

    public static class SomeNonSingletonNeedingDistinctRequestAndReplyChannels {

        private final static AtomicLong jobIds = new AtomicLong();

        private final long jobId = jobIds.incrementAndGet();

        private final MessagingTemplate template;

        private final PollableChannel replyChannel;

        public SomeNonSingletonNeedingDistinctRequestAndReplyChannels(MessagingTemplate template,
                PollableChannel replyChannel) {
            this.template = template;
            this.replyChannel = replyChannel;
        }

        public void sendSome() {
            ChunkRequest<String> cr = new ChunkRequest<>(0, Collections.singleton("foo"), this.jobId, null);
            this.template.send(new GenericMessage<>(cr));
        }

        public ChunkResponse getResults() {
            @SuppressWarnings("unchecked")
            Message<ChunkResponse> received = (Message<ChunkResponse>) this.replyChannel.receive(10_000);
            if (received != null) {
                if (received.getPayload().getJobId().equals(this.jobId)) {
                    System.out.println("Got the right one");
                }
                else {
                    System.out.println(
                            "Got the wrong one " + received.getPayload().getJobId() + " instead of " + this.jobId);
                }
                return received.getPayload();
            }
            return null;
        }

    }

}
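The first TODO above (cleaning up the registry entry when the job completes) could be handled roughly as follows. This is a hypothetical plain-Java sketch, not Spring API: the reply queue is registered only for the duration of one job run, removed in a finally block, and a second concurrent registration for the same id is rejected. In the real application the removal would more likely be tied to job completion, for example a JobExecutionListener's afterJob callback; that wiring is an assumption and is not shown.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: a jobId -> reply queue registry whose entries never outlive the job.
class ScopedReplyRegistry {

    final Map<Long, BlockingQueue<Object>> registry = new ConcurrentHashMap<>();

    /** Runs the job body with a private reply queue registered under jobId, then cleans up. */
    void runWithReplyChannel(long jobId, Consumer<BlockingQueue<Object>> jobBody) {
        BlockingQueue<Object> replies = new LinkedBlockingQueue<>();
        if (registry.putIfAbsent(jobId, replies) != null) {
            throw new IllegalStateException("Job " + jobId + " already has a reply channel");
        }
        try {
            jobBody.accept(replies);
        } finally {
            // Two-argument remove: only drop the entry if it still maps to our queue.
            registry.remove(jobId, replies);
        }
    }

    public static void main(String[] args) {
        ScopedReplyRegistry r = new ScopedReplyRegistry();
        r.runWithReplyChannel(24L, replies ->
                System.out.println("registered: " + r.registry.containsKey(24L))); // registered: true
        System.out.println("after job: " + r.registry.containsKey(24L));          // after job: false
    }
}
```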
