Reputation: 376
I use Celery to run web spiders that crawl some data, and afterwards I need to save this data to a database (SQLite, for example). As I understand it, I can't share a SQLAlchemy session between Celery workers. How do you solve this problem? What is the common approach?
Currently I am trying to use Redis as a middle storage for data.
import datetime
import logging

# celery and db are the application's Celery and SQLAlchemy objects
@celery.task
def run_spider(spider, task):
    # set up a per-spider logger inside the worker
    logger = logging.getLogger('Spider: %s' % spider.url)
    spider.meta.update({'logger': logger, 'task_id': int(task.id)})
    # push task data inside worker
    spider.meta.update({'task_request': run_spider.request})
    spider.run()
    # mark the task as finished and persist the change
    task.state = "inactive"
    task.resolved = datetime.datetime.now()
    db.session.add(task)
    db.session.commit()
EDIT: Actually, I was wrong. I don't need to share sessions; I need to create a new database connection for each Celery process/task.
Upvotes: 2
Views: 2186
Reputation: 376
Actually, I was wrong. I don't need to share sessions; I need to create a new database connection for each Celery process/task.
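A minimal sketch of the "new connection per task" idea, using the standard-library sqlite3 module instead of SQLAlchemy so that it runs on its own. The function name mark_task_resolved, the tasks table, and its columns are assumptions made for illustration; they are not from the original code.

```python
import datetime
import sqlite3


def mark_task_resolved(db_path, task_id):
    """Open a fresh connection, record the task as resolved, then close it."""
    # A new connection is created on every call, so nothing is shared
    # between worker processes or between tasks.
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            # Hypothetical schema, created on demand for this sketch.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS tasks ("
                "id INTEGER PRIMARY KEY, state TEXT, resolved TEXT)"
            )
            conn.execute(
                "INSERT OR REPLACE INTO tasks (id, state, resolved) "
                "VALUES (?, ?, ?)",
                (task_id, "inactive", datetime.datetime.now().isoformat()),
            )
    finally:
        conn.close()  # always release the connection when the task ends
```

Inside a Celery task, a function like this would be called at the end of the task body, in place of reusing a session object created in the parent process.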
Upvotes: 0
Reputation: 132128
I too have used redis for persistence in a large celery application.
It is common for my tasks to look like this:
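@task
def MyTask(sink, *args, **kwargs):
    # connect to the shards listed in sink for the duration of this task
    data_store = sharded_redis.ShardedRedis(sink)
    key_helper = helpers.KeyHelper()
    my_dictionary = do_work()
    # store the result dictionary as a Redis hash
    data_store.hmset(key_helper.key_for_my_hash(), my_dictionary)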
sharded_redis is just an abstraction over several Redis shards that handles sharding keys via the client. sink is a list of (host, port) tuples that are used to make the appropriate connection after the shard is determined.
Essentially you are connecting and disconnecting from Redis with each task (really cheap) rather than creating a connection pool.
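The answer does not show how the shard is chosen, so the following is only a sketch of one common way to do client-side sharding: hash the key and use the result to index into the list of (host, port) tuples. The function name shard_for_key and the use of CRC32 are assumptions, not the actual ShardedRedis implementation.

```python
import zlib


def shard_for_key(key, sink):
    """Return the (host, port) tuple responsible for the given key."""
    if not sink:
        raise ValueError("sink must contain at least one (host, port) tuple")
    # crc32 gives the same value in every process, unlike the built-in
    # hash() for strings, which is randomized per interpreter run.
    index = zlib.crc32(key.encode("utf-8")) % len(sink)
    return sink[index]
```

A task would call shard_for_key with its hash key and the sink list, then open a short-lived connection to the returned host and port. Note that with plain modulo sharding, changing the number of shards remaps most keys.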
Using a connection pool would work, but if you are going to really utilize Celery (run a lot of concurrent tasks), you would be better off (in my opinion) using this method, since you run the risk of exhausting your connection pool, especially if you are doing anything that takes a bit longer in Redis (like reading a large dataset into memory).
Connections to redis are pretty cheap, so this should scale well. We were handling several hundred thousand tasks per minute on a couple instances.
Upvotes: 4