WHOATEMYNOODLES

Reputation: 2643

Sending large base64 files through RabbitMQ to consume on workers

I'm using RabbitMQ and Celery to process email attachments via the Gmail API. My first Celery task fetches batches of emails whose attachments are base64 strings larger than 25 MB per file. The current RabbitMQ default limit is 16 MB, but I don't want to raise it because I've read a few articles about how keeping message sizes small is better practice.

What is the best practice here? While the first task is pulling emails, I want multiple other Celery workers to process those files concurrently (running OCR and storing the results in a database) to speed up the whole process.

I came up with a few solutions (though I'm not sure if they're good practice, because I'm a newbie):

Is there another solution for my design problem here?

Upvotes: 0

Views: 86

Answers (1)

Serhii Fomenko

Reputation: 1030

In your case, it might make sense to split your first large task, which downloads the batches of emails and their attachments, into two smaller tasks. Since your attachments are large, they can be downloaded from a separate endpoint (in the second task).

In the first task, you extract and process emails from the Gmail API and pass only the identifiers to the next task: userId, messageId, attachmentId, access_token.

In the second task, you download the attachment, immediately perform the OCR processing, and save the results to the database.

Here is the simplified code for this solution:

import requests  
from celery import shared_task  
  
API_URL = 'https://gmail.googleapis.com'  
  
  
@shared_task  
def process_user_mails(user_id, access_token):  
    def iter_user_mail_ids_from_first_page():  
        url = f'{API_URL}/gmail/v1/users/{user_id}/messages'  
        mails_data = requests.get(url=url, headers=headers).json()  
        for mail in mails_data['messages']:  
            yield mail['id']  
  
    def load_user_mail_message(mail_id):
        url = f'{API_URL}/gmail/v1/users/{user_id}/messages/{mail_id}' 
        return requests.get(url=url, headers=headers).json()  
  
    def iter_attachments_meta(payload):  
        if payload.get('filename'):  
            yield {  
                'mime_type': payload['mimeType'],  
                'filename': payload['filename'],  
                'attachment_id': payload['body']['attachmentId'],  
            }  
  
        for part in payload.get('parts', []):  
            yield from iter_attachments_meta(part)  
  
    headers = {'Authorization': f'Bearer {access_token}'}  
    for mail_id in iter_user_mail_ids_from_first_page():  
        mail_message = load_user_mail_message(mail_id)
        attachments_it = iter_attachments_meta(mail_message.get('payload', {}))
        for attachment_meta in attachments_it:  
            process_attachment.delay(  
                user_id=user_id,  
                access_token=access_token,  
                mail_id=mail_id,  
                attachment_meta=attachment_meta,  
            )  
  
  
@shared_task  
def process_attachment(user_id, access_token, mail_id, attachment_meta):  
    attachment_id = attachment_meta['attachment_id']  
    headers = {'Authorization': f'Bearer {access_token}'}  
    url = (  
        f'{API_URL}/gmail/v1/users/{user_id}/messages/'  
        f'{mail_id}/attachments/{attachment_id}'  
    )  
    attachment = requests.get(url=url, headers=headers).json()  
    attachment_data = attachment['data']
    # The data is base64url-encoded (e.g. decode it with base64.urlsafe_b64decode),
    # then do the necessary OCR processing and save everything you need to the database.

The algorithm itself is as simple as possible:

  1. Get the list of messages from the Gmail API.
  2. Download the data of each email message.
  3. Extract the meta information of each attachment from each message.
  4. Delegate the attachment processing to the next task.
  5. Download the attachment, do further processing.

All the endpoints that were involved:

  1. users.messages.list
  2. users.messages.get
  3. users.messages.attachments.get

The advantages of this approach are obvious: you pass only the minimum necessary information to the second task, you don't need to store any intermediate state, and attachments can be processed in parallel (divide and conquer).
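
A minimal example of kicking the pipeline off might look like this (the module name and the token helper are assumptions on my part; 'me' is the Gmail alias for the authenticated user):

# Wherever you enqueue work (a view, a management command, a periodic task):
from myapp.tasks import process_user_mails  # assumed location of the tasks above

access_token = obtain_gmail_access_token()  # placeholder for your OAuth2 token retrieval
process_user_mails.delay(user_id='me', access_token=access_token)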


The second solution that comes to mind is to create a shared folder on the server into which your Celery workers can save attachments and from which they can read them back. The flow would look something like this (a code sketch follows the list):

  1. In the first task, download an attachment and save its data to a file with a unique name in the shared folder. Pass a reference to that file to the second Celery task.
  2. In the second task, read the data from the file, do the necessary OCR processing, save whatever is necessary to the database.
  3. Delete the file from disk (in the same task or delegate to the next task).
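
A minimal sketch of that flow, assuming a shared directory at /mnt/attachments that every worker can reach (the SHARED_DIR path, the task names, and the OCR/database steps are illustrative placeholders, not a definitive implementation; the download reuses the users.messages.attachments.get endpoint from the first solution):

import base64
import os
import uuid

import requests
from celery import shared_task

API_URL = 'https://gmail.googleapis.com'
SHARED_DIR = '/mnt/attachments'  # assumed shared mount reachable by every worker


@shared_task
def fetch_attachment_to_disk(user_id, access_token, mail_id, attachment_meta):
    # Download the attachment via users.messages.attachments.get (as in the
    # first solution) and write the decoded bytes to a uniquely named file.
    headers = {'Authorization': f'Bearer {access_token}'}
    url = (
        f'{API_URL}/gmail/v1/users/{user_id}/messages/'
        f'{mail_id}/attachments/{attachment_meta["attachment_id"]}'
    )
    attachment = requests.get(url=url, headers=headers).json()
    raw = attachment['data']
    # Gmail returns the bytes base64url-encoded; restore the padding before decoding.
    data = base64.urlsafe_b64decode(raw + '=' * (-len(raw) % 4))
    file_path = os.path.join(SHARED_DIR, f"{uuid.uuid4().hex}_{attachment_meta['filename']}")
    with open(file_path, 'wb') as f:
        f.write(data)
    # Pass only the file reference to the next task.
    ocr_attachment_from_disk.delay(file_path=file_path)


@shared_task
def ocr_attachment_from_disk(file_path):
    try:
        with open(file_path, 'rb') as f:
            data = f.read()
        # Do the necessary OCR processing on `data` and save the results to the database.
    finally:
        # Delete the file whether or not processing succeeded
        # (or delegate the deletion to another task).
        os.remove(file_path)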

What I see as the advantages here:

  1. Attachment data is loaded into RAM only when some processing, such as OCR, needs to be performed.
  2. In the era of SSDs, reading from and writing to a file is very fast. This will be significantly faster than using intermediate cloud storage, given the transfer costs of uploading and downloading.
  3. You are not increasing the RabbitMQ message size limit.
  4. You don't need a lot of disk storage since the processed files will be deleted.

Upvotes: 1
