Skip to content
Snippets Groups Projects
celery.py 1.63 KiB
Newer Older
import traceback as tb
Eliot Berriot's avatar
Eliot Berriot committed
import os
import logging
import celery.app.task
from django.apps import AppConfig
from django.conf import settings

logger = logging.getLogger("celery")

if not settings.configured:
    # set the default Django settings module for the 'celery' program.
Eliot Berriot's avatar
Eliot Berriot committed
    os.environ.setdefault(
        "DJANGO_SETTINGS_MODULE", "config.settings.production"
Eliot Berriot's avatar
Eliot Berriot committed
    )  # pragma: no cover
app = celery.Celery("funkwhale_api")

@celery.signals.task_failure.connect
def process_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
    print("[celery] Error during task {}: {}".format(task_id, einfo.exception))
    tb.print_exc()
Eliot Berriot's avatar
Eliot Berriot committed
    name = "funkwhale_api.taskapp"
    verbose_name = "Celery Config"

    def ready(self):
        # Using a string here means the worker will not have to
        # pickle the object when using Windows.
Eliot Berriot's avatar
Eliot Berriot committed
        app.config_from_object("django.conf:settings", namespace="CELERY")
        app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)


def require_instance(model_or_qs, parameter_name, id_kwarg_name=None):
    def decorator(function):
        @functools.wraps(function)
        def inner(*args, **kwargs):
Eliot Berriot's avatar
Eliot Berriot committed
            kw = id_kwarg_name or "_".join([parameter_name, "id"])
            pk = kwargs.pop(kw)
            try:
                instance = model_or_qs.get(pk=pk)
            except AttributeError:
                instance = model_or_qs.objects.get(pk=pk)
            kwargs[parameter_name] = instance
            return function(*args, **kwargs)
Eliot Berriot's avatar
Eliot Berriot committed

Eliot Berriot's avatar
Eliot Berriot committed