How to use Flask-SQLAlchemy in a Celery task
Update: We've since started using a better way to handle application teardown and setup on a per-task basis, based on the pattern described in the more recent Flask documentation.
extensions.py
import flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery


class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                # reuse an existing app context if one is active,
                # otherwise push one for the duration of the task
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flask
from extensions import celery, db


def create_app():
    app = Flask(__name__)

    # configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app
Once you've set up your app this way, you can run and use Celery without having to run it explicitly from within an application context: all your tasks will automatically run in an application context when necessary, and you don't have to worry about post-task teardown, which is an important issue to manage (see other responses below).
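For example, a task module then needs no context boilerplate at all. A minimal sketch (the models module and User model here are hypothetical, purely for illustration):

tasks.py

from extensions import celery, db
from models import User  # hypothetical model, for illustration


@celery.task
def add_user(name):
    # ContextTask pushes an app context automatically if none is active,
    # so the Flask-SQLAlchemy session is usable here without any setup
    db.session.add(User(name=name))
    db.session.commit()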
Troubleshooting
If you keep getting AttributeError: 'FlaskCelery' object has no attribute 'app' at the with _celery.app.app_context(): line, make sure to:
- Keep the celery import at the app.py file level. Avoid:
app.py
from flask import Flask


def create_app():
    app = Flask(__name__)
    initialize_extensions(app)
    return app


def initialize_extensions(app):
    from extensions import celery, db  # DOOMED! Keep the celery import at the FILE level
    db.init_app(app)
    celery.init_app(app)
- Start your celery workers BEFORE you flask run, and use

celery worker -A app:celery -l info -f celery.log

Note the app:celery, i.e. loading from app.py.
You can still import from extensions to decorate tasks, i.e. from extensions import celery.
Old answer below; it still works, but it's not as clean a solution.
I prefer to run all of Celery within the application context by creating a separate file that invokes celery.start() with the application's context. This means your tasks file doesn't have to be littered with context setup and teardown, and it also lends itself well to the Flask 'application factory' pattern.
extensions.py
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun


@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    # you can now use the db object from extensions


@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask-SQLAlchemy will automatically create new sessions for you from
    # a scoped session factory. Given that we are maintaining the same app
    # context, removing the session here ensures each task gets a fresh one
    # (e.g. session errors won't propagate across tasks).
    db.session.remove()
app.py
from flask import Flask
from extensions import celery, db


def create_app():
    app = Flask(__name__)

    # configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app
RunCelery.py
from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()
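With this layout you start the worker through the wrapper rather than pointing the celery binary at a module (celery.start() with no arguments picks its command line up from sys.argv):

python RunCelery.py worker -l info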
I used Paul Gibbs' answer with two differences: instead of task_postrun I used worker_process_init, and instead of .remove() I used db.session.expire_all().
I'm not 100% sure, but from what I understand, this works because when Celery creates a worker process, all inherited/shared db sessions are expired, and SQLAlchemy then creates new sessions on demand, unique to that worker process.
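In code, the variant described above amounts to roughly this (a sketch; the handler name is mine):

from extensions import db
from celery.signals import worker_process_init


@worker_process_init.connect
def expire_inherited_sessions(**_):
    # expire sessions inherited from the parent process after the fork,
    # so SQLAlchemy creates fresh ones on demand in this worker
    db.session.expire_all()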
So far it seems to have fixed my problem. With Paul's solution, when one worker finished and removed the session, another worker using the same session was still running its query, so db.session.remove() closed the connection while it was being used, giving me a "Lost connection to MySQL server during query" exception.
Thanks Paul for steering me in the right direction!
Never mind, that didn't work. I ended up adding an argument to my Flask app factory so it doesn't run db.init_app(app) when Celery is calling it; instead, the workers call it after Celery forks them. I now see several connections in my MySQL processlist.
from extensions import db
from celery.signals import worker_process_init
from flask import current_app


@worker_process_init.connect
def celery_worker_init_db(**_):
    db.init_app(current_app)
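The factory side of this can be sketched as follows; the register_db flag is a hypothetical name, and the point is just that the Celery entry point builds the app without binding SQLAlchemy, leaving that to the post-fork handler above:

from flask import Flask
from extensions import celery, db


def create_app(register_db=True):  # hypothetical flag, for illustration
    app = Flask(__name__)

    if register_db:
        # normal web process: bind SQLAlchemy immediately
        db.init_app(app)
    # when Celery builds the app it passes register_db=False; each worker
    # then calls db.init_app itself in the worker_process_init handler above

    celery.config_from_object(app.config)
    return app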
In your tasks.py file do the following:
from celery import Celery
from main import create_app

app = create_app()

celery = Celery(__name__)
celery.add_defaults(lambda: app.config)


@celery.task
def create_facet(project_id, **kwargs):
    with app.test_request_context():
        # your code
        pass