Andrea

Reputation: 31

Quarkus IBM MQ extension

I'm trying to create a Quarkus extension so that IBM MQ can be used in a native executable. So far I have created the ConnectionFactory producer in the runtime module:

@ApplicationScoped
public class ConnectionFactoryProducer {

    @Produces
    @ApplicationScoped
    @DefaultBean
    public JmsConnectionFactory connectionFactory() throws JMSException {

        JmsFactoryFactory ff;
        JmsConnectionFactory factory;
        ff = JmsFactoryFactory.getInstance(JmsConstants.WMQ_PROVIDER);
        factory = ff.createConnectionFactory();
        // Always work in TCP/IP client mode
        factory.setIntProperty(CommonConstants.WMQ_CONNECTION_MODE, CommonConstants.WMQ_CM_CLIENT);
        factory.setStringProperty(CommonConstants.WMQ_HOST_NAME, "localhost");
        factory.setIntProperty(CommonConstants.WMQ_PORT, 1414);
        factory.setStringProperty(CommonConstants.WMQ_CHANNEL, "DEV.ADMIN.SVRCONN");
        factory.setStringProperty(CommonConstants.WMQ_QUEUE_MANAGER, "QM1");
        factory.setStringProperty(WMQConstants.USERID, "admin");
        factory.setStringProperty(WMQConstants.PASSWORD, "passw0rd");
        return factory;
    }

}

The deployment module contains the processor:

public final class IbmExtProcessor {

    private static final String FEATURE = "ibm-ext";

    @BuildStep
    FeatureBuildItem feature() {
        return new FeatureBuildItem(FEATURE);
    }

}

Where FEATURE is the name of the extension.

But when I import the extension as a dependency in my project and run the code, nothing happens. It looks as if the classes that use the dependency are no longer in the application context. Here is an example of a message producer:

public class NumberProducer implements Runnable {

    private final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    @Inject
    private ConnectionFactoryProducer cf;
    private ConnectionFactory c;



    void onStart(@Observes StartupEvent ev) throws JMSException {
        c=cf.connectionFactory();
        scheduler.scheduleWithFixedDelay(this, 0L, 1L, TimeUnit.SECONDS);
    }

    void onStop(@Observes ShutdownEvent ev) {
        scheduler.shutdown();
    }

    @Override
    public void run() {
        JMSContext context = c.createContext();
        Queue destination=context.createQueue("queue:///DEV.QUEUE.1");
        try {
            TextMessage message = context.createTextMessage(String.format("Value : %d", random.nextInt(100)));
            JMSProducer producer = context.createProducer();
            producer.send(destination, message);
            System.out.println(message);
        } catch (Exception e) {
            throw e;
        }
    }
}

In this case the message is never printed. Can anyone help? I think I've missed something in the extension, but I can't figure out what.

Upvotes: 3

Views: 1902

Answers (3)

Serkan

Reputation: 1205

I got it working with the SmallRye Reactive Messaging JMS connector; see the code below:

pom.xml:

        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-arc</artifactId>
        </dependency>

        <!-- you also need to include amqp, otherwise you get runtime errors -->
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-jms</artifactId>
            <version>4.12.0</version>
        </dependency>

        <!-- this one uses Jakarta JMS (instead of javax) -->
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.jakarta.client</artifactId>
            <version>9.3.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.15.2</version>
        </dependency>

Define your JmsConnectionFactory:

@ApplicationScoped
public class JmsConfig {

    // my own config class with mq config settings
    @Inject
    MqProperties config;

    @Produces
    public ConnectionFactory connectionFactory() throws JMSException {
        var result = JmsFactoryFactory.getInstance(JmsConstants.JAKARTA_WMQ_PROVIDER).createConnectionFactory();
        result.setIntProperty(WMQ_CONNECTION_MODE, WMQ_CM_CLIENT);
        result.setStringProperty(WMQ_QUEUE_MANAGER, this.config.queueManager());
        result.setStringProperty(WMQ_CHANNEL, this.config.channel());
        result.setStringProperty(WMQ_CONNECTION_NAME_LIST, this.config.connectionName());
        result.setStringProperty(WMQ_APPLICATIONNAME, this.config.applicationName());
        result.setIntProperty(WMQ_CLIENT_RECONNECT_TIMEOUT, this.config.timeout());
        result.setBooleanProperty(USER_AUTHENTICATION_MQCSP, false);
        return result;
    }
}

Config class:

@ConfigMapping(prefix = "ibm.mq", namingStrategy = VERBATIM)
public interface MqProperties {
    String queueManager();

    String channel();

    String connectionName();

    String applicationName();

    int timeout();
}

application.properties:

ibm.mq.queueManager=<mq queue manager name>
ibm.mq.channel=<mq channel name>
ibm.mq.connectionName=localhost(1414)
ibm.mq.applicationName=my-jms-app
ibm.mq.timeout=5000

mp.messaging.incoming.order.connector=smallrye-jms
mp.messaging.incoming.order.destination=<your queue name>
mp.messaging.incoming.order.username=...
mp.messaging.incoming.order.password=...

quarkus.amqp.devservices.enabled=false
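
The ibm.mq.connectionName value above follows IBM MQ's connection name list syntax: comma-separated host(port) entries. As a rough sketch, the value could be checked at startup before it is handed to the connection factory. The ConnectionNameList class and its Endpoint type are hypothetical helpers, not part of any IBM or Quarkus API, and this version assumes every entry carries an explicit port:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: validates a connection name list such as "localhost(1414)".
final class ConnectionNameList {

    /** One host/port pair taken from the list. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses comma-separated host(port) entries; rejects anything else. */
    static List<Endpoint> parse(String value) {
        List<Endpoint> endpoints = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int open = trimmed.indexOf('(');
            if (open <= 0 || !trimmed.endsWith(")")) {
                throw new IllegalArgumentException("Expected host(port) but got: " + trimmed);
            }
            String host = trimmed.substring(0, open);
            // A non-numeric port raises NumberFormatException, an IllegalArgumentException.
            int port = Integer.parseInt(trimmed.substring(open + 1, trimmed.length() - 1));
            endpoints.add(new Endpoint(host, port));
        }
        return endpoints;
    }

    public static void main(String[] args) {
        for (Endpoint e : parse("localhost(1414), mq-standby(1415)")) {
            System.out.println(e.host + ":" + e.port);
        }
    }
}
```

Failing fast on a malformed value tends to give a clearer error than the one the MQ client reports when the connection attempt fails.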

And use it:

@ApplicationScoped
public class OrderReceiver {

    @Incoming("order")
    public CompletionStage<Void> onMessage(Message<Order> message) {
        Log.infof("incoming order: \n%s", message.getPayload());
        return message.ack();
    }
}

In my case the incoming payload was XML, so I had to register a converter for it:

@ApplicationScoped
public class OrderConverter implements MessageConverter {

    // Jackson XML mapper
    XmlMapper mapper = new XmlMapper();

    @Override
    public boolean canConvert(Message<?> message, Type type) {
        return message.getPayload().getClass().equals(String.class) && type.equals(Order.class);
    }

    @Override
    public Message<?> convert(Message<?> message, Type type) {
        try {
            return message.withPayload(this.mapper.readValue((String) message.getPayload(), Order.class));
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public int getPriority() {
        return 200;
    }
}

I hope this helps.

Upvotes: 3

Ealse de Wilde

Reputation: 21

AMQP will not work when you have to connect to a classic IBM MQ channel. In that case you have to use the com.ibm.mq.allclient jar. Despite the introspection and thread spawning, it is possible to use com.ibm.mq.allclient in a Quarkus native image. You will need the following quarkus.native.additional-build-args:

-J-Dcom.ibm.mq.cfg.useIBMCipherMappings=false,\
-J-Dcom.ibm.msg.client.commonservices.ffst.suppress=-1,\
--initialize-at-run-time=org.apache.http.impl.auth.NTLMEngineImpl\,com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread\,com.ibm.mq.jmqi.JmqiDefaultThreadPool\,com.ibm.msg.client.wmq.compat.jms.internal.MQJMSMessage\,com.ibm.msg.client.commonservices.workqueue.WorkQueueManager,\
-H:ResourceConfigurationFiles=resources-config.json,\
-H:ReflectionConfigurationFiles=reflection-config.json,\
-H:+ReportUnsupportedElementsAtRuntime,\
--report-unsupported-elements-at-runtime
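
In the value above, a plain comma separates two build arguments, while the escaped form \, keeps a literal comma inside a single argument (here, the class list passed to --initialize-at-run-time). The following sketch only illustrates that splitting rule. BuildArgSplitter is a hypothetical class written for this example and is not the parser Quarkus itself uses:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the "comma separates, backslash-comma is literal" rule.
final class BuildArgSplitter {

    /** Splits on commas, treating a backslash-escaped comma as part of the argument. */
    static List<String> split(String value) {
        List<String> args = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\\' && i + 1 < value.length() && value.charAt(i + 1) == ',') {
                current.append(',');
                i++; // skip the comma that was escaped
            } else if (c == ',') {
                args.add(current.toString().trim());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        args.add(current.toString().trim());
        return args;
    }

    public static void main(String[] args) {
        String value = "-J-Dcom.ibm.mq.cfg.useIBMCipherMappings=false,"
                + "--initialize-at-run-time=org.apache.http.impl.auth.NTLMEngineImpl"
                + "\\,com.ibm.mq.jmqi.JmqiDefaultThreadPool,"
                + "-H:+ReportUnsupportedElementsAtRuntime";
        // Prints three arguments; the second still contains both class names.
        for (String arg : split(value)) {
            System.out.println(arg);
        }
    }
}
```

If a class from the --initialize-at-run-time list seems to be ignored, a missing backslash before one of its commas is a likely cause, since the remainder would then be read as a separate argument.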

The first system property is needed when using a non-IBM JDK.
I added the second system property to suppress FFDC logging, which would otherwise try to write to the local filesystem. The first class listed under --initialize-at-run-time is there because it uses a SecureRandom.
The remaining classes cover the two class-loading paths to the WorkQueueManager. The WorkQueueManager itself is listed because it spawns threads at runtime.

For reflection, you have to define a reflection-config.json covering these classes:

  1. com.ibm.msg.client.jms.internal.JmsFactoryFactoryImpl
  2. com.ibm.mq.jmqi.remote.api.RemoteFAP
  3. com.sun.security.cert.internal.x509.X509V1CertImpl

The last class is needed when you use SSL/TLS in your connection to the MQ channel.

A resources-config.json containing {"pattern": ".+properties$"} is also needed, because the IBM MQ code uses ResourceBundles backed by a number of properties files for error messages.
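
As a rough sketch, the reflection-config.json for the three classes listed above could be generated rather than written by hand. The class names come from the list; the flags allDeclaredConstructors and allPublicMethods are an assumption about what the MQ client needs and may have to be widened or narrowed. ReflectionConfigSketch is a hypothetical name:

```java
// Hypothetical generator for a GraalVM reflection configuration file.
final class ReflectionConfigSketch {

    // Classes named in the answer above.
    static final String[] CLASSES = {
        "com.ibm.msg.client.jms.internal.JmsFactoryFactoryImpl",
        "com.ibm.mq.jmqi.remote.api.RemoteFAP",
        "com.sun.security.cert.internal.x509.X509V1CertImpl"
    };

    /** Renders one JSON entry per class; the two flags are an assumption. */
    static String toJson(String[] classNames) {
        StringBuilder json = new StringBuilder("[\n");
        for (int i = 0; i < classNames.length; i++) {
            json.append("  { \"name\": \"").append(classNames[i]).append("\",")
                .append(" \"allDeclaredConstructors\": true,")
                .append(" \"allPublicMethods\": true }");
            json.append(i < classNames.length - 1 ? ",\n" : "\n");
        }
        return json.append("]").toString();
    }

    public static void main(String[] args) {
        // Redirect the output to reflection-config.json in the build resources.
        System.out.println(toJson(CLASSES));
    }
}
```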

When using SSL/TLS you have to explicitly set the SSLSocketFactory on the JmsConnectionFactory:

connectionFactory.setObjectProperty("XMSC_WMQ_SSL_SOCKET_FACTORY", sslSocketFactory);
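
The sslSocketFactory passed in the call above has to come from somewhere. A minimal sketch using only JDK classes, assuming the JDK's default trust store already contains the queue manager's certificate chain (many real setups load a dedicated keystore instead); TlsSocketFactorySketch is a hypothetical name:

```java
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper that builds an SSLSocketFactory from the JDK default trust store.
final class TlsSocketFactorySketch {

    static SSLSocketFactory fromDefaultTrustStore() throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init((KeyStore) null); // null selects the JDK's default trust store
        SSLContext context = SSLContext.getInstance("TLS");
        // No client key managers here: this assumes server-only authentication.
        context.init(null, tmf.getTrustManagers(), null);
        return context.getSocketFactory();
    }

    public static void main(String[] args) throws GeneralSecurityException {
        SSLSocketFactory factory = fromDefaultTrustStore();
        System.out.println("Supported cipher suites: " + factory.getSupportedCipherSuites().length);
    }
}
```

If the channel requires mutual TLS, the first argument to context.init would need key managers built from a client keystore.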

Depending on how you configure the JmsConnectionFactory and on the version of the com.ibm.mq.allclient jar, you might get errors about the "XMSC_WMQ_HOST_NAME" property. These errors are bugs in the IBM code and have nothing to do with the native image.

Upvotes: 2

chughts

Reputation: 4737

This approach is not going to work because the MQ JMS client uses dynamic loading and introspection to instantiate the appropriate implementation classes. These classes get excluded from the native executable. You can run in Quarkus JVM mode, but native is a problem.
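
To illustrate the kind of code that causes this, here is a minimal sketch of dynamic loading with JDK classes only. It is not taken from the MQ client; DynamicLoadingDemo is a hypothetical name. Because the class name is only known at run time, a native image build cannot see which class must be kept unless it is registered for reflection:

```java
// Hypothetical demo of loading and instantiating a class by name at run time.
final class DynamicLoadingDemo {

    /** Instantiates a class known only by name, via its no-argument constructor. */
    static Object instantiate(String className) throws ReflectiveOperationException {
        Class<?> type = Class.forName(className);
        return type.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // The name is assembled at run time, so static analysis cannot tell which class is meant.
        String name = String.join(".", "java", "util", "ArrayList");
        Object instance = instantiate(name);
        System.out.println(instance.getClass().getName()); // prints java.util.ArrayList
    }
}
```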

An alternative is to use the Qpid JMS AMQP client (https://quarkus.io/guides/jms), but you will need to ensure that the AMQP protocol is enabled on your MQ server.

Upvotes: 1
