Skip to content
Snippets Groups Projects
tasks.py 5.22 KiB
Newer Older
  • Learn to ignore specific revisions
  • import datetime
    
    import logging
    
    
    from django.conf import settings
    
    from django.db.models import Q, F
    
    from django.utils import timezone
    
    from dynamic_preferences.registries import global_preferences_registry
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    from requests.exceptions import RequestException
    
    from funkwhale_api.common import session
    
    from funkwhale_api.music import models as music_models
    
    from funkwhale_api.taskapp import celery
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    from . import models, signing
    
    from . import routes
    
    
    logger = logging.getLogger(__name__)
    
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    @celery.app.task(name="federation.clean_music_cache")
    
    def clean_music_cache():
        preferences = global_preferences_registry.manager()
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        delay = preferences["federation__music_cache_duration"]
    
        if delay < 1:
            return  # cache clearing disabled
    
        limit = timezone.now() - datetime.timedelta(minutes=delay)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        candidates = (
    
            music_models.TrackFile.objects.filter(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                Q(audio_file__isnull=False)
    
                & (Q(accessed_date__lt=limit) | Q(accessed_date=None))
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            .exclude(audio_file="")
            .only("audio_file", "id")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        )
    
        for tf in candidates:
            tf.audio_file.delete()
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        storage = models.LibraryTrack._meta.get_field("audio_file").storage
    
        files = get_files(storage, "federation_cache/tracks")
        existing = music_models.TrackFile.objects.filter(audio_file__in=files)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        missing = set(files) - set(existing.values_list("audio_file", flat=True))
    
        for m in missing:
            storage.delete(m)
    
    
    def get_files(storage, *parts):
        """
        This is a recursive function that return all files available
        in a given directory using django's storage.
        """
        if not parts:
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            raise ValueError("Missing path")
    
    
        dirs, files = storage.listdir(os.path.join(*parts))
        for dir in dirs:
            files += get_files(storage, *(list(parts) + [dir]))
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        return [os.path.join(parts[-1], path) for path in files]
    
    
    
    @celery.app.task(name="federation.dispatch_inbox")
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    def dispatch_inbox(activity):
        """
        Given an activity instance, triggers our internal delivery logic (follow
        creation, etc.)
        """
    
        try:
            routes.inbox.dispatch(
                activity.payload,
                context={
                    "actor": activity.actor,
                    "inbox_items": list(activity.inbox_items.local().select_related()),
                },
            )
        except Exception:
            activity.inbox_items.local().update(
                delivery_attempts=F("delivery_attempts") + 1,
                last_delivery_date=timezone.now(),
            )
            raise
        else:
            activity.inbox_items.local().update(
                delivery_attempts=F("delivery_attempts") + 1,
                last_delivery_date=timezone.now(),
                is_delivered=True,
            )
    
    
    @celery.app.task(name="federation.dispatch_outbox")
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    def dispatch_outbox(activity):
        """
        Deliver a local activity to its recipients
        """
        inbox_items = activity.inbox_items.all().select_related("actor")
        local_recipients_items = [ii for ii in inbox_items if ii.actor.is_local]
        if local_recipients_items:
            dispatch_inbox.delay(activity_id=activity.pk)
        remote_recipients_items = [ii for ii in inbox_items if not ii.actor.is_local]
    
        shared_inbox_urls = {
            ii.actor.shared_inbox_url
            for ii in remote_recipients_items
            if ii.actor.shared_inbox_url
        }
        inbox_urls = {
            ii.actor.inbox_url
            for ii in remote_recipients_items
            if not ii.actor.shared_inbox_url
        }
        for url in shared_inbox_urls:
            deliver_to_remote_inbox.delay(activity_id=activity.pk, shared_inbox_url=url)
    
        for url in inbox_urls:
            deliver_to_remote_inbox.delay(activity_id=activity.pk, inbox_url=url)
    
    
    @celery.app.task(
        name="federation.deliver_to_remote_inbox",
        autoretry_for=[RequestException],
        retry_backoff=30,
        max_retries=5,
    )
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    def deliver_to_remote_inbox(activity, inbox_url=None, shared_inbox_url=None):
        url = inbox_url or shared_inbox_url
        actor = activity.actor
        inbox_items = activity.inbox_items.filter(is_delivered=False)
        if inbox_url:
            inbox_items = inbox_items.filter(actor__inbox_url=inbox_url)
        else:
            inbox_items = inbox_items.filter(actor__shared_inbox_url=shared_inbox_url)
        logger.info("Preparing activity delivery to %s", url)
        auth = signing.get_auth(actor.private_key, actor.private_key_id)
        try:
            response = session.get_session().post(
                auth=auth,
                json=activity.payload,
                url=url,
                timeout=5,
                verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
                headers={"Content-Type": "application/activity+json"},
            )
            logger.debug("Remote answered with %s", response.status_code)
            response.raise_for_status()
        except Exception:
            inbox_items.update(
                last_delivery_date=timezone.now(),
                delivery_attempts=F("delivery_attempts") + 1,
            )
            raise
        else:
            inbox_items.update(last_delivery_date=timezone.now(), is_delivered=True)