Reputation: 83
I have a simple task executor in my spring-config.xml and a couple of chains attached to it:
<!-- Entry point: messages are sent to this channel. -->
<channel id="inputChannel" />
<!-- Task executor with a pool size of 2, referenced by the publish-subscribe channel below. -->
<task:executor id="threadPoolExecutor" pool-size="2" />
<!-- Publish-subscribe channel configured to use threadPoolExecutor as its task executor. -->
<publish-subscribe-channel id="multiCastChannel"
task-executor="threadPoolExecutor" />
<!-- First chain: converts the JSON payload to a DomainObject, calls validator.validate,
     and sends the result to multiCastChannel. -->
<chain input-channel="inputChannel"
output-channel="multiCastChannel">
<json-to-object-transformer
type="com.company.integration.domain.DomainObject" />
<service-activator ref="validator"
method="validate" />
</chain>
<!-- First consumer of multiCastChannel: calls adapterOne.buildOutputMessageOne and
     sends the result to inventoryAdjustmentOutputChannelOne. -->
<chain input-channel="multiCastChannel"
output-channel="inventoryAdjustmentOutputChannelOne">
<service-activator ref="adapterOne"
method="buildOutputMessageOne" />
</chain>
<!-- Second consumer of multiCastChannel: calls adapterTwo.buildOutputMessageTwo and
     sends the result to inventoryAdjustmentOutputChannelTwo. -->
<chain input-channel="multiCastChannel"
output-channel="inventoryAdjustmentOutputChannelTwo">
<service-activator ref="adapterTwo"
method="buildOutputMessageTwo" />
</chain>
When a message is posted to "inputChannel" and, after processing, is sent to "multiCastChannel", two threads are created without issues, like:
threadPoolExecutor-1 threadPoolExecutor-2
These two threads are created only once per input message, which is fine. But when I try to test the same flow with JUnit, each of the "multiCastChannel" chains executes twice, meaning my service activators (adapterOne, adapterTwo) are called twice per chain, which is weird.
Any idea why JUnit has this behavior ?
Below is the JUnit test code:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:configuration/spring-config.xml"})
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@PropertySource("classpath:application.properties")
@SuppressWarnings("unchecked")
@WebAppConfiguration
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class InventoryAdjustmentMessageTest {
@Autowired
private DirectChannel inputChannel;
@Test
public void testTaskShed()
throws IOException, InterruptedException, JMSException {
String validInput = setup("valid-message.txt");
Message<String> inputMessage = TextMessageUtil.createNewGenericMessage(validInput);
inputChannel.send(inputMessage);
Thread.sleep(5000);
Spring integration version : 4.1.6
Adding application config information:
@Import({ HarnessConfiguration.class, LocalConfiguration.class, MongoDbConfiguration.class, WebConfiguration.class,
WebsphereMQJMSConfiguration.class })
@Configuration
@EnableWebMvc
@ComponentScan(basePackages = { "com.abc.inventory.adjustment.integration.service",
"com.abc.inventory.adjustment.integration.domain" }, useDefaultFilters = false, excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = ApplicationConfiguration.class) }, includeFilters = {
@ComponentScan.Filter(type = FilterType.ANNOTATION, value = { Controller.class,
Component.class }) })
@ImportResource({ "classpath:configuration/spring-config.xml", "classpath:configuration/spring-adapters.xml" })
@EnableMongoRepositories("com.abc.inventory.adjustment.integration.service.audit.repository")
@EnableAutoConfiguration
@EnableMongoAuditing
public class ApplicationConfiguration extends WebMvcConfigurerAdapter {
//
-Tej
Upvotes: 2
Views: 1496
Reputation: 121427
Here you can find the simple test-case to demonstrate the proper behavior:
<!-- Task executor with a pool size of 2. -->
<task:executor id="executor" pool-size="2"/>
<!-- Publish-subscribe channel configured to use the executor above. -->
<publish-subscribe-channel id="pubSubChannel" task-executor="executor" />
<!-- Two service activators consume the same channel: the first prints the payload,
     the second prints it prefixed with 'foo: '. -->
<service-activator input-channel="pubSubChannel" expression="T(System).out.println(payload)"/>
<service-activator input-channel="pubSubChannel" expression="T(System).out.println('foo: ' + payload)"/>
/**
 * Loads "pubSubChannelConfig.xml", sends the integers 0..9 to the
 * "pubSubChannel" bean, and then waits so that the channel's subscribers
 * have time to print each payload before the context is closed.
 *
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@Test
public void testPubSubChannel() throws InterruptedException {
    // try-with-resources guarantees the context is closed even when the bean
    // lookup, a send, or the sleep throws; the original leaked it in that case.
    try (ConfigurableApplicationContext context =
            new ClassPathXmlApplicationContext("pubSubChannelConfig.xml", getClass())) {
        // Typed lookup avoids the unchecked cast and fails with a clear
        // message if the bean is not a MessageChannel.
        MessageChannel channel = context.getBean("pubSubChannel", MessageChannel.class);
        for (int i = 0; i < 10; i++) {
            channel.send(new GenericMessage<Integer>(i));
        }
        // The channel is configured with a task executor in the XML, so wait
        // before closing the context to let the subscribers finish.
        Thread.sleep(10000);
    }
}
The result looks like:
foo: 0
0
foo: 1
1
2
foo: 2
3
foo: 3
4
foo: 4
5
foo: 5
6
foo: 6
7
foo: 7
8
foo: 8
9
foo: 9
Upvotes: 1