Shkolar

Reputation: 422

Best Practice for Batch Processing with RabbitMQ

I'm looking for the best way to perform ETL using Python.

I have a channel in RabbitMQ which sends events (possibly one every second). I want to process them in batches of 1000. The main problem is that the RabbitMQ interface (I'm using pika) raises a callback for every single message. I looked at the Celery framework, but its batch feature was deprecated in version 3.

What is the best way to do it? I'm thinking about saving my events in a list and, when it reaches 1000, copying it to another list and performing my processing. However, how do I make it thread-safe? I don't want to lose events, and I'm afraid of losing some while synchronising the lists.
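Roughly what I have in mind (just a sketch, not working code; process_batch stands for my actual processing):

events = []

def on_message(channel, method, properties, body):
    events.append(body)
    if len(events) >= 1000:
        batch = list(events)   # copy to a second list
        del events[:]          # <- this is where I'm afraid of losing events
        process_batch(batch)   # my ETL step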

It sounds like a very simple use case, yet I haven't found any good best practice for it.

Upvotes: 5

Views: 13379

Answers (3)

Hamid

Reputation: 231

Here is sample code for batch processing with pika.

import pika


class RabbitMQBatchConsumer:
    def __init__(self, username, password, host, batch_size, queue):
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host, 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.messages = []
        self.batch_size = batch_size
        self.queue = queue

    def start_consuming(self):
        self.channel.basic_consume(queue=self.queue, on_message_callback=self.on_message)
        self.channel.start_consuming()

    def on_message(self, unused_channel, basic_deliver, properties, body):
        # Buffer each message; process once the batch is full.
        self.messages.append((basic_deliver, properties, body))
        if len(self.messages) == self.batch_size:
            self.process()

    def process(self):
        # IMPLEMENT YOUR PROCESS HERE WITH self.messages
        try:
            # Ack (and drop) each buffered message once the batch has been handled.
            for _ in range(len(self.messages)):
                msg = self.messages.pop()
                self.channel.basic_ack(msg[0].delivery_tag)
        except Exception as e:
            print(e)

After that, you just need to create an instance of this class and call its start_consuming method.

batch_consumer = RabbitMQBatchConsumer(USERNAME, PASSWORD, HOST, BATCH_SIZE, QUEUE)
batch_consumer.start_consuming()

Upvotes: 0

Sherif Behna

Reputation: 613

First of all, you should not "batch" messages from RabbitMQ unless you really have to. The most efficient way to work with messaging is to process each message independently.

If you need to combine messages into a batch, I would use a separate data store to hold them temporarily, and process them once they meet a certain condition. Each time you add an item to the batch, you check that condition (for example, you have reached 1000 messages) and trigger the processing of the batch.

This is better than keeping a list in memory, because if your service dies, the messages will still be persisted in the database.

Note: If you have a single processor per queue, this can work without any synchronization mechanism. If you have multiple processors, you will need to implement some sort of locking mechanism.
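A rough, untested sketch of this idea using Redis as the intermediate store (the queue name, Redis key, host and process_batch are placeholders; as per the note above, this assumes a single consumer, otherwise the read-and-trim step needs a lock):

import pika
import redis

r = redis.Redis(host="localhost", port=6379)   # assumed local Redis instance
BATCH_KEY = "etl:pending"                      # placeholder key name
BATCH_SIZE = 1000

def process_batch(messages):
    pass  # your ETL step goes here (placeholder)

def on_message(channel, method, properties, body):
    r.rpush(BATCH_KEY, body)                    # persist the message first
    channel.basic_ack(method.delivery_tag)      # safe to ack: it is stored in Redis now
    if r.llen(BATCH_KEY) >= BATCH_SIZE:         # check the batch condition
        batch = r.lrange(BATCH_KEY, 0, BATCH_SIZE - 1)
        r.ltrim(BATCH_KEY, BATCH_SIZE, -1)      # drop the processed slice from the list
        process_batch(batch)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_consume(queue="events", on_message_callback=on_message)
channel.start_consuming()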

Upvotes: -1

menya

Reputation: 1525

How do I make it thread-safe?

How about setting the consumer prefetch count to 1000? Once a consumer's unacked messages reach its prefetch limit, RabbitMQ will not deliver any more messages to it.

Don't ACK received messages until you have 1000 of them, then copy them to another list and perform your processing. When your job is done, ACK the last message with the multiple flag set, and all messages before it will be ACKed by the RabbitMQ server.

But I am not sure whether such a large prefetch is best practice.
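A rough sketch of this approach with pika (untested; the queue name and process_batch are placeholders):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=1000)   # server stops delivering after 1000 unacked messages

pending = []

def process_batch(bodies):
    pass  # your ETL step goes here (placeholder)

def on_message(ch, method, properties, body):
    pending.append((method.delivery_tag, body))
    if len(pending) >= 1000:
        batch = list(pending)
        del pending[:]
        process_batch([b for _, b in batch])
        # Ack the last delivery tag with multiple=True: everything before it is acked too.
        ch.basic_ack(delivery_tag=batch[-1][0], multiple=True)

channel.basic_consume(queue="events", on_message_callback=on_message)
channel.start_consuming()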

Upvotes: 5
