hamidfzm

Reputation: 4685

How to use celery for inserting data to mongodb using mongoengine

I'm trying to use Celery to insert a large amount of data into MongoDB, but I'm running into a concurrency problem. If I send more than one task to Celery at the same time, part of the data is inserted into MongoDB and the rest is not. I think this is because MongoDB locks the database during an insert operation, but I need a way to send multiple tasks of the same type that insert data into the database, for example by checking whether the database is locked and, if it is, waiting for it to unlock. Here is part of my code:

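import traceback

from mongoengine import connect, DoesNotExist, ValidationError

# celery, DefaultConfig, Store, Books and Product come from the application (not shown).
@celery.task(name='celery_tasks.add_book_product')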
def add_book_product(product_dict, store_id):

    connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST)

    store_obj = Store.objects.get(pk=store_id)

    try:
        book = Books.objects.get(pk=product_dict['RawBook'])

        try:
            product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book')
            print("Product {} found for store {}".format(product_obj.id, store_obj.id))
            product_obj.count = int(product_dict['count'])
            product_obj.buy_price = int(product_dict['buy_book'])
            product_obj.sell_price = int(product_dict['sell_book'])

            product_obj.save()

        except (DoesNotExist, ValidationError):
            product_obj = Product(store=store_obj,
                                  related_book=book,
                                  kind='book',
                                  count=int(product_dict['count']),
                                  buy_price=int(product_dict['buy_book']),
                                  sell_price=int(product_dict['sell_book']),
                                  name=book.name_fa)

            product_obj.save()

            print("Appending books to store obj...")
            store_obj.products.append(product_obj)
            store_obj.save()
            print("Appending books to store obj done")

        return "Product {} saved for store {}".format(product_obj.id, store_obj.id)
    except (DoesNotExist, ValidationError):
        traceback.print_exc()
        return "Product with raw book {} does not exist.".format(product_dict['RawBook'])

Upvotes: 1

Views: 1999

Answers (1)

Chillar Anand

Reputation: 29534

By default, Celery uses multiprocessing (the prefork pool) to execute tasks concurrently. There are two ways to make sure only one task is executed at any given time.

Solution 1:

When you start a celery worker with

celery -A your_app worker -l info

the default concurrency equals the number of CPU cores on your machine. So if you start a worker like this

celery -A your_app worker -l info -c 1

it runs only one task at any given time. If you have other tasks that need to be executed, you can create a separate queue and assign another worker to it.
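One way to split the work across queues is sketched below as a plain Celery configuration module. It assumes Celery 4 or later (lowercase setting names; older versions use CELERY_ROUTES), and both the module name celeryconfig.py and the queue name mongo_inserts are made up for illustration. The task name is the one from the question.

```python
# celeryconfig.py (hypothetical module name, loaded with
# app.config_from_object('celeryconfig') in your application)

# Send the MongoDB insert task to its own queue.
# 'mongo_inserts' is an illustrative queue name, not a Celery default.
task_routes = {
    'celery_tasks.add_book_product': {'queue': 'mongo_inserts'},
}
```

With that routing in place, one worker could consume only the insert queue with a concurrency of 1, while a second worker handles the default queue (named celery unless configured otherwise) at normal concurrency:

celery -A your_app worker -l info -c 1 -Q mongo_inserts
celery -A your_app worker -l info -Q celery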

Solution 2:

This is a little more complicated. You need to use a lock in your tasks, something like this.

# acquire_lock() and release_lock() are placeholders for your own helpers
if acquire_lock():
    try:
        pass  # do something
    finally:
        release_lock()

You can read more about this in the Celery documentation.
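As a rough illustration of the locking pattern above, here is a minimal sketch that uses an exclusively created lock file. This is not the approach from the Celery documentation, which relies on a shared cache. It only works when all workers run on the same host, and the names task_lock, insert_with_lock and LOCK_PATH are hypothetical.

```python
import os
import tempfile
from contextlib import contextmanager

# Hypothetical lock location; any path visible to every worker process on the host works.
LOCK_PATH = os.path.join(tempfile.gettempdir(), "add_book_product.lock")


@contextmanager
def task_lock(path=LOCK_PATH):
    """Yield True if this process acquired the lock, False if it is already held."""
    try:
        # O_CREAT | O_EXCL makes creation fail atomically if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        fd = None
    try:
        yield fd is not None
    finally:
        # Release the lock only if this process was the one that took it.
        if fd is not None:
            os.close(fd)
            os.remove(path)


def insert_with_lock(do_insert):
    """Call do_insert() only while holding the lock; return whether it ran."""
    with task_lock() as acquired:
        if acquired:
            do_insert()
        return acquired
```

Inside a task, a False result means another worker is currently inserting. A bound task (bind=True) could then call self.retry(countdown=...) to try again later. Note that a worker that crashes while holding the lock leaves the file behind, so a production setup would probably want a lock with an expiry, such as the cache-based one described in the Celery documentation.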

Upvotes: 2
