user3444718
user3444718

Reputation: 1625

in memory amqp for dev and testing

I am using rabbit in my services, but for restrictions I cant download on local. For that I want to use in memory broker and figured qpic can work. I have below configuration and in logs I can see qpid broker starts fine but when spring boot tries to send message it cant connect.

   @Bean
    Broker broker() throws Exception {
        org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        BrokerOptions brokerOptions = new BrokerOptions();
        brokerOptions.setConfigProperty("qpid.amqp_port", "5672");
        brokerOptions.setConfigProperty("qpid.broker.defaultPreferenceStoreAttributes", "{\"type\": \"Noop\"}");
        brokerOptions.setConfigProperty("qpid.vhost", "/");
        brokerOptions.setConfigurationStoreType("Memory");
        brokerOptions.setStartupLoggedToSystemOut(false);
        broker.startup(brokerOptions);
        return broker;
    }

in resource I have initial-config as below:

{
  "name": "Embedded Test Broker",
  "modelVersion": "6.1",
  "authenticationproviders" : [{
    "name": "password",
    "type": "Plain",
    "secureOnlyMechanisms": [],
    "users": [{"name": "guest", "password": "guest", "type": "managed"}]
  }],
  "ports": [{
    "name": "AMQP",
    "port": "${qpid.amqp_port}",
    "authenticationProvider": "password",
    "protocols": [ "AMQP_0_9_1" ],
    "transports": [ "TCP" ],
    "virtualhostaliases": [{
      "name": "${qpid.vhost}",
      "type": "nameAlias"
    }]
  }],
  "virtualhostnodes" : [{
    "name": "${qpid.vhost}",
    "type": "Memory",
    "virtualHostInitialConfiguration": "{ \"type\": \"Memory\" }"
  }]
}

Error I am getting

2018-08-15 19:55:07 CachingConnectionFactory - Attempting to connect to: [localhost:5672]
2018-08-15 19:55:07 NonBlockingConnection - Identified transport encryption as NONE
2018-08-15 19:55:07 NonBlockingConnection - Read 8 byte(s)
2018-08-15 19:55:07 TaskExecutorImpl - Submitting Task['create' on '/127.0.0.1:56891(?)'] to executor Broker-Config
2018-08-15 19:55:07 TaskExecutorImpl - Performing Task['create' on '/127.0.0.1:56891(?)']
2018-08-15 19:55:07 open - [con:2(/127.0.0.1:56891)] CON-1001 : Open : Destination : AMQP(127.0.0.1:5672) : Protocol Version : 0-9-1
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@3979d0 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@db842 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@4c9750 through executor interface
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ProtocolHeader [AMQP0091 ]
2018-08-15 19:55:07 TaskExecutorImpl - Task['create' on '/127.0.0.1:56891(?)'] performed successfully with result: null
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionStartBodyImpl: versionMajor=0, versionMinor=9, serverProperties={product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}, mechanisms=[80, 76, 65, 73, 78, 32, 67, 82, 65, 77, 45, 77, 68, 53, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 49, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 50, 53, 54], locales=[101, 110, 95, 85, 83]]
2018-08-15 19:55:07 FieldTable - FieldTable::writeToBuffer: Writing encoded length of 308...
2018-08-15 19:55:07 FieldTable - {product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}
2018-08-15 19:55:07 NonBlockingConnection - Written 379 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 443 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionStartOk[ clientProperties: {connection_name=[LONG_STRING: rabbitConnectionFactory#1800efd:1], product=[LONG_STRING: RabbitMQ], copyright=[LONG_STRING: Copyright (c) 2007-2017 Pivotal Software, Inc.], capabilities=[FIELD_TABLE: {exchange_exchange_bindings=[BOOLEAN: true], connection.blocked=[BOOLEAN: true], authentication_failure_close=[BOOLEAN: true], basic.nack=[BOOLEAN: true], publisher_confirms=[BOOLEAN: true], consumer_cancel_notify=[BOOLEAN: true]}], information=[LONG_STRING: Licensed under the MPL. See http://www.rabbitmq.com/], version=[LONG_STRING: 5.1.2], platform=[LONG_STRING: Java]} mechanism: PLAIN response: ******** locale: en_US ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SASL Mechanism selected: PLAIN Locale : en_US
2018-08-15 19:55:07 AMQPConnection_0_8Impl - Connected as: Subject:
    Principal: guest

2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionTuneBodyImpl: channelMax=256, frameMax=262136, heartbeat=0]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 1 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 20 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 36 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionTuneOk[ channelMax: 256 frameMax: 262136 heartbeat: 60 ]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionOpen[ virtualHost: / capabilities: null insist: false ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, replyText=Unknown virtual host: '/', classId=10, methodId=40]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 44 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 12 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionCloseOk
2018-08-15 19:55:07 NonBlockingConnection - Closing /127.0.0.1:56891
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 1 ms.
2018-08-15 19:55:07 MultiVersionProtocolEngine - Closed
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable org.apache.qpid.server.transport.AbstractAMQPConnection$1@ddf89f through executor interface
2018-08-15 19:55:07 ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readUnsignedByte(Unknown Source)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
    at java.lang.Thread.run(Unknown Source)
2018-08-15 19:55:07 NonBlockingConnection - Identified transport encryption as NONE
2018-08-15 19:55:07 close - [con:2(guest@/127.0.0.1:56891)] CON-1002 : Close
2018-08-15 19:55:07 NonBlockingConnection - Read 8 byte(s)
2018-08-15 19:55:07 TaskExecutorImpl - Submitting Task['create' on '/0:0:0:0:0:0:0:1:56892(?)'] to executor Broker-Config
2018-08-15 19:55:07 TaskExecutorImpl - Performing Task['create' on '/0:0:0:0:0:0:0:1:56892(?)']
2018-08-15 19:55:07 open - [con:3(/0:0:0:0:0:0:0:1:56892)] CON-1001 : Open : Destination : AMQP(0:0:0:0:0:0:0:1:5672) : Protocol Version : 0-9-1
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@146d207 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@1d1fae1 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable com.google.common.util.concurrent.Futures$6@ed29c3 through executor interface
2018-08-15 19:55:07 TaskExecutorImpl - Task['create' on '/0:0:0:0:0:0:0:1:56892(?)'] performed successfully with result: null
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ProtocolHeader [AMQP0091 ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionStartBodyImpl: versionMajor=0, versionMinor=9, serverProperties={product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}, mechanisms=[80, 76, 65, 73, 78, 32, 67, 82, 65, 77, 45, 77, 68, 53, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 49, 32, 83, 67, 82, 65, 77, 45, 83, 72, 65, 45, 50, 53, 54], locales=[101, 110, 95, 85, 83]]
2018-08-15 19:55:07 FieldTable - FieldTable::writeToBuffer: Writing encoded length of 308...
2018-08-15 19:55:07 FieldTable - {product=[LONG_STRING: qpid], version=[LONG_STRING: 6.1.1], qpid.build=[LONG_STRING: 1775107], qpid.instance_name=[LONG_STRING: Embedded Test Broker], qpid.close_when_no_route=[LONG_STRING: true], qpid.message_compression_supported=[LONG_STRING: true], qpid.confirmed_publish_supported=[LONG_STRING: true], qpid.virtualhost_properties_supported=[LONG_STRING: true], qpid.queue_lifetime_supported=[LONG_STRING: true]}
2018-08-15 19:55:07 NonBlockingConnection - Written 379 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 443 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionStartOk[ clientProperties: {connection_name=[LONG_STRING: rabbitConnectionFactory#1800efd:1], product=[LONG_STRING: RabbitMQ], copyright=[LONG_STRING: Copyright (c) 2007-2017 Pivotal Software, Inc.], capabilities=[FIELD_TABLE: {exchange_exchange_bindings=[BOOLEAN: true], connection.blocked=[BOOLEAN: true], authentication_failure_close=[BOOLEAN: true], basic.nack=[BOOLEAN: true], publisher_confirms=[BOOLEAN: true], consumer_cancel_notify=[BOOLEAN: true]}], information=[LONG_STRING: Licensed under the MPL. See http://www.rabbitmq.com/], version=[LONG_STRING: 5.1.2], platform=[LONG_STRING: Java]} mechanism: PLAIN response: ******** locale: en_US ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SASL Mechanism selected: PLAIN Locale : en_US
2018-08-15 19:55:07 AMQPConnection_0_8Impl - Connected as: Subject:
    Principal: guest

2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionTuneBodyImpl: channelMax=256, frameMax=262136, heartbeat=0]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 1 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 20 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 36 byte(s)
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionTuneOk[ channelMax: 256 frameMax: 262136 heartbeat: 60 ]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionOpen[ virtualHost: / capabilities: null insist: false ]
2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, replyText=Unknown virtual host: '/', classId=10, methodId=40]
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 NonBlockingConnection - Written 44 bytes
2018-08-15 19:55:07 NonBlockingConnection - Read 0 byte(s)
2018-08-15 19:55:07 NonBlockingConnection - Read 12 byte(s)
2018-08-15 19:55:07 RequestServiceImpl - Failure sending message to queue...storing for later delivery {java.io.IOException}
2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionCloseOk
2018-08-15 19:55:07 NonBlockingConnection - Closing /0:0:0:0:0:0:0:1:56892
2018-08-15 19:55:07 BrokerDecoder - Frame handled in 0 ms.
2018-08-15 19:55:07 MultiVersionProtocolEngine - Closed
2018-08-15 19:55:07 ForgivingExceptionHandler - An unexpected connection driver error occured
java.net.SocketException: socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readUnsignedByte(Unknown Source)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
    at java.lang.Thread.run(Unknown Source)
2018-08-15 19:55:07 TaskExecutorImpl - Running runnable org.apache.qpid.server.transport.AbstractAMQPConnection$1@c30ce0 through executor interface
2018-08-15 19:55:07 close - [con:3(guest@null)] CON-1002 : Close
2018-08-15 19:55:20 BrokerImpl - Assigning target sizes based on total target 207591833
2018-08-15 19:55:20 BrokerImpl - Assigning target size 207591833 to vhost VirtualHost[id=556d86e7-d4f7-4428-9a54-86b280bdd515, name=/, type=Memory]
2018-08-15 19:55:20 AbstractVirtualHost - Allocating target size to queues, total target: 207591833 ; total enqueued size 0

Upvotes: 1

Views: 4057

Answers (3)

Taraskin
Taraskin

Reputation: 792

FYI:

Excellent and self-explaining example of RabbitMQ testing using rabbitmq-mock here:

rabbitmq-mock | SpringIntegrationTest.java

Upvotes: 0

Loïc Le Doyen
Loïc Le Doyen

Reputation: 1055

If your need is to mimic RabbitMQ behavior, I would suggest using rabbitmq-mock to avoid the need of a port (which can be an issue in CI runs) and improve startup time.

Simply add the dependency to your build tool, if you are using Maven:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.14</version>
    <scope>test</scope>
</dependency>

Then replace or override your Spring ConnectionFactory with:

@Bean
public ConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(MockConnectionFactoryFactory.build());
}

Both RabbitTemplate and SimpleMessageListenerContainer should behave as if they were connected to a real RabbitMQ broker.

Here is a complete sample test using Spring-Boot.

Upvotes: 5

k-wall
k-wall

Reputation: 419

Your AMQP client is requesting a virtualhost that does not exist on the Broker. The Broker is replying "Unknown virtual host". Check the connection details that are passed by the client.

2018-08-15 19:55:07 AMQPConnection_0_8Impl - RECV ConnectionOpen[ virtualHost: / capabilities: null insist: false ] 2018-08-15 19:55:07 AMQPConnection_0_8Impl - SEND: Frame channelId: 0, bodyFrame: [ConnectionCloseBodyImpl: replyCode=404, replyText=Unknown virtual host: '/', classId=10, methodId=40]}}

Separately, I notice that you are using quite an old Broker version (6.1.1), I'd suggest choosing a newer release.

Upvotes: 0

Related Questions