Reputation: 2288
I have the following configuration:
<int:channel id="notificationChannel" datatype="com.mycompany.integration.NotificationMessage">
    <int:queue message-store="jdbc-message-store" capacity="1000" />
</int:channel>
<int:outbound-channel-adapter ref="notificationHandler"
        method="handle" channel="notificationChannel">
    <int:poller max-messages-per-poll="100" fixed-delay="60000"
            time-unit="MILLISECONDS">
        <int:transactional isolation="DEFAULT" />
    </int:poller>
</int:outbound-channel-adapter>
Now I want to unit-test this, and the test needs to wait until the message has been processed successfully. I tried an interceptor, but that doesn't work: I can only sync on message delivery, not on successful processing of the message. I could implement sending a reply when the processing is done, but that code would exist only to make my unit test work; in production there wouldn't be a replyChannel set in the message header. How can I sync on successful processing of the request without implementing it in the messageHandler?
Upvotes: 3
Views: 3952
Reputation: 174729
If you are using Spring Integration 2.2.x, you can do this with an advice:
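// Package names below are those of Spring Integration 2.2.x.
import java.util.concurrent.CountDownLatch;
import org.springframework.integration.Message;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;

public class CompletionAdvice extends AbstractRequestHandlerAdvice {

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // Invoke the advised handler; if it throws, the latch is not released.
        Object result = callback.execute();
        latch.countDown();
        return result;
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}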
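The pattern the advice relies on can be tried out without Spring at all. The following is a minimal sketch using only the JDK; LatchingInvoker and LatchingInvokerDemo are hypothetical names made up for this illustration and are not part of Spring Integration. It shows that the latch is released only when the wrapped call returns normally, which is what lets a test distinguish "processed successfully" from "delivered".

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the advice; not a Spring Integration class.
final class LatchingInvoker {

    private final CountDownLatch latch;

    // expectedInvocations: how many successful calls the test waits for.
    LatchingInvoker(int expectedInvocations) {
        this.latch = new CountDownLatch(expectedInvocations);
    }

    // Runs the wrapped call and counts down only if it returns normally.
    <T> T invoke(Callable<T> callback) throws Exception {
        T result = callback.call(); // an exception here skips countDown()
        latch.countDown();
        return result;
    }

    boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

class LatchingInvokerDemo {
    public static void main(String[] args) throws Exception {
        // Successful handler running on another thread, like a poller thread would.
        LatchingInvoker ok = new LatchingInvoker(1);
        Thread worker = new Thread(() -> {
            try {
                ok.invoke(() -> "handled");
            } catch (Exception e) {
                // not expected in this demo
            }
        });
        worker.start();
        System.out.println(ok.awaitCompletion(1, TimeUnit.SECONDS)); // true

        // Failing handler: the latch stays closed, so the wait times out.
        LatchingInvoker failing = new LatchingInvoker(1);
        try {
            failing.<Void>invoke(() -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // the failure propagates to the caller, as it would through the advice
        }
        System.out.println(failing.awaitCompletion(100, TimeUnit.MILLISECONDS)); // false
    }
}
```

A count greater than one is a possible variation when a test sends several messages and wants to wait for all of them; the advice above uses a count of 1.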
In your test environment, add the advice to the adapter's handler with a bean factory post processor.
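import java.util.Collection;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;

public class AddCompletionAdvice implements BeanFactoryPostProcessor {

    private final Collection<String> handlers;

    private final Collection<String> replyProducingHandlers;

    public AddCompletionAdvice(Collection<String> handlers, Collection<String> replyProducingHandlers) {
        this.handlers = handlers;
        this.replyProducingHandlers = replyProducingHandlers;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // Ultimate consumers (no reply): look up the bean definition by the given name.
        for (String beanName : handlers) {
            defineAdviceAndInject(beanFactory, beanName, beanName + "CompletionAdvice");
        }
        // Reply-producing consumers: resolve the handler through the "<id>.handler" alias.
        for (String beanName : replyProducingHandlers) {
            String handlerBeanName = beanFactory.getAliases(beanName + ".handler")[0];
            defineAdviceAndInject(beanFactory, handlerBeanName, beanName + "CompletionAdvice");
        }
    }

    private void defineAdviceAndInject(ConfigurableListableBeanFactory beanFactory, String beanName, String adviceBeanName) {
        BeanDefinition serviceHandler = beanFactory.getBeanDefinition(beanName);
        // Register a new CompletionAdvice bean and add it to the handler's advice chain.
        BeanDefinition advice = new RootBeanDefinition(CompletionAdvice.class);
        ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(adviceBeanName, advice);
        serviceHandler.getPropertyValues().add("adviceChain", new RuntimeBeanReference(adviceBeanName));
    }
}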
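Add the post processor to the config as a foo.AddCompletionAdvice bean. The class takes the handler names as constructor arguments, so a bare <bean class="foo.AddCompletionAdvice" /> is not enough; the full definition is in the configuration at the end of this answer.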
Finally, inject the advice(s) into your test case
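// Package names below are those of Spring Integration 2.2.x.
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class TestAdvice {

    // Field names match the advice bean names ("<id>CompletionAdvice") registered by the post processor.
    @Autowired
    private CompletionAdvice fooCompletionAdvice;

    @Autowired
    private CompletionAdvice barCompletionAdvice;

    @Autowired
    private MessageChannel input;

    @Test
    public void test() throws Exception {
        Message<?> message = new GenericMessage<String>("Hello, world!");
        input.send(message);
        assertTrue(fooCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
        assertTrue(barCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
    }
}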
and wait for the latch(es). The test above runs against this configuration:
<int:publish-subscribe-channel id="input"/>
<int:outbound-channel-adapter id="foo" channel="input" ref="x" method="handle"/>
<int:service-activator id="bar" input-channel="input" ref="x"/>
<bean class="foo.AddCompletionAdvice">
    <constructor-arg name="handlers">
        <list>
            <value>foo</value>
        </list>
    </constructor-arg>
    <constructor-arg name="replyProducingHandlers">
        <list>
            <value>bar</value>
        </list>
    </constructor-arg>
</bean>
<bean id="x" class="foo.Foo" />
I added these classes to a Gist
EDIT: Updated to provide a general case for ultimate consumers (no reply) and reply producing consumers.
Upvotes: 3