Reputation: 13
I am running 2 services: one which connects to the DB and sends data to msg broker, and one which should take a message from rabbit and send it via batch to targetDB. I have the same RabbitConfiguration in each service but for some reason I am getting:
org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate.
at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
2020-03-01 19:50:36.157 INFO 1748 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 80ms
2020-03-01 19:50:36.183 INFO 1748 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importClientJob]] completed with the following parameters: [{run.id=19}] and the following status: [FAILED] in 142ms
Configuration Class:
@Configuration
public class RabbitConfiguration {
public static final String MESSAGE_EXCHANGE = "clients-exchange";
public static final String MESSAGE_QUEUE = "clients-queue";
public static final String MESSAGE_ROUTING_KEY = "clients.msg";
private final ConnectionFactory connectionFactory;
public RabbitConfiguration(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Bean
Queue queue() {
return new Queue(MESSAGE_QUEUE, true);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(MESSAGE_EXCHANGE);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(MESSAGE_ROUTING_KEY);
}
@Bean
RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
rabbitTemplate.setTaskExecutor(taskExecutor);
return rabbitTemplate;
}
}
And BatchConfiguration :
@Configuration
public class BatchConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Value("${pusher.spring-batch-chunk-size}")
private int chunkSize;
private DataSource dataSource;
private RabbitTemplate rabbitTemplate;
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
private ClientPreparedStatementSetter clientPreparedStatementSetter;
@Autowired
public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, DataSource dataSource, RabbitTemplate rabbitTemplate, NamedParameterJdbcTemplate namedParameterJdbcTemplate, ClientPreparedStatementSetter clientPreparedStatementSetter) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.dataSource = dataSource;
this.rabbitTemplate = rabbitTemplate;
this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
this.clientPreparedStatementSetter = clientPreparedStatementSetter;
}
@Bean
public JdbcBatchItemWriter<Client> cursorItemWriter() {
return new JdbcBatchItemWriterBuilder<Client>()
.dataSource(this.dataSource)
.namedParametersJdbcTemplate(namedParameterJdbcTemplate)
.itemPreparedStatementSetter(clientPreparedStatementSetter)
.sql("INSERT INTO CLIENT (id, firstname, lastname, email, phone) VALUES (?,?,?,?,?)")
.build();
}
@Bean
public AmqpItemReader<Client> clientAmqpItemReader() {
return new AmqpItemReaderBuilder<Client>()
.amqpTemplate(rabbitTemplate)
.build();
}
@Bean
public ClientLowerCaseProcessor lowerCaseProcessor() {
return new ClientLowerCaseProcessor();
}
@Bean
public Job importClientJob(Step step1) {
return jobBuilderFactory.get("importClientJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
return stepBuilderFactory.get("step1")
.<Client, Client>chunk(chunkSize)
.reader(clientAmqpItemReader())
.processor(lowerCaseProcessor())
.writer(cursorItemWriter())
.taskExecutor(taskExecutor)
.build();
}
}
I have tried to remove Task executioner and fiddle with config but with no success.
Upvotes: 1
Views: 1271
Reputation: 90517
org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate. at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE] at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
AmqpItemReader
in spring-batch uses RabbitTemplate#receive()
to receive messages from RabbitMQ and it requires you to set defaultReceiveQueue
in RabbitTemplate
to specify which queue to receive messages but you miss to configure it. So you have to specify the queue name when configuring the RabbitTemplate
:
@Bean
RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
rabbitTemplate.setDefaultReceiveQueue("someQueue");
rabbitTemplate.setTaskExecutor(taskExecutor);
return rabbitTemplate;
}
Upvotes: 1