verystrongjoe

Reputation: 3911

In HornetQ, how can a consumer be invoked automatically?

I looked through all of the HornetQ examples, but I couldn't find one in which the consumer is invoked automatically whenever a message arrives from the producer.

Please point me to example code or give me a hint. Thanks in advance.

Upvotes: 1

Views: 1318

Answers (1)

Arya

Reputation: 3029

Use Spring's DefaultMessageListenerContainer. You register a listener with it, and the container consumes messages asynchronously and passes each one to the listener. For more information about tuning MessageListenerContainer, see: http://bsnyderblog.blogspot.se/2010/05/tuning-jms-message-consumption-in.html.

The HornetQ dependencies you need (I used a standalone hornetq-2.3.0.CR2; you also need some Spring jars):

<dependencies>
    <!-- hornetq -->
    <dependency>
        <groupId>org.jboss.netty</groupId>
        <artifactId>netty</artifactId>
        <version>3.2.7.Final</version>
    </dependency>
    <dependency>
        <groupId>org.hornetq</groupId>
        <artifactId>hornetq-jms-client</artifactId>
        <version>2.3.0.CR2</version>
    </dependency>
    <dependency>
        <groupId>org.hornetq</groupId>
        <artifactId>hornetq-core-client</artifactId>
        <version>2.3.0.CR2</version>
    </dependency>
    <!-- hornetq -->
</dependencies>

The beans to define in your applicationContext.xml (I didn't use JNDI to look up the ConnectionFactory and destinations; for that approach, you can follow this question):

<!-- ConnectionFactory used to connect to HornetQ. 5445 is the HornetQ acceptor port. -->
<bean name="connectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory"> 
    <constructor-arg index="0" name="ha" value="false" />
    <constructor-arg index="1" name="commaSepratedServerUrls" value="127.0.0.1:5445" /> 
</bean>

<bean id="destinationParent" class="messaging.jms.JmsDestinationFactoryBean" abstract="true">
    <property name="pubSubDomain" value="false" /> <!-- default is queue -->
</bean>

<bean id="exampleDestination" parent="destinationParent">
    <property name="destinationName" value="example" /> <!-- queue name -->
</bean>

<!-- MessageListener -->
<bean id="messageHandler" class="messaging.consumer.MessageHandler">
</bean>

<!-- MessageListenerContainer -->
<bean id="paymentListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="destination"       ref="exampleDestination" />
    <property name="messageListener"   ref="messageHandler" />
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="concurrentConsumers"    value="1" />
    <property name="maxConcurrentConsumers" value="10" />
    <property name="idleConsumerLimit"      value="2" />
    <property name="idleTaskExecutionLimit" value="5" />
    <property name="receiveTimeout"         value="3000" />
</bean>
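
To picture what the container does with the registered listener, here is a simplified stand-in in plain Java. It is not Spring's implementation: the class name PollingListenerLoop is hypothetical, an in-memory BlockingQueue takes the place of the HornetQ queue, and there is only one consumer thread. The idea it illustrates is the same, though: a loop waits up to receiveTimeout for a message and calls the listener whenever one arrives, so your own code never has to call receive().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified illustration of a listener container's receive loop.
final class PollingListenerLoop implements Runnable {
    private final BlockingQueue<String> queue;   // stands in for the JMS queue
    private final Consumer<String> listener;     // stands in for the MessageListener
    private final long receiveTimeoutMillis;     // same role as receiveTimeout above
    private volatile boolean running = true;

    PollingListenerLoop(BlockingQueue<String> queue,
                        Consumer<String> listener,
                        long receiveTimeoutMillis) {
        this.queue = queue;
        this.listener = listener;
        this.receiveTimeoutMillis = receiveTimeoutMillis;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Block for at most receiveTimeoutMillis waiting for a message.
                String message = queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
                if (message != null) {
                    listener.accept(message); // the "automatic" invocation
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and exit
                return;
            }
        }
    }

    // Ask the loop to finish after the current poll or listener call.
    void stop() {
        running = false;
    }
}
```

In a real application you would not write this loop yourself; DefaultMessageListenerContainer runs it for you on its own threads, scaling between concurrentConsumers and maxConcurrentConsumers.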

CustomHornetQJMSConnectionFactory:

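import java.util.HashMap;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;

public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory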
{
    private static final long serialVersionUID = 1L;

    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    {
        super(ha, converToTransportConfigurations(commaSepratedServerUrls));
    }

    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
    {   
        String [] serverUrls = commaSepratedServerUrls.split(",");
        TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
        for(int i = 0; i < serverUrls.length; i++)
        {
            String[] urlParts = serverUrls[i].split(":");
            HashMap<String, Object> map = new HashMap<String,Object>();
            map.put(TransportConstants.HOST_PROP_NAME, urlParts[0]);
            map.put(TransportConstants.PORT_PROP_NAME, urlParts[1]);
            transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
        }
        return transportconfigurations;
    }
}
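
The class above splits each entry on ":" without checking it, so a malformed value such as "127.0.0.1" fails with an ArrayIndexOutOfBoundsException. If you want a clearer error, the parsing step could be validated along these lines. This is only a sketch in plain Java: ServerUrlParser is a hypothetical helper, and the literal keys "host" and "port" are placeholders for TransportConstants.HOST_PROP_NAME and TransportConstants.PORT_PROP_NAME.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: validates "host:port,host:port" before building connector parameters.
final class ServerUrlParser {
    private ServerUrlParser() {
    }

    static List<Map<String, Object>> parse(String commaSeparatedServerUrls) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (String rawUrl : commaSeparatedServerUrls.split(",")) {
            String url = rawUrl.trim();
            int colon = url.lastIndexOf(':');
            // Require a non-empty host before the colon and a non-empty port after it.
            if (colon <= 0 || colon == url.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + url + "'");
            }
            int port;
            try {
                port = Integer.parseInt(url.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Port is not a number in '" + url + "'", e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + url + "'");
            }
            Map<String, Object> params = new LinkedHashMap<>();
            params.put("host", url.substring(0, colon)); // placeholder key, see note above
            params.put("port", port);                    // placeholder key, see note above
            result.add(params);
        }
        return result;
    }
}
```

Each returned map could then be passed to new TransportConfiguration(NettyConnectorFactory.class.getName(), map), as in the loop above.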

JmsDestinationFactoryBean (Used in destinationParent bean):

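import javax.jms.Destination;

import org.hornetq.api.jms.HornetQJMSClient;
import org.springframework.beans.factory.FactoryBean;

public class JmsDestinationFactoryBean implements FactoryBean<Destination>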
{
    private String destinationName;
    private boolean pubSubDomain = false;

    public void setDestinationName(String destinationName) {
        this.destinationName = destinationName;
    }

    public void setPubSubDomain(boolean pubSubDomain) {
        this.pubSubDomain = pubSubDomain;
    }

    @Override
    public Class<?> getObjectType() 
    {
        return Destination.class;
    }

    @Override
    public boolean isSingleton() 
    {
        return true;
    }

    @Override
    public Destination getObject() throws Exception 
    {
        if(pubSubDomain)
        {
            return HornetQJMSClient.createTopic(destinationName);
        }
        else
        {           
            return HornetQJMSClient.createQueue(destinationName);           
        }
    }
}

MessageHandler (received messages are passed to the onMessage method for processing; for simplicity, you can implement javax.jms.MessageListener instead of SessionAwareMessageListener):

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MessageHandler implements org.springframework.jms.listener.SessionAwareMessageListener<Message>
{
    @Override
    public void onMessage(Message msg, Session session) throws JMSException
    {
        if(msg instanceof TextMessage)
        {
            System.out.println(((TextMessage)msg).getText());
            session.commit(); // acknowledge the message in the transacted session
        }
        else
        {
            session.rollback(); // send message back to the queue
        }
    }
}

Upvotes: 2
