user3465651
user3465651

Reputation: 764

struggling with spring integration dsl

I am trying to setup a small spring boot application with spring integration. All it needs to do is pull a message off a jms queue, unmarshall the request back to an object and route to a particular bean to persist. I have tested the routing portion and I can confirm it does work.

I have in my test an embedded activemq broker which I am able to send a message via the spring JmsTemplate but it does not appear to unmarshal the xml payload and route the message. I can see this in the log:

16:42:09.285 [main] INFO  c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 }
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>}
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332
16:42:09.314 [main] INFO  c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent

Spring integration logging:

13:43:48.996 [main] INFO  o.s.i.j.JmsMessageDrivenEndpoint - started org.springframework.integration.jms.JmsMessageDrivenEndpoint@71904469
13:43:48.996 [main] INFO  o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'
13:43:48.996 [main] INFO  o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry]
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer - 

I'm not sure what I have left out or misconfigured:

Test case:

@Test
public void jmsIntegrationTest() {
    RouteLogEvent log = new RouteLogEvent();
    log.setAgentId(8888);
    log.setInteracitonId(95634);
    log.setMax(5);
    log.setQueueTime(1256L);
    log.setRouteTime(96541L);
    log.setScore(8);

    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500);

    marshaller.marshal(log, new StreamResult(bytesOut));

    final String xmlPayload = new String(bytesOut.toByteArray());

    LOG.info("Sending message");

    jmsTemplate.send(jmsQueue, (s) -> {
        return s.createTextMessage(xmlPayload);
    });

    LOG.info("Message sent");

    List<RouteLogEvent> events = testDao.findAllRouteLogs();
    assertNotNull(events);
    assertFalse(events.isEmpty());

    List<RouteLogEvent> filtered = events.stream().filter(val -> val.getAgentId() == 8888).collect(Collectors.toList());
    assertNotNull(filtered);
    assertFalse(filtered.isEmpty());
}

Spring integration configuration:

@SpringBootApplication
@EnableIntegration
public class VitelAsyncPersisterApplication {

    private static final Map<Class, String> ROUTING_EVENTS = new HashMap<>();

    private static final String CHANNEL_RECORDING = "channel-recording";
    private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state";
    private static final String CHANNEL_AGENT_STATE = "channel-agent-state";
    private static final String CHANNEL_ROUTE_LOG = "channel-route";

    static {
        ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE);
        ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE);
        ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING);
        ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG);
    }

    @Value("${jms.queue.entity.persist}")
    private String jmsQueueName;

    @Value("${jms.broker.url}")
    private String jmsBrokerUrl;

    @Autowired
    private EventDao eventDao;

    @Bean
    public Jaxb2Marshaller xmlMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setSchema(new ClassPathResource("entities.xsd"));
        marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities");

        return marshaller;
    }

    @Bean
    public ConnectionFactory jmsConnFactory() {
        ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl);

        CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory();
        cachingConnFactory.setTargetConnectionFactory(activeMq);

        return cachingConnFactory;
    }

    @Bean
    public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) {
        UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller);

        JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName);

        return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get();
    }

    @Bean
    public IntegrationFlow buildRouterFlow() {

        Function router = (p) -> {
            if (ROUTING_EVENTS.containsKey(p.getClass())) {
                return ROUTING_EVENTS.get(p.getClass());
            } else {
                return null;
            }
        };

        return IntegrationFlows.from("msg.router").route(router, m -> m
                .subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload())))
                .subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload())))
                .subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload())))
                .subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get();
    }

    public static void main(String[] args) {
        SpringApplication.run(VitelAsyncPersisterApplication.class, args);
    }
}

Upvotes: 1

Views: 898

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121177

According to your logs I really don't see any Spring Integration infrastructure.

So, maybe you have just missed @EnableIntegration there?

Plus your test is a bit strange. It sends message to the JMS and check the result from the DB. But we don't see there how you start that config for Integration.

Since you are going only listen messages and store them in the DB, consider to use one-way JMS component - Jms.messageDriverChannelAdapter(). Instead of request/reply gateway.

Upvotes: 1

Related Questions