Reputation: 149
I have coded up a C (rabbitmq-c) worker app which consumes a queue published by a Python script (pika).
I have the following strange behaviour which I can't seem to solve:
Any ideas what could be going on?
I've tried making sure that each consumer has its own channel (is this necessary?), but I still see the same behaviour.
Here's the code for the consumer (worker):
// Connect to the broker and log in.
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

// Open a channel, declare the queue and start consuming from it.
if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f); // METHOD frame
        amqp_simple_wait_frame(conn, &f); // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        // Copy the body fragments into buf until the whole body has arrived.
        while (n) {
            amqp_simple_wait_frame(conn, &f); // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf
        LOG_INFO(" [x] Message received from queue\n");
    }

    amqp_destroy_envelope(&e);
    amqp_maybe_release_buffers(conn);
}
Upvotes: 4
Views: 1799
Reputation: 26352
The problem here is most likely that your consumer pre-fetches all messages when it starts. This is RabbitMQ's default behavior, but you can reduce the number of messages pre-fetched by each consumer to spread the workload better across multiple workers.
With the default, one or more of the consumers that are already running will pick up all the messages and leave none for consumers that start later.
If you apply QoS to your consumer and limit the prefetch to, let's say, 10 messages, the consumer will hold at most 10 unacknowledged messages at a time, and the new consumers can pick up the slack.
The function you are looking for to implement this is amqp_basic_qos. You can read more about this in the RabbitMQ documentation on consumer prefetch.
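To make the effect of the prefetch limit concrete, here is a small self-contained toy model in C. It does not talk to RabbitMQ; `broker_t`, `consumer_t` and the `sim_*` functions are hypothetical names invented for this sketch, and the round-robin dispatch is a simplification of what the broker does. In a real rabbitmq-c worker the limit would be set with something like `amqp_basic_qos(conn, chan, 0, 10, 0)` before `amqp_basic_consume`. Because the limit counts unacknowledged messages, the worker would also need to call `amqp_basic_ack(conn, chan, e.delivery_tag, 0)` after handling each message, otherwise it stalls once it holds 10.

```c
#include <stddef.h>

#define MAX_CONSUMERS 8

/* Hypothetical model types, not part of rabbitmq-c. */
typedef struct {
    int prefetch; /* 0 means "no limit", as with basic.qos */
    int unacked;  /* messages delivered but not yet acknowledged */
} consumer_t;

typedef struct {
    int ready;                           /* messages still waiting in the queue */
    consumer_t consumers[MAX_CONSUMERS];
    size_t n_consumers;
    size_t next;                         /* round-robin cursor */
} broker_t;

/* Push ready messages round-robin until the queue is empty
 * or every consumer has reached its prefetch limit. */
void sim_dispatch(broker_t *b) {
    size_t blocked = 0;
    while (b->ready > 0 && b->n_consumers > 0 && blocked < b->n_consumers) {
        consumer_t *c = &b->consumers[b->next];
        b->next = (b->next + 1) % b->n_consumers;
        if (c->prefetch == 0 || c->unacked < c->prefetch) {
            c->unacked++;
            b->ready--;
            blocked = 0;
        } else {
            blocked++;
        }
    }
}

/* Register a consumer; returns its id, or MAX_CONSUMERS if the table is full. */
size_t sim_add_consumer(broker_t *b, int prefetch) {
    if (b->n_consumers >= MAX_CONSUMERS)
        return MAX_CONSUMERS;
    size_t id = b->n_consumers++;
    b->consumers[id].prefetch = prefetch;
    b->consumers[id].unacked = 0;
    sim_dispatch(b);
    return id;
}

/* Publish `count` messages and dispatch what can be dispatched. */
void sim_publish(broker_t *b, int count) {
    b->ready += count;
    sim_dispatch(b);
}

/* Acknowledge one message; this frees a prefetch slot for the consumer. */
void sim_ack(broker_t *b, size_t id) {
    if (b->consumers[id].unacked > 0)
        b->consumers[id].unacked--;
    sim_dispatch(b);
}
```

In this model, publishing 100 messages and then attaching two consumers with prefetch 0 leaves the first consumer holding all 100 and the second holding none. With a prefetch of 10, each consumer holds 10 and the remaining 80 stay in the queue until someone acknowledges.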
Upvotes: 4
Reputation: 116
This might help you (quoted from the RabbitMQ Work Queues tutorial):
Message acknowledgment
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It's fine even if processing a message takes a very, very long time.
Message acknowledgments are turned on by default.
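The redelivery rule described above can be sketched as a small self-contained C model. It does not use RabbitMQ; `message_t`, `deliver_next`, `ack_message` and `consumer_died` are hypothetical names made up for this illustration. In a rabbitmq-c worker, the acknowledgment itself would be sent with `amqp_basic_ack(conn, chan, e.delivery_tag, 0)`, and a requeued message arrives with the `redelivered` field of the envelope set.

```c
#include <stddef.h>

/* Hypothetical model types, not part of rabbitmq-c. */
enum msg_state { MSG_READY, MSG_UNACKED, MSG_ACKED };

typedef struct {
    enum msg_state state;
    int owner;       /* consumer id while MSG_UNACKED, otherwise -1 */
    int redelivered; /* set once the message has been requeued */
} message_t;

/* Hand the first ready message to `consumer`.
 * Returns the message index, or -1 if nothing is ready. */
int deliver_next(message_t *msgs, size_t n, int consumer) {
    for (size_t i = 0; i < n; i++) {
        if (msgs[i].state == MSG_READY) {
            msgs[i].state = MSG_UNACKED;
            msgs[i].owner = consumer;
            return (int)i;
        }
    }
    return -1;
}

/* The consumer finished processing: the broker may now drop the message. */
void ack_message(message_t *msgs, int idx) {
    msgs[idx].state = MSG_ACKED;
    msgs[idx].owner = -1;
}

/* The consumer's connection died: everything it had not acknowledged
 * goes back to the queue. Returns the number of requeued messages. */
int consumer_died(message_t *msgs, size_t n, int consumer) {
    int requeued = 0;
    for (size_t i = 0; i < n; i++) {
        if (msgs[i].state == MSG_UNACKED && msgs[i].owner == consumer) {
            msgs[i].state = MSG_READY;
            msgs[i].owner = -1;
            msgs[i].redelivered = 1;
            requeued++;
        }
    }
    return requeued;
}
```

If consumer 1 receives two messages, acknowledges the first and then dies, only the second one is requeued in this model, and the next `deliver_next` call for another consumer hands it out again with `redelivered` set.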
Upvotes: 0