Python Flask with celery out of application context
Here is a solution which works with the flask application factory pattern and also creates celery task with context, without needing to use app.app_context()
. It is really tricky to get that app while avoiding circular imports, but this solves it. This is for celery 4.2 which is the latest at the time of writing.
Structure:
repo_name/ manage.py base/ base/__init__.py base/app.py base/runcelery.py base/celeryconfig.py base/utility/celery_util.py base/tasks/workers.py
So base
is the main application package in this example. In the base/__init__.py
we create the celery instance as below:
from celery import Celerycelery = Celery('base', config_source='base.celeryconfig')
The base/app.py
file contains the flask app factory create_app
and note the init_celery(app, celery)
it contains:
from base import celeryfrom base.utility.celery_util import init_celerydef create_app(config_obj): """An application factory, as explained here: http://flask.pocoo.org/docs/patterns/appfactories/. :param config_object: The configuration object to use. """ app = Flask('base') app.config.from_object(config_obj) init_celery(app, celery=celery) register_extensions(app) register_blueprints(app) register_errorhandlers(app) register_app_context_processors(app) return app
Moving on to base/runcelery.py
contents:
from flask.helpers import get_debug_flagfrom base.settings import DevConfig, ProdConfigfrom base import celeryfrom base.app import create_appfrom base.utility.celery_util import init_celeryCONFIG = DevConfig if get_debug_flag() else ProdConfigapp = create_app(CONFIG)init_celery(app, celery)
Next, the base/celeryconfig.py
file (as an example):
# -*- coding: utf-8 -*-"""Configure Celery. See the configuration guide at ->http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration"""## Broker settings.broker_url = 'pyamqp://guest:guest@localhost:5672//'broker_heartbeat=0# List of modules to import when the Celery worker starts.imports = ('base.tasks.workers',)## Using the database to store task state and results.result_backend = 'rpc'#result_persistent = Falseaccept_content = ['json', 'application/text']result_serializer = 'json'timezone = "UTC"# define periodic tasks / cron here# beat_schedule = {# 'add-every-10-seconds': {# 'task': 'workers.add_together',# 'schedule': 10.0,# 'args': (16, 16)# },# }
Now define the init_celery in the base/utility/celery_util.py
file:
# -*- coding: utf-8 -*-def init_celery(app, celery): """Add flask app context to celery.Task""" TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask
For the workers in base/tasks/workers.py
:
from base import celery as celery_appfrom flask_security.utils import config_value, send_mailfrom base.bp.users.models.user_models import Userfrom base.extensions import mail # this is the flask-mail@celery_app.taskdef send_async_email(msg): """Background task to send an email with Flask-mail.""" #with app.app_context(): mail.send(msg)@celery_app.taskdef send_welcome_email(email, user_id, confirmation_link): """Background task to send a welcome email with flask-security's mail. You don't need to use with app.app_context() here. Task has context. """ user = User.query.filter_by(id=user_id).first() print(f'sending user {user} a welcome email') send_mail(config_value('EMAIL_SUBJECT_REGISTER'), email, 'welcome', user=user, confirmation_link=confirmation_link)
Then, you need to start the celery beat and celery worker in two different cmd prompts from inside the repo_name
folder.
In one cmd prompt do a celery -A base.runcelery:celery beat
and the other celery -A base.runcelery:celery worker
.
Then, run through your task that needed the flask context. Should work.
I don't have any points, so I couldn't upvote @codegeek's above answer, so I decided to write my own since my search for an issue like this was helped by this question/answer: I've just had some success trying to tackle a similar problem in a python/flask/celery scenario. Even though your error was from trying to use mail
while my error was around trying to use url_for
in a celery task, I suspect the two were related to the same problem and that you would have had errors stemming from the use of url_for
if you had tried to use that before mail
.
With no context of the app present in a celery task (even after including an import app from my_app_module
) I was getting errors, too. You'll need to perform the mail
operation in the context of the app:
from module_containing_my_app_and_mail import app, mail # Flask app, Flask mailfrom flask.ext.mail import Message # Message class@celery.taskdef send_forgot_email(email, ref): with app.app_context(): # This is the important bit! msg = Message("Recover your Crusade Gaming Account") msg.recipients = [email] msg.sender = "Crusade Gaming stuff@cg.com" msg.html = \ """ Hello Person,<br/> You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br /> If you did not request that your password be reset, please ignore this. """.format(url_for('account.forgot', ref=ref, _external=True)) mail.send(msg)
If anyone is interested, my solution for the problem of using url_for
in celery tasks can be found here