Mr. Zhang
Mr. Zhang

Reputation: 103

Using RabbitMQ as Flink DataStream Source without create RabbitMQ queue automatically

When I use RabbitMQ as Flink DataStream Source,just as the Flink Documentation said.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to consume
    true,                        // use correlation ids; can be false if only at-least-once is required
    new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
.setParallelism(1);              // non-parallel source is only required for exactly-once

This code will connect to RabbitMQ and auto create Queue "queueName".So I have got a problem. The RabbitMQ Queue already exist,I created it before. I don't want Flink try to create again. And the problem is Flink create the Queue without some paramters, that is conflict with the Queue I created before. Here is the Exception:

    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'queueName' in vhost '/': received none but current is the value '604800000' of type 'long', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
... 10 more

How to make Flink just subscribe a RabbitMQ queue without try to create a new one? Thank you all.

Upvotes: 3

Views: 1242

Answers (1)

Vitaly Tsvetkoff
Vitaly Tsvetkoff

Reputation: 271

You can write your own class extending RMQSource and override setupQueue method in order to not create queue

Upvotes: 4

Related Questions