How to check if Celery/Supervisor is running using Python


Update 09/2020: Jérôme updated this answer for Celery 4.3 here: https://stackoverflow.com/a/57628025/1159735

You can run the celery status command from code by importing the celery.bin.celery module:

    import celery
    import celery.bin.base
    import celery.bin.celery
    import celery.platforms

    app = celery.Celery('tasks', broker='redis://')

    status = celery.bin.celery.CeleryCommand.commands['status']()
    status.app = status.get_app()

    def celery_is_up():
        try:
            status.run()
            return True
        except celery.bin.base.Error as e:
            if e.status == celery.platforms.EX_UNAVAILABLE:
                return False
            raise e

    if __name__ == '__main__':
        if celery_is_up():
            print('Celery up!')
        else:
            print('Celery not responding...')
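For newer Celery releases, where the command classes used above may no longer be importable, one possible alternative is to ping the workers through the app's control API. The following is a minimal sketch, not the code from the linked answer: the helper name celery_workers_responding and the one-second default timeout are illustrative assumptions, and the broad exception handler simply treats any failure (for example an unreachable broker) as "not running".

```python
def celery_workers_responding(app, timeout=1.0):
    """Return True if at least one worker answers a ping within `timeout` seconds.

    `app` is expected to be a celery.Celery instance (hypothetical helper,
    not part of Celery itself).
    """
    try:
        # inspect().ping() returns a dict keyed by worker name,
        # or None when no worker replied in time.
        replies = app.control.inspect(timeout=timeout).ping()
    except Exception:
        # Assumption: any error here (e.g. broker unreachable) counts as "down".
        return False
    return bool(replies)


# Usage (requires Celery and a reachable broker):
#   from celery import Celery
#   app = Celery('tasks', broker='redis://')
#   print('Celery up!' if celery_workers_responding(app) else 'Celery not responding...')
```

Passing the app in as an argument keeps the helper independent of how the application is configured, and makes it easy to exercise with a stub object.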


Another option is to use subprocess, though I'm not sure it is a good idea:

    >>> import subprocess
    >>> # text=True returns str instead of bytes, so the substring test works on Python 3
    >>> output = subprocess.check_output(['ps', 'aux'], text=True)
    >>> 'supervisord' in output
    True
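A plain substring test also matches unrelated lines such as an editor with supervisord.conf open. A slightly stricter sketch is shown below. It assumes the usual 11-column layout of ps aux with COMMAND last, and the helper names process_listed and supervisord_is_listed are made up for illustration. It only looks at the first two words of the command (the executable and, for interpreters such as Python, the script), so something like "grep supervisord" would still be a false positive.

```python
import os
import subprocess


def process_listed(ps_output, name):
    """Return True if a `ps aux` line runs an executable or script called `name`."""
    for line in ps_output.splitlines()[1:]:  # skip the header row
        fields = line.split(None, 10)        # 10 leading columns, then COMMAND
        if len(fields) < 11:
            continue
        argv = fields[10].split()
        # Compare basenames so /usr/bin/supervisord matches 'supervisord'.
        if any(os.path.basename(token) == name for token in argv[:2]):
            return True
    return False


def supervisord_is_listed():
    """Run `ps aux` and look for supervisord (hypothetical convenience wrapper)."""
    output = subprocess.check_output(['ps', 'aux'], text=True)
    return process_listed(output, 'supervisord')
```

Keeping the parsing in its own function means it can be checked against captured ps output without spawning a process.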


You can parse the process state from the output of supervisorctl status:

    import subprocess

    def is_celery_worker_running():
        # text=True makes check_output return str rather than bytes on Python 3
        ctl_output = subprocess.check_output(
            'supervisorctl status celery_worker'.split(), text=True).strip()
        if ctl_output == 'unix:///var/run/supervisor.sock no such file':
            # supervisord not running
            return False
        elif ctl_output == 'No such process celery_worker':
            return False
        else:
            state = ctl_output.split()[1]
            return state == 'RUNNING'
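The exact error strings may differ between Supervisor versions, and supervisorctl may exit with a non-zero status when the process is not running, which would make check_output raise. A more tolerant sketch is to ignore the exit status and look only for a status line that starts with the program name. The helper names parse_supervisor_state and is_program_running below are illustrative, and celery_worker is the program name used in the answer above.

```python
import subprocess


def parse_supervisor_state(ctl_output, program):
    """Return the state column (e.g. 'RUNNING') for `program`, or None if absent."""
    for line in ctl_output.splitlines():
        fields = line.split()
        # A status line looks like: "<name>  <STATE>  pid 1234, uptime 0:05:12"
        if len(fields) >= 2 and fields[0] == program:
            return fields[1]
    return None


def is_program_running(program='celery_worker'):
    """Ask supervisorctl for the program's state; any failure counts as not running."""
    try:
        result = subprocess.run(
            ['supervisorctl', 'status', program],
            capture_output=True, text=True,  # no check=True: exit status is ignored
        )
    except FileNotFoundError:
        # supervisorctl is not installed or not on PATH
        return False
    return parse_supervisor_state(result.stdout, program) == 'RUNNING'
```

Error messages such as a missing socket or an unknown process never start with the bare program name, so they fall through to None and are reported as not running.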