Flask with create_app, SQLAlchemy and Celery


I was off with the current_app advice.

Your Celery object needs access to the Flask application context. I found some information online about creating the Celery object with a factory function. The example below was tested, though without a message broker running.

#factory.py
from celery import Celery
from config import config
# create_app is the Flask application factory, defined elsewhere in this module

def create_celery_app(app=None):
    app = app or create_app(config)
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    celery.app = app  # keep a reference so tasks can reach the Flask app
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

and in tasks.py:

#tasks.py
from factory import create_celery_app
from celery.signals import task_prerun
from flask import g

celery = create_celery_app()

@task_prerun.connect
def celery_prerun(*args, **kwargs):
    with celery.app.app_context():
        # use g.db
        print(g)

@celery.task()
def do_some_stuff():
    with celery.app.app_context():
        # use g.db
        g.user = "test"
        print(g.user)
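The task_prerun hook above relies on Celery's signal mechanism: handlers connected to a signal run before every task body. A minimal plain-Python sketch of that dispatch pattern (this Signal class is a toy stand-in, not Celery's implementation):

```python
class Signal:
    """Toy stand-in for a celery.signals signal."""
    def __init__(self):
        self._handlers = []

    def connect(self, fn):
        # used as a decorator, like @task_prerun.connect
        self._handlers.append(fn)
        return fn

    def send(self, **kwargs):
        for fn in self._handlers:
            fn(**kwargs)

task_prerun = Signal()
calls = []

@task_prerun.connect
def before_task(task_name=None, **kwargs):
    calls.append(f"prerun:{task_name}")

def run_task(name, fn):
    task_prerun.send(task_name=name)  # fired before the task body runs
    return fn()

result = run_task("do_some_stuff", lambda: "done")
assert calls == ["prerun:do_some_stuff"]
assert result == "done"
```

The real signal carries more arguments (task id, args, kwargs), but the connect-then-fire shape is the same.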

Some links:

Flask pattern for creating a Celery instance with factory function

Application using both application factory and celery

Source for said application's factory.py

Source for application tasks.py


Here is a solution that works with the Flask application factory pattern and also creates Celery tasks with context, without needing to use app.app_context() explicitly in the tasks. In my app it is really tricky to get hold of that app object while avoiding circular imports, but this solves it. It also works with the latest Celery version (4.2 at the time of writing).

Structure:

repo_name/
    manage.py
    base/
        __init__.py
        app.py
        runcelery.py
        celeryconfig.py
        utility/
            celery_util.py
        tasks/
            workers.py

So base is the main application package in this example. In the base/__init__.py we create the celery instance as below:

from celery import Celery

celery = Celery('base', config_source='base.celeryconfig')
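The config_source argument points Celery at a plain Python module whose public attributes become settings. A rough stdlib-only illustration of that idea (the celeryconfig module built here is a hypothetical stand-in, and load_config only approximates what Celery's loader does):

```python
import types

# hypothetical stand-in for the base.celeryconfig module
celeryconfig = types.ModuleType("celeryconfig")
celeryconfig.broker_url = "pyamqp://guest:guest@localhost:5672//"
celeryconfig.imports = ("base.tasks.workers",)

def load_config(module):
    # collect the module's public attributes, skipping dunders like __name__
    return {k: v for k, v in vars(module).items() if not k.startswith("_")}

conf = load_config(celeryconfig)
assert conf["broker_url"].startswith("pyamqp://")
assert conf["imports"] == ("base.tasks.workers",)
```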

The base/app.py file contains the Flask app factory create_app; note the init_celery(app, celery) call inside it:

from flask import Flask

from base import celery
from base.utility.celery_util import init_celery

def create_app(config_obj):
    """An application factory, as explained here:
    http://flask.pocoo.org/docs/patterns/appfactories/.

    :param config_obj: The configuration object to use.
    """
    app = Flask('base')
    app.config.from_object(config_obj)
    init_celery(app, celery=celery)
    register_extensions(app)
    register_blueprints(app)
    register_errorhandlers(app)
    register_app_context_processors(app)
    return app

Moving on to base/runcelery.py contents:

from flask.helpers import get_debug_flag

from base import celery
from base.app import create_app
from base.settings import DevConfig, ProdConfig
from base.utility.celery_util import init_celery

CONFIG = DevConfig if get_debug_flag() else ProdConfig

app = create_app(CONFIG)
init_celery(app, celery)
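The DevConfig/ProdConfig switch hinges on flask.helpers.get_debug_flag, which reads the FLASK_DEBUG environment variable. A minimal stdlib sketch of that selection logic (the config classes are stand-ins for base.settings, and this get_debug_flag only approximates Flask's implementation):

```python
import os

class DevConfig:   # stand-in for base.settings.DevConfig
    DEBUG = True

class ProdConfig:  # stand-in for base.settings.ProdConfig
    DEBUG = False

def get_debug_flag():
    # approximates flask.helpers.get_debug_flag:
    # FLASK_DEBUG must be set and not a falsey string
    val = os.environ.get("FLASK_DEBUG")
    return bool(val and val.lower() not in ("0", "false", "no"))

os.environ["FLASK_DEBUG"] = "1"
CONFIG = DevConfig if get_debug_flag() else ProdConfig
assert CONFIG is DevConfig
```

With FLASK_DEBUG unset (or "0"), the same expression would pick ProdConfig instead.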

Next, the base/celeryconfig.py file (as an example):

# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""

## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat = 0

# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)

## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False

accept_content = ['json', 'application/text']
result_serializer = 'json'
timezone = "UTC"

# define periodic tasks / cron here
# beat_schedule = {
#    'add-every-10-seconds': {
#        'task': 'workers.add_together',
#        'schedule': 10.0,
#        'args': (16, 16)
#    },
# }

Now define the init_celery in the base/utility/celery_util.py file:

# -*- coding: utf-8 -*-
def init_celery(app, celery):
    """Add flask app context to celery.Task"""
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

For the workers in base/tasks/workers.py:

from flask import g
from flask_security.utils import config_value, send_mail

from base import celery as celery_app
from base.bp.users.models.user_models import User

@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
    """Background task to send a welcome email with flask-security's mail.

    You don't need to use with app.app_context() as Task has app context.
    """
    user = User.query.filter_by(id=user_id).first()
    print(f'sending user {user} a welcome email')
    send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
              email,
              'welcome', user=user,
              confirmation_link=confirmation_link)

@celery_app.task
def do_some_stuff():
    print(g)

Then, start celery beat and the celery worker in two separate command prompts from inside the repo_name folder.

In one prompt run celery -A base.runcelery:celery beat and in the other celery -A base.runcelery:celery worker.

Then trigger the task that needed the Flask context. It should work.