jts
jts

Reputation: 715

ActiveMQ, WebSocket and Stomp

I cannot manage to code a server that would listen to STOMP messages over WebSocket. My issue lies with the stomp protocol and the creation of the JMS consumers.

The following code fails on createConnection

class StompDemo {
  val uri = "ws://localhost:61614"
  val topicName = "mytopic"
  val broker = new BrokerService
  broker.addConnector(uri)
  val topic = new ActiveMQTopic(topicName)
  val topics = Array[ActiveMQDestination](topic)
  broker.setDestinations(topics)
  broker.start
  println("Started broker")

  val connectionFactory = new ActiveMQConnectionFactory(uri)
  val connection = connectionFactory.createConnection
  println("Started connection")

  val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
  val destination = session.createTopic(topicName)
  val consumer = session.createConsumer(destination)
  println("Created consumer")

  while(true) {
    println("Waiting for next message")
    val message = consumer.receive
  }
}

with the following exception:

Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!

Can you please point out the issue with this code? How to configure programmatically a JMS listener to a queue or topic over WebSocket/Stomp with AMQ?

Thanks

New updated code failing with ActiveMQ Transport: tcp:///127.0.0.1:51309@6969 [] Transport Connection to: tcp://127.0.0.1:51309 failed: java.io.IOException: Unknown data type: 47 I guess it has to be related to the binary vs. text based.

Still investigating why this fails:

package org.tj.amq

import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage

//
// http://www.massapi.com/class/br/BrokerService.html
//

object AMQStompDemo extends MainLoop with Logging {
  <<("AMQ Stomp Demo")
  val uri = "tcp://localhost:6969"
  val broker = new BrokerService
  broker.setPersistent(false)
  broker.setUseJmx(false)
  broker.addConnector(uri)
  broker.start
  <<("Started broker")

  val connectionFactory = new ActiveMQConnectionFactory(uri)
  val connection = connectionFactory.createConnection
  connection.start
  <<("Started connection")
  val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
  val destination = session.createQueue("test")
  val consumer = session.createConsumer(destination)

  while(true) {
    <<("Ready to receive next message ...")
    val message = consumer.receive
    message match {
      case tm:TextMessage => <<(s"Received text message ${tm.getText}")
      case _ => <<(s"Received another message type $message")
    }
  }

  def main(args: Array[String]): Unit = {}
}

trait Logging {
  def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}

trait MainLoop extends Logging {
  new Thread(new Runnable() {
    override def run = {
      <<("Starting main loop")
      while(true) {
        Thread.sleep(1000)
      }
    }
  }).start
}

The saga continues. By just adding broker.addConnector("ws://localhost:6971") I can successfully connect via WS from the browser to the queue /queue/test

Now, last remaining issue - I do get callbacks, but AMQ gives me this

[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
    at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Just after the first message received.

[EDITED] Well, I have been hit by https://issues.apache.org/jira/browse/AMQ-5155 So, using AMQ version 5.9.0 works.

My feeling is that AMQ for WebSockets is too flaky. Well probably used a more conservative approach with Tomcat instead.

Upvotes: 0

Views: 6010

Answers (1)

Tim Bish
Tim Bish

Reputation: 18366

Generally you wouldn't be using websockets on the server side, just connect using a normal STOMP or OpenWire connection.

That siad, looking at your code you appear to be using the ActiveMQ JMS client which speaks neither STOMP nor Websockets so you are doomed to failure. The ActiveMQ JMS client uses the OpenWire protocol and can connect via TCP or SSL (HTTP can work to with the right jar).

Upvotes: 1

Related Questions