Reputation: 715
I cannot manage to code a server that would listen to STOMP messages over WebSocket. My issue lies with the stomp protocol and the creation of the JMS consumers.
The following code fails on createConnection
class StompDemo {
val uri = "ws://localhost:61614"
val topicName = "mytopic"
val broker = new BrokerService
broker.addConnector(uri)
val topic = new ActiveMQTopic(topicName)
val topics = Array[ActiveMQDestination](topic)
broker.setDestinations(topics)
broker.start
println("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
println("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createTopic(topicName)
val consumer = session.createConsumer(destination)
println("Created consumer")
while(true) {
println("Waiting for next message")
val message = consumer.receive
}
}
with the following exception:
Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!
Can you please point out the issue with this code? How to configure programmatically a JMS listener to a queue or topic over WebSocket/Stomp with AMQ?
Thanks
New updated code failing with ActiveMQ Transport: tcp:///127.0.0.1:51309@6969 [] Transport Connection to: tcp://127.0.0.1:51309 failed: java.io.IOException: Unknown data type: 47 I guess it has to be related to the binary vs. text based.
Still investigating why this fails:
package org.tj.amq
import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
//
// http://www.massapi.com/class/br/BrokerService.html
//
object AMQStompDemo extends MainLoop with Logging {
<<("AMQ Stomp Demo")
val uri = "tcp://localhost:6969"
val broker = new BrokerService
broker.setPersistent(false)
broker.setUseJmx(false)
broker.addConnector(uri)
broker.start
<<("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
connection.start
<<("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createQueue("test")
val consumer = session.createConsumer(destination)
while(true) {
<<("Ready to receive next message ...")
val message = consumer.receive
message match {
case tm:TextMessage => <<(s"Received text message ${tm.getText}")
case _ => <<(s"Received another message type $message")
}
}
def main(args: Array[String]): Unit = {}
}
trait Logging {
def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}
trait MainLoop extends Logging {
new Thread(new Runnable() {
override def run = {
<<("Starting main loop")
while(true) {
Thread.sleep(1000)
}
}
}).start
}
The saga continues.
By just adding broker.addConnector("ws://localhost:6971")
I can successfully connect via WS from the browser to the queue /queue/test
Now, last remaining issue - I do get callbacks, but AMQ gives me this
[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Just after the first message received.
[EDITED] Well, I have been hit by https://issues.apache.org/jira/browse/AMQ-5155 So, using AMQ version 5.9.0 works.
My feeling is that AMQ for WebSockets is too flaky. Well probably used a more conservative approach with Tomcat instead.
Upvotes: 0
Views: 6010
Reputation: 18366
Generally you wouldn't be using websockets on the server side, just connect using a normal STOMP or OpenWire connection.
That siad, looking at your code you appear to be using the ActiveMQ JMS client which speaks neither STOMP nor Websockets so you are doomed to failure. The ActiveMQ JMS client uses the OpenWire protocol and can connect via TCP or SSL (HTTP can work to with the right jar).
Upvotes: 1