Reputation: 11
I am new to celery and there isn't alot of great documentation to follow esepcially with mongodb. I am essentially trying to have celery post messages to two different collections based on where celery is running (local or production). But when i call a task it keeps posting on the same collection "kombu_default".
I have added my celery code below. Please let me know what I am doing wrong?
from celery import Celery
import sys
import os
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__)))
)
from core.config import settings, logger
from celery.schedules import crontab
# mongo hose mongodb_host="mongodb+srv://{username}:{password}@cluster0.sq3xm.mongodb.net/{database}?retryWrites=true&w=majority&appName=Cluster0"
BROKER_URL = settings.mongodb_host.format(
username=settings.mongodb_username
, password=settings.mongodb_password
, database=settings.cel_broker_db_name
)
BACKEND_URL = settings.mongodb_host.format(
username=settings.mongodb_username
, password=settings.mongodb_password
, database=settings.cel_backend_db_name
)
logger.info(f"Broker URL: {BROKER_URL}")
celery = Celery(
"celery_app",
# borker that will store all the tasks
broker=BROKER_URL, # Message broker (mongo)
# this is where the results will be stored
include=["tasks" , "chain"]
)
# Define the queue and routing configuration
celery.conf.task_queues = {
'cel_test': {
'exchange': 'celery',
'exchange_type': 'direct',
'binding_key': 'cel_test'
}
}
celery.conf.task_routes = {
'tasks.*': {'queue': 'cel_test'},
'chain.*': {'queue': 'cel_test'}
}
#Load Backend Settings
celery.conf.update(
backend = BACKEND_URL,
backend_settings = {
"database": settings.cel_backend_db_name,
"taskmeta_collection": settings.cel_backend_collection_name
},
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
broker_connection_retry_on_startup=True,
timezone="Europe/London"
)
# Schedule the task to run every 1yr
celery.conf.beat_schedule = {
"write-tweet-for-ai_nympho": {
"task": "write_tweet_for_user",
"schedule": 1.0*60.0*60.0,
"args": ("1866555368765100032",) # Replace "user_id_value" with the actual user_id
},
"write-tweet-for-hsingh": {
"task": "write_tweet_for_user",
"schedule": 1.0*60.0*60.0,
"args": ("1761349378227286016",) # Replace "user_id_value" with the actual user_id
},
"execute_save_tweets_chain": {
"task": "execute_save_tweets_chain",
"schedule": 2.0*60.0*60.0,
"args": ("1866555368765100032",) # Replace "user_id_value" with the actual user_id
},
"execute_save_tweets_chain": {
"task": "execute_save_tweets_chain",
"schedule": 2.0*60.0*60.0,
"args": ("1761349378227286016",) # Replace "user_id_value" with the actual user_id
}
}
# Ensure the Celery app is discoverable
celery.autodiscover_tasks(['celery_app'
,"celery"
,"tasks"
,"chain"])
I tried changing the database name, it created a new database but it doesn't post anything there.
Upvotes: 1
Views: 21