Cell

Reputation: 11

apache-beam: RabbitMQ channel suddenly closes: "AlreadyClosedException: channel is already closed due to clean channel shutdown"

I'm learning Beam and trying to build a pipeline that reads from a RabbitMQ topic (running in a Docker container) as an unbounded source. I have two applications running: one that publishes messages, and the pipeline that reads them. After the pipeline has been running for a while (not idle; it is actively reading), the channel closes and the following error appears:

com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

The pipeline reads from the topic and converts each message to a string for later use:

// Read from RabbitMQ as an unbounded source (RabbitMqIO is Beam's RabbitMQ connector).
PCollection<String> messages = pipeline
    .apply(RabbitMqIO.read()
        .withUri(MY_URI)
        .withQueue("Queue-1")
        .withExchange("Exchange-1", "test.*")
    )
    // Group messages into 10-second fixed windows, allowing 5 seconds of lateness.
    .apply(Window.<RabbitMqMessage>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.standardSeconds(5))
        .discardingFiredPanes()
    )
    // Convert each message body to a String.
    .apply("msg to string", ParDo.of(new DoFn<RabbitMqMessage, String>() {
        @ProcessElement public void process(@Element RabbitMqMessage msg, OutputReceiver<String> out) {
            out.output(new String(msg.getBody()));
        }
    }));
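
As a side note on the "msg to string" step: `new String(byte[])` decodes with the default charset, which on JDKs before 18 depends on the platform. A minimal sketch of decoding with an explicit charset is below; the class name `BodyDecoder` is made up for illustration, and it assumes the publisher encodes bodies as UTF-8, which my publisher code does not state explicitly.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Beam or the RabbitMQ client.
class BodyDecoder {
    // Decodes a message body as UTF-8 instead of relying on the default charset.
    // Assumption: the publisher encoded the body as UTF-8.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = "test message 0".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body));
    }
}
```

Inside the DoFn this would amount to `out.output(new String(msg.getBody(), StandardCharsets.UTF_8))`. It is unrelated to the channel closing; it only makes the decoding deterministic.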

On the other side, I have this code that publishes messages to the topic:

Connection conn = ... //gets connection
Channel chan = conn.createChannel();

// Declare a durable topic exchange and a durable queue, then bind them.
chan.exchangeDeclare("Exchange-1", BuiltinExchangeType.TOPIC, true);
chan.queueDeclare("Queue-1", true, false, false, null);
chan.queueBind("Queue-1", "Exchange-1", "test.*");

// Publish messages in an endless loop with a routing key that matches "test.*".
for (int i = 0; true; i++) {
    final String msg = "test message " + i;
    chan.basicPublish("Exchange-1", "test.msg", null, msg.getBytes());
}
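
To make the binding concrete: in an AMQP topic exchange, `*` in a binding pattern matches exactly one dot-separated word and `#` matches zero or more words, so the routing key `test.msg` matches `test.*`. Below is a simplified sketch of that matching rule. The class `TopicMatch` is made up for illustration and is not the broker's actual implementation; edge cases such as empty words may behave differently in RabbitMQ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of AMQP topic pattern matching (not broker code).
class TopicMatch {
    // Returns true if routingKey matches the binding pattern.
    // '*' matches exactly one word; '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        List<String> p = Arrays.asList(pattern.split("\\.", -1));
        List<String> k = Arrays.asList(routingKey.split("\\.", -1));
        return matches(p, 0, k, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size(); // pattern exhausted: key must be exhausted too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // Try letting '#' consume 0, 1, 2, ... remaining words.
            for (int n = j; n <= k.size(); n++) {
                if (matches(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false; // pattern still has words but the key has none left
        }
        return (word.equals("*") || word.equals(k.get(j))) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("test.*", "test.msg"));  // true
        System.out.println(matches("test.*", "test.a.b"));  // false
        System.out.println(matches("test.#", "test.a.b"));  // true
    }
}
```

So every message from the loop above should be routed to Queue-1, which is consistent with the pipeline receiving messages until the channel closes.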

I run both applications. The publisher starts sending messages and the pipeline reads them from the topic successfully, but after some time (usually 10~15 seconds) the error above occurs. Is there a way to prevent the channel from closing?

The RabbitMQ Java client dependency is version 5.11.0, Beam is 2.50.0, and the RabbitMQ Docker image is 3-management-alpine.

I expected the pipeline to be able to read indefinitely from the topic.

Additionally, I tried removing the window transform, removing the transforms that I apply to the messages later (not shown here), publishing some messages first and then reading them all as a bounded source (this successfully reads the old messages), adding a 500 ms delay between published messages, and other things. In every case the channel still closes after reading for a while, idle or not.
