Logging configuration for Celery in Flask


#1

Hi,

This is how I define my Celery instance (note that `app` refers to my Flask app):

from my_application import app
import settings
from celery import Celery


def make_celery(_app):
    """Build a Celery instance whose tasks run inside ``_app``'s app context.

    The Celery app is named after ``_app.import_name``; the broker and result
    backend URLs, as well as the worker limits, are read from the ``settings``
    module.

    :param _app: the Flask application the tasks should be bound to.
    :returns: the configured ``Celery`` instance.
    """
    celery = Celery(
        _app.import_name,
        broker=settings.CELERY_BROKER_URL,
        backend=settings.CELERY_RESULT_BACKEND,
    )

    # Evaluated once here, from the app that was actually passed in (not from
    # a module-level global), so the factory works for any Flask app.
    testing = _app.config['TESTING'] or settings.TESTING

    class Config(object):
        CELERYD_CONCURRENCY = settings.CELERYD_CONCURRENCY
        CELERYD_TASK_SOFT_TIME_LIMIT = settings.CELERYD_TASK_SOFT_TIME_LIMIT
        CELERYD_TASK_TIME_LIMIT = settings.CELERYD_TASK_TIME_LIMIT
        CELERYD_MAX_TASKS_PER_CHILD = settings.CELERYD_MAX_TASKS_PER_CHILD
        CELERY_ACKS_LATE = True
        # Both conversion tasks are routed to the 'protracted' queue.
        CELERY_ROUTES = {
            'tasks.all_convert': {'queue': 'protracted'},
            'tasks.re_convert_file_type': {'queue': 'protracted'},
        }
        # Test-only overrides: eager execution, in-memory broker, and task
        # exceptions propagated to the caller.
        if testing:
            CELERY_ALWAYS_EAGER = True
            BROKER_BACKEND = 'memory'
            CELERY_EAGER_PROPAGATES_EXCEPTIONS = True

    celery.config_from_object(Config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that wraps every call in the Flask app context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with _app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    # Make every task created from this Celery app use the context-aware base.
    celery.Task = ContextTask
    return celery

# Module-level Celery instance built from the imported Flask app.
my_celery = make_celery(app)

I am unable to configure Sentry with the above Celery instance. Where and how do I add the Sentry configuration?


#2

Resolved. I added the Sentry setup inside make_celery, right after the Celery config is loaded:

def make_celery(_app):
    """Build a Celery instance for ``_app`` with Sentry reporting attached.

    The Celery app is named after ``_app.import_name`` and configured from the
    ``settings`` module. Logging is configured from ``settings.LOG_CONFIG``,
    Sentry is initialised on ``_app`` and hooked into Celery's signals, and
    every task is wrapped so that it runs inside the Flask app context.

    NOTE: ``Sentry``, ``dictConfig``, ``register_logger_signal``,
    ``register_signal`` and ``logging`` must be imported at module level; the
    imports are not shown in this snippet.

    :param _app: the Flask application the tasks should be bound to.
    :returns: the configured ``Celery`` instance.
    """
    celery = Celery(
        _app.import_name,
        broker=settings.CELERY_BROKER_URL,
        backend=settings.CELERY_RESULT_BACKEND,
    )

    # Evaluated once here, from the app that was actually passed in (not from
    # a module-level global), so the factory works for any Flask app.
    testing = _app.config['TESTING'] or settings.TESTING

    class Config(object):
        CELERYD_CONCURRENCY = settings.CELERYD_CONCURRENCY
        CELERYD_TASK_SOFT_TIME_LIMIT = settings.CELERYD_TASK_SOFT_TIME_LIMIT
        CELERYD_TASK_TIME_LIMIT = settings.CELERYD_TASK_TIME_LIMIT
        CELERYD_MAX_TASKS_PER_CHILD = settings.CELERYD_MAX_TASKS_PER_CHILD
        CELERY_ACKS_LATE = True

        # Both conversion tasks are routed to the 'protracted' queue.
        CELERY_ROUTES = {
            'tasks.all_convert': {'queue': 'protracted'},
            'tasks.re_convert_file_type': {'queue': 'protracted'},
        }
        # Test-only overrides: eager execution, in-memory broker, and task
        # exceptions propagated to the caller.
        if testing:
            CELERY_ALWAYS_EAGER = True
            BROKER_BACKEND = 'memory'
            CELERY_EAGER_PROPAGATES_EXCEPTIONS = True

    celery.config_from_object(Config)

    # Build the extension WITHOUT the app: passing the app to the constructor
    # already initialises it, and the explicit init_app() below would then
    # initialise it a second time (duplicating the request hooks).
    sentry = Sentry(dsn=settings.SENTRY_DSN, logging=True, level=logging.INFO)
    # Apply the logging config first so that the handler installed by
    # init_app() is added on top of it.
    dictConfig(settings.LOG_CONFIG)
    sentry.init_app(_app)
    # Hook the Sentry client into Celery: logger setup and task signals.
    register_logger_signal(sentry.client, loglevel=logging.WARNING)
    register_signal(sentry.client, ignore_expected=True)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that wraps every call in the Flask app context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with _app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    # Make every task created from this Celery app use the context-aware base.
    celery.Task = ContextTask
    return celery