Following on from this question, we have a scenario where Rabbit credentials become invalidated, and we need to call resetConnection() on our CachingConnectionFactory to pick up a fresh set of credentials.
We're doing this in a ShutdownSignalException handler, and it basically works. What doesn't work is that we also need to restart our listeners. We have a few of these:
@RabbitListener(
    id = ABC,
    bindings = @QueueBinding(value = @Queue(value="myQ", durable="true"),
        exchange = @Exchange(value="myExchange", durable="true"),
        key = "myKey"),
    containerFactory = "customQueueContainerFactory"
)
public void process(...) {
    ...
}
The impression given by this answer (also this) is that we just need to do:
@Autowired RabbitListenerEndpointRegistry registry;
@Autowired CachingConnectionFactory connectionFactory;
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
    refreshRabbitMQCredentials();
}
public void refreshRabbitMQCredentials() {
    registry.stop();                     // do this first
    // Fetch credentials, update username/pass
    connectionFactory.resetConnection(); // then this
    registry.start();                    // finally restart
}
The problem is this. Having debugged my way through SimpleMessageListenerContainer, I found that when the very first of these containers has doShutdown() called, Spring tries to cancel the BlockingQueueConsumer.
Because the underlying Channel still reports as open (even though the RabbitMQ UI doesn't show any open connections or channels), a Basic.Cancel is sent to the broker inside ChannelN.basicCancel(), but the channel then blocks forever waiting for a reply, and as a result container shutdown is completely blocked.
I've tried injecting a TaskExecutor (backed by Executors.newCachedThreadPool()) into the containers and calling shutdownNow() on it or interrupting the consumer threads, but none of this affects the channel's blocking wait.
It looks like my only option to unblock the channel is to trigger an additional ShutdownSignalException during cancellation, but (a) I don't know how to do that, and (b) it looks like I would have to initiate cancellation of all listeners in parallel before trying to shut down again.
// com.rabbitmq.client.impl.ChannelN
@Override
public void basicCancel(final String consumerTag) throws IOException
{
    // [snip]
    rpc(new Basic.Cancel(consumerTag, false), k);
    try {
        k.getReply(); // <== BLOCKS HERE
    } catch(ShutdownSignalException ex) {
        throw wrap(ex);
    }
    metricsCollector.basicCancel(this, consumerTag);
}
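If the immediate goal is only to keep the ShutdownSignalException handler from hanging, one option is to run the stop on a separate daemon thread and wait for it with a timeout. This is a plain-Java sketch, not a Spring AMQP feature: BoundedStop and stopWithTimeout are hypothetical names, and the Runnable stands in for something like registry::stop. It does not unblock the wait in basicCancel(); as described above, that wait ignores interrupts, so the worker thread stays parked and only the caller is freed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds how long a caller waits for a stop that may never return. */
final class BoundedStop {

    // Daemon threads, so a permanently stuck stop cannot keep the JVM alive.
    private static final ExecutorService STOPPER = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "bounded-stop");
        t.setDaemon(true);
        return t;
    });

    private BoundedStop() {
    }

    /**
     * Runs stopAction on a separate thread and waits at most timeoutMillis for it.
     * Returns true only if it finished normally in time. On timeout the worker is
     * interrupted, but a wait that ignores interrupts stays parked: this frees the
     * caller, not the stuck channel.
     */
    static boolean stopWithTimeout(Runnable stopAction, long timeoutMillis) {
        Future<?> result = STOPPER.submit(stopAction);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // best effort only
            return false;
        } catch (ExecutionException e) {
            return false; // stopAction itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return false;
        }
    }
}
```

For example, BoundedStop.stopWithTimeout(registry::stop, 10_000) would return false instead of hanging if a container's cancel never gets a reply; the leaked thread and the stuck channel would still need to be dealt with separately.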
I'm not sure why this is proving so difficult. Is there a simpler way to force a SimpleMessageListenerContainer to shut down?
Using Spring Rabbit 1.7.6; AMQP Client 4.0.3; Spring Boot 1.5.10.RELEASE
UPDATE
Some logs to demonstrate the theory that the message containers are restarting before the connection refresh has completed, and that this might be why they don't reconnect:
ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.ReauthenticatingChannelListener - Channel shutdown: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.ReauthenticatingChannelListener - Channel closed with reply code 403. Assuming credentials have been revoked and refreshing config server properties to get new credentials. Cause: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
WARN u.c.c.c.r.ReauthenticatingChannelListener - Shutdown signalled: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq.gen-4-bqGxbLio9mu8Kc7MMexw' in vhost '/' refused for user 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4', class-id=50, method-id=10)
INFO u.c.c.c.r.RabbitMQReauthenticator - Refreshing Rabbit credentials for XXXXXXXX
INFO o.s.c.c.c.ConfigServicePropertySourceLocator - Fetching config from server at: http://localhost:8888/configuration
INFO u.c.c.c.r.ReauthenticatingChannelListener - Got ListenerContainerConsumerFailedEvent: Consumer raised exception, attempting restart
INFO o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer@2db55dec: tags=[{amq.ctag-ebAfSnXLbw_W1hlZ5ag7sQ=consumer.myQ}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@12de62aa Shared Rabbit Connection: SimpleConnection@56c95789 [delegate=amqp://[email protected]:5672/, localPort= 50052], acknowledgeMode=AUTO local queue size=0
INFO o.s.c.c.c.ConfigServicePropertySourceLocator - Located environment: name=myApp, profiles=[default], label=null, version=null, state=null
INFO com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown initiated...
INFO com.zaxxer.hikari.HikariDataSource - XXXXXXXX - Shutdown completed.
INFO u.c.c.c.r.RabbitMQReauthenticator - Refreshed username: 'cert-configserver-feb6e103-76a8-f5bf-3f23-1e8150812bc4' => 'cert-configserver-d7b54af2-0735-a9ed-7cc4-394803bf5e58'
INFO u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset, proceeding...
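The listener in these logs decides from the reply code alone (403) that credentials were revoked, while the answer's logs further down show a different code, 320 CONNECTION_FORCED, when the user is deleted outright. A minimal plain-Java sketch of that decision, assuming both cases should trigger a refresh: ShutdownAction and ShutdownClassifier are hypothetical names, and the numbers are AMQP 0-9-1 reply codes. In a real ShutdownListener the code and text would come from ShutdownSignalException.getReason(), which for these closes is an AMQP.Channel.Close or AMQP.Connection.Close exposing getReplyCode() and getReplyText().

```java
/** What a hypothetical shutdown handler decides to do with a broker close. */
enum ShutdownAction { REFRESH_CREDENTIALS, LET_CONTAINERS_RECOVER }

/** Hypothetical classifier mirroring the "reply code 403 means revoked credentials" assumption. */
final class ShutdownClassifier {

    static final int CONNECTION_FORCED = 320; // connection closed by the broker or an operator
    static final int ACCESS_REFUSED = 403;    // e.g. permissions revoked for the user

    private ShutdownClassifier() {
    }

    static ShutdownAction classify(int replyCode, String replyText) {
        if (replyCode == ACCESS_REFUSED) {
            return ShutdownAction.REFRESH_CREDENTIALS;
        }
        // 320 is also sent when the broker shuts down, so only treat it as a credential
        // problem when the text says the user was deleted (a brittle string match).
        if (replyCode == CONNECTION_FORCED && replyText != null && replyText.contains("is deleted")) {
            return ShutdownAction.REFRESH_CREDENTIALS;
        }
        return ShutdownAction.LET_CONTAINERS_RECOVER;
    }
}
```

Matching on broker reply text is fragile across RabbitMQ versions, so the string check is best treated as a placeholder for whatever signal the deployment can rely on.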
UPDATE 2:
This does seem to be a race condition of sorts. With the container stop/start calls removed, if I add a thread-only breakpoint to SimpleMessageListenerContainer.restart() so that resetConnection() can race past it, and then release the breakpoint, I can see things start to come back:
16:18:47,208 INFO u.c.c.c.r.RabbitMQReauthenticator - CachingConnectionFactory reset
// Get ready to release the SMLC.restart() breakpoint...
16:19:02,072 INFO o.s.a.r.c.CachingConnectionFactory - Attempting to connect to: rabbitmq.service.consul:5672
16:19:02,083 INFO o.s.a.r.c.CachingConnectionFactory - Created new connection: connectionFactory#7489bca4:1/SimpleConnection@68546c13 [delegate=amqp://[email protected]:5672/, localPort= 33350]
16:19:02,086 INFO o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue ...
16:19:02,095 DEBUG u.c.c.c.r.ReauthenticatingChannelListener - Active connection check succeeded for channel AMQChannel(amqp://[email protected]:5672/,1)
16:19:02,120 INFO o.s.amqp.rabbit.core.RabbitAdmin - Auto-declaring a non-durable, auto-delete, or exclusive Queue (springCloudBus...
That being the case, I now have to work out either how to delay the container restarts until the refresh is done (i.e. until my ShutdownSignalException handler completes), or how to make the refresh blocking somehow.
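The first option (holding restarts back until the refresh is done) can be sketched in plain Java with a ReentrantReadWriteLock: the refresh holds the write lock, and each restart briefly takes the read lock, so restarts wait while a refresh is in flight but never block each other. RefreshGate, awaitNoRefresh and RefreshGateDemo are hypothetical names. Calling awaitNoRefresh() from the container's restart path is an assumption to verify; one candidate is a ListenerContainerConsumerFailedEvent listener, if that event is delivered on the consumer thread before the restart, as the log ordering above suggests. The gate also only helps if the refresh takes the lock before the restart reaches it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical gate: consumer restarts wait while a credential refresh is running. */
final class RefreshGate {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Runs the whole refresh (fetch, update credentials, resetConnection) exclusively. */
    void refresh(Runnable refreshAction) {
        lock.writeLock().lock();
        try {
            refreshAction.run();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Call just before restarting a consumer; returns once no refresh is in progress. */
    void awaitNoRefresh() {
        lock.readLock().lock(); // blocks while refresh() holds the write lock
        lock.readLock().unlock();
    }
}

/** Simulates the race from the logs: a consumer restart arriving mid-refresh. */
final class RefreshGateDemo {

    static List<String> run() {
        RefreshGate gate = new RefreshGate();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch refreshStarted = new CountDownLatch(1);
        CountDownLatch configFetched = new CountDownLatch(1);

        Thread refresher = new Thread(() -> gate.refresh(() -> {
            refreshStarted.countDown();
            awaitQuietly(configFetched); // stands in for the slow config-server call
            events.add("credentials updated");
            events.add("resetConnection");
        }));
        Thread consumer = new Thread(() -> {
            awaitQuietly(refreshStarted); // consumer fails after the refresh has begun
            gate.awaitNoRefresh();
            events.add("consumer restarted");
        });

        refresher.start();
        consumer.start();
        awaitQuietly(refreshStarted);
        configFetched.countDown();
        joinQuietly(refresher);
        joinQuietly(consumer);
        return new ArrayList<>(events);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the refresh signals that it has started only while holding the write lock, the simulated restart always lands after the credential update and reset, regardless of thread scheduling.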
UPDATE 3:
My overall problem, of which this was a symptom, was solved with: https://stackoverflow.com/a/49392990/954442
It's not at all clear why the channel would report as open; this works fine for me, and it recovers after I delete user foo:
...
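import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class So49323291Application {
    public static void main(String[] args) {
        SpringApplication.run(So49323291Application.class, args);
    }
    @Bean
    public ApplicationRunner runner(RabbitListenerEndpointRegistry registry, CachingConnectionFactory cf,
            RabbitTemplate template) {
        return args -> {
            // Run as user foo until a send fails (e.g. after foo is deleted on the broker)
            cf.setUsername("foo");
            cf.setPassword("bar");
            registry.start();
            doSends(template);
            // Switch to new credentials and reconnect
            registry.stop();
            cf.resetConnection();
            cf.setUsername("baz");
            cf.setPassword("qux");
            registry.start();
            doSends(template);
        };
    }
    // Sends a message every 5 seconds until an exception occurs
    public void doSends(RabbitTemplate template) {
        while (true) {
            try {
                template.convertAndSend("foo", "Hello");
                Thread.sleep(5_000);
            }
            catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
    }
    // Not started automatically; the runner starts it via the registry
    @RabbitListener(queues = "foo", autoStartup = "false")
    public void in(Message in) {
        System.out.println(in);
    }
}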
(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=4, consumerTag=amq.ctag-9zt3wUGYSJmoON3zw03wUw, consumerQueue=foo])
2018-03-16 11:24:01.451 ERROR 11867 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'foo' is deleted, class-id=0, method-id=0)
...
Caused by: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
2018-03-16 11:24:01.745 ERROR 11867 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2018-03-16 11:24:03.740 INFO 11867 --- [cTaskExecutor-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2c4d1ac:3/SimpleConnection@5e9c036b [delegate=amqp://[email protected]:5672/, localPort= 59346]
(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=foo, deliveryTag=1, consumerTag=amq.ctag-ljnY00TBuvy5cCAkpD3r4A, consumerQueue=foo])
However, you really don't need to stop/start the registry: just reconfigure the connection factory with the new credentials and call resetConnection(), and the containers will recover.
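The answer's recommendation boils down to an ordering rule: apply the new credentials first, then call resetConnection(), and leave the listener containers running so their own recovery reconnects. This is a plain-Java sketch rather than Spring code: CredentialAwareFactory, Credentials and CredentialRefresher are hypothetical names, with the interface methods named after CachingConnectionFactory's own setUsername, setPassword and resetConnection() so that a real factory could be adapted to it. The Supplier stands in for whatever fetches the new credentials (a config-server refresh in the question).

```java
import java.util.function.Supplier;

/** Hypothetical view of the CachingConnectionFactory methods this sketch needs. */
interface CredentialAwareFactory {
    void setUsername(String username);
    void setPassword(String password);
    void resetConnection();
}

/** Hypothetical value type for a freshly fetched username/password pair. */
final class Credentials {
    final String username;
    final String password;

    Credentials(String username, String password) {
        this.username = username;
        this.password = password;
    }
}

/** Applies new credentials, then resets; listener containers are left running. */
final class CredentialRefresher {
    private final CredentialAwareFactory factory;
    private final Supplier<Credentials> source;

    CredentialRefresher(CredentialAwareFactory factory, Supplier<Credentials> source) {
        this.factory = factory;
        this.source = source;
    }

    /** synchronized so two shutdown callbacks cannot interleave their updates. */
    synchronized void refresh() {
        // Fetch first: if this throws, the factory is left untouched.
        Credentials fresh = source.get();
        // Update credentials before the reset, so the connection recreated afterwards uses them.
        factory.setUsername(fresh.username);
        factory.setPassword(fresh.password);
        factory.resetConnection();
        // No registry.stop()/start(): the containers' own recovery reconnects.
    }
}
```

In a Spring application, the interface could be implemented by a small adapter delegating to the injected CachingConnectionFactory, with refresh() called from the shutdown listener.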