b15
b15

Reputation: 2361

ActiveMQ Artemis and 5.x listeners at same time - NullPointerException

I have a legacy Spring 4.2.1.RELEASE application that connects to ActiveMQ 5.x as a listener and now we're adding connectivity to ActiveMQ Artemis. For Artemis we're using durable subscriptions because we don't want message loss on a topic when the subscribers go down and shared subscriptions because we wanted the option of clustering or using concurrency to asynchronously process the messages in the subscription. I have separate ConnectionFactorys and ListenerContainers, but from this WARN log that keeps repeating it looks like the Artemis DMLC can't start due to the following NPE:

java.lang.NullPointerException
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
    at java.lang.Thread.run(Unknown Source)

On the surface it looks like it can't find the method createSharedDurableConsumer. Looking at the AbstractMessageListenerContainer I have, line 856 is calling method.invoke

/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
        Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);

...

Method method = (isSubscriptionDurable() ?
                        createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
    return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}

Artemis configuration:

@Configuration
public class ArtemisConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory artemisConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
                .createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());

        artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
        artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
        artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
        artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
        artemisConnectionFactory
                .setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
        artemisConnectionFactory.setInitialConnectAttempts(
                env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
        artemisConnectionFactory
                .setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
        artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
        artemisConnectionFactory
                .setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
        artemisConnectionFactory.setBlockOnAcknowledge(true);
        artemisConnectionFactory.setBlockOnDurableSend(true);
        artemisConnectionFactory.setCacheDestinations(true);
        artemisConnectionFactory.setConsumerWindowSize(0);
        artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);

        cachingConnectionFactory
        .setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
        cachingConnectionFactory.setReconnectOnException(true);

        return cachingConnectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
            JmsTransactionManager artemisJmsTransactionManager,
            MappingJackson2MessageConverter mappingJackson2MessageConverter) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setConnectionFactory(artemisConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setMessageConverter(mappingJackson2MessageConverter);
        factory.setSubscriptionDurable(Boolean.TRUE);
        factory.setSubscriptionShared(Boolean.TRUE);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(artemisJmsTransactionManager);

        return factory;
    }

    private TransportConfiguration[] createTransportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
        String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
        Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);

        primaryTransportParameters.put("host", primaryHostname);
        primaryTransportParameters.put("port", primaryPort);

        return new TransportConfiguration[] {
                new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
                new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
    }
}

My pom uses version 2.10.0 of Artemis.

How do I fix this?

Upvotes: 0

Views: 436

Answers (1)

Justin Bertram
Justin Bertram

Reputation: 35152

The JMS 2.0 spec is backwards compatible with JMS 1.1 so make sure you only have the JMS 2 spec on your classpath. My hunch is that the reflection calls in the Spring code are getting messed up because they're hitting the JMS 1.1 spec classes instead of the proper JMS 2 spec classes.

Upvotes: 2

Related Questions