Himanshu Singh
Himanshu Singh

Reputation: 11

MongoDB and celery configuration

I am new to celery and there isn't alot of great documentation to follow esepcially with mongodb. I am essentially trying to have celery post messages to two different collections based on where celery is running (local or production). But when i call a task it keeps posting on the same collection "kombu_default".

I have added my celery code below. Please let me know what I am doing wrong?

from celery import Celery
import sys
import os 
sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__)))
)
from core.config import settings, logger
from celery.schedules import crontab

# mongo hose mongodb_host="mongodb+srv://{username}:{password}@cluster0.sq3xm.mongodb.net/{database}?retryWrites=true&w=majority&appName=Cluster0"

BROKER_URL = settings.mongodb_host.format(
    username=settings.mongodb_username
    , password=settings.mongodb_password
    , database=settings.cel_broker_db_name    
)

BACKEND_URL = settings.mongodb_host.format(
    username=settings.mongodb_username
    , password=settings.mongodb_password
    , database=settings.cel_backend_db_name    
)
logger.info(f"Broker URL: {BROKER_URL}")
celery = Celery(
    "celery_app",
    # borker that will store all the tasks 
    broker=BROKER_URL,  # Message broker (mongo)
    # this is where the results will be stored 
    include=["tasks" , "chain"]
)


# Define the queue and routing configuration
celery.conf.task_queues = {
    'cel_test': {
        'exchange': 'celery',
        'exchange_type': 'direct',
        'binding_key': 'cel_test'
    }
}

celery.conf.task_routes = {
    'tasks.*': {'queue': 'cel_test'},
    'chain.*': {'queue': 'cel_test'}
}


#Load Backend Settings
celery.conf.update(
    backend = BACKEND_URL,
    backend_settings = {
        "database": settings.cel_backend_db_name,
        "taskmeta_collection": settings.cel_backend_collection_name
    },
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    broker_connection_retry_on_startup=True,
    timezone="Europe/London"
)



# Schedule the task to run every 1yr
celery.conf.beat_schedule = {
    "write-tweet-for-ai_nympho": {
        "task": "write_tweet_for_user",
        "schedule":  1.0*60.0*60.0,
        "args": ("1866555368765100032",)  # Replace "user_id_value" with the actual user_id
    }, 
    "write-tweet-for-hsingh": {
        "task": "write_tweet_for_user",
        "schedule": 1.0*60.0*60.0,
        "args": ("1761349378227286016",)  # Replace "user_id_value" with the actual user_id
    },
    "execute_save_tweets_chain": {
        "task": "execute_save_tweets_chain",
        "schedule": 2.0*60.0*60.0,
        "args": ("1866555368765100032",)  # Replace "user_id_value" with the actual user_id
    },
        "execute_save_tweets_chain": {
        "task": "execute_save_tweets_chain",
        "schedule": 2.0*60.0*60.0,
        "args": ("1761349378227286016",)  # Replace "user_id_value" with the actual user_id
    }
}


# Ensure the Celery app is discoverable
celery.autodiscover_tasks(['celery_app'
                           ,"celery"
                           ,"tasks"
                           ,"chain"])

I tried changing the database name, it created a new database but it doesn't post anything there.

Upvotes: 1

Views: 21

Answers (0)

Related Questions