Skip to content
Snippets Groups Projects
tasks.py 8.69 KiB
Newer Older
  • Learn to ignore specific revisions
  • import datetime
    
    import logging
    import os

    import requests
    from django.conf import settings
    from django.db.models import Q, F
    from django.utils import timezone
    from dynamic_preferences.registries import global_preferences_registry
    from requests.exceptions import RequestException

    from funkwhale_api.common import preferences
    from funkwhale_api.common import session
    from funkwhale_api.common import utils as common_utils
    from funkwhale_api.music import models as music_models
    from funkwhale_api.taskapp import celery

    from . import keys
    from . import models, signing
    from . import serializers
    from . import routes
    from . import utils
    
    
    # Module-level logger, named after this module via ``__name__``.
    logger = logging.getLogger(__name__)
    
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    @celery.app.task(name="federation.clean_music_cache")
    def clean_music_cache():
        """
        Delete cached audio files that have not been accessed recently.

        The retention period is read from the
        ``federation__music_cache_duration`` global preference and interpreted
        as a number of minutes; a value below 1 disables the task. Once stale
        files are removed, every file found under ``federation_cache/tracks``
        in the storage that no ``Upload`` row references is deleted as well.
        """
        # Not named ``preferences`` so the module-level ``preferences`` import
        # is not shadowed inside this function.
        global_preferences = global_preferences_registry.manager()
        delay = global_preferences["federation__music_cache_duration"]
        if delay < 1:
            return  # cache clearing disabled

        limit = timezone.now() - datetime.timedelta(minutes=delay)
        candidates = (
            music_models.Upload.objects.filter(
                Q(audio_file__isnull=False)
                & (Q(accessed_date__lt=limit) | Q(accessed_date=None)),
                # library__actor__user=None,
            )
            # NOTE(review): the closing parenthesis above was missing from the
            # text under review; confirm that no additional queryset filter
            # (e.g. one restricting the purge to non-local uploads) belongs
            # here before ``.exclude``.
            .exclude(audio_file="")
            .only("audio_file", "id")
        )
        for upload in candidates:
            upload.audio_file.delete()

        # Also remove files present in the cache directory that no upload
        # row points to anymore.
        storage = models.LibraryTrack._meta.get_field("audio_file").storage
        files = get_files(storage, "federation_cache/tracks")
        existing = music_models.Upload.objects.filter(audio_file__in=files)
        missing = set(files) - set(existing.values_list("audio_file", flat=True))
        for m in missing:
            storage.delete(m)
    
    
    def get_files(storage, *parts):
        """
        Recursively list all files below a directory using django's storage.

        ``parts`` are the path components of the directory; they are joined
        with ``os.path.join`` before being handed to ``storage.listdir``.

        Returns a list of paths, each one prefixed with the last component of
        ``parts``. Files of the directory itself come first, followed by the
        files of each sub-directory in the order ``listdir`` reported them.
        A directory for which ``listdir`` raises ``FileNotFoundError`` yields
        an empty list.

        Raises ``ValueError`` when no path component is given.
        """
        if not parts:
            raise ValueError("Missing path")

        try:
            dirs, files = storage.listdir(os.path.join(*parts))
        except FileNotFoundError:
            return []

        # Work on a copy: the list handed back by the storage must not be
        # extended in place.
        found = list(files)
        for dirname in dirs:
            found.extend(get_files(storage, *parts, dirname))

        return [os.path.join(parts[-1], path) for path in found]
    
    
    
    @celery.app.task(name="federation.dispatch_inbox")
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    def dispatch_inbox(activity, call_handlers=True):
        """
        Run the inbox routes against a stored activity.

        The activity payload is handed to ``routes.inbox.dispatch`` together
        with a context holding the activity itself, its actor and its unread
        inbox items.
        """
        # NOTE(review): ``call_handlers`` is accepted (``dispatch_outbox``
        # passes it explicitly) but is not used below — confirm whether it
        # should be forwarded to ``routes.inbox.dispatch``.
        payload = activity.payload
        dispatch_context = {
            "activity": activity,
            "actor": activity.actor,
            "inbox_items": activity.inbox_items.filter(is_read=False),
        }
        routes.inbox.dispatch(payload, context=dispatch_context)
    
    
    
    @celery.app.task(name="federation.dispatch_outbox")
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    def dispatch_outbox(activity):
        """
        Deliver a local activity to its recipients, both locally and remotely

        Local delivery is queued through ``dispatch_inbox`` when the activity
        has unread inbox items. Remote delivery queues one
        ``deliver_to_remote`` task per pending delivery, and is skipped
        entirely when the ``federation__enabled`` preference is off.
        """
        inbox_items = activity.inbox_items.filter(is_read=False).select_related()
        if inbox_items.exists():
            dispatch_inbox.delay(activity_id=activity.pk, call_handlers=False)

        if not preferences.get("federation__enabled"):
            # federation is disabled, we only deliver to local recipients
            return

        deliveries = activity.deliveries.filter(is_delivered=False)
        for delivery_id in deliveries.values_list("pk", flat=True):
            deliver_to_remote.delay(delivery_id=delivery_id)
    
    
    
    @celery.app.task(
        name="federation.deliver_to_remote_inbox",
        autoretry_for=[RequestException],
        retry_backoff=30,
        max_retries=5,
    )
    @celery.require_instance(
        models.Delivery.objects.filter(is_delivered=False).select_related(
            "activity__actor"
        ),
        "delivery",
    )
    def deliver_to_remote(delivery):
        """
        POST a pending delivery's activity payload to its remote inbox.

        The request is signed with the private key of the activity's actor.
        Every attempt updates ``last_attempt_date`` and increments
        ``attempts``; ``is_delivered`` is set only when the remote answers
        with a successful status. Failures are re-raised after being recorded
        so the task's ``autoretry_for`` setting can act on them.

        Does nothing when the ``federation__enabled`` preference is off.
        """
        if not preferences.get("federation__enabled"):
            # federation is disabled, we only deliver to local recipients
            return

        actor = delivery.activity.actor
        logger.info("Preparing activity delivery to %s", delivery.inbox_url)
        auth = signing.get_auth(actor.private_key, actor.private_key_id)
        try:
            response = session.get_session().post(
                auth=auth,
                json=delivery.activity.payload,
                url=delivery.inbox_url,
                timeout=5,
                verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
                headers={"Content-Type": "application/activity+json"},
            )
            logger.debug("Remote answered with %s", response.status_code)
            response.raise_for_status()
        except Exception:
            # Record the failed attempt, then let the error propagate:
            # swallowing it here would prevent any retry and hide the failure.
            delivery.last_attempt_date = timezone.now()
            delivery.attempts = F("attempts") + 1
            delivery.save(update_fields=["last_attempt_date", "attempts"])
            raise
        else:
            # Only a request that completed without error counts as delivered.
            delivery.last_attempt_date = timezone.now()
            delivery.attempts = F("attempts") + 1
            delivery.is_delivered = True
            delivery.save(update_fields=["last_attempt_date", "attempts", "is_delivered"])
    
    
    
    def fetch_nodeinfo(domain_name):
        """
        Download and return the decoded nodeinfo JSON document of a domain.

        First requests ``https://<domain_name>/.well-known/nodeinfo``,
        validates the answer with ``NodeInfoSerializer`` and picks the ``href``
        of the first link whose ``rel`` is the nodeinfo 2.0 schema. That URL is
        then requested and its JSON body returned.

        Both requests call ``raise_for_status()``, and the serializer is
        validated with ``raise_exception=True``.
        """
        http = session.get_session()

        def fetch(url):
            # Same request options for the discovery and the nodeinfo document.
            reply = http.get(
                url=url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
            )
            reply.raise_for_status()
            return reply

        discovery = fetch("https://{}/.well-known/nodeinfo".format(domain_name))
        serializer = serializers.NodeInfoSerializer(data=discovery.json())
        serializer.is_valid(raise_exception=True)

        # Stays ``None`` when no link advertises the 2.0 schema; that value is
        # then passed as-is to the second request.
        nodeinfo_url = next(
            (
                link["href"]
                for link in serializer.validated_data["links"]
                if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0"
            ),
            None,
        )
        return fetch(nodeinfo_url).json()
    
    
    @celery.app.task(name="federation.update_domain_nodeinfo")
    @celery.require_instance(
        models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
    )
    def update_domain_nodeinfo(domain):
        """
        Refresh the stored nodeinfo and service actor of a domain.

        ``domain.nodeinfo`` becomes ``{"status": "ok", "payload": ...}`` on
        success or ``{"status": "error", "error": ...}`` when fetching or
        validating fails. When the payload exposes ``metadata.actorId`` the
        matching actor is retrieved and stored as ``domain.service_actor``;
        without an id the attribute is set to ``None``. If retrieving the
        actor fails, a warning is logged and the attribute is left as it was.
        """
        # Timestamp taken before any network call is made.
        now = timezone.now()
        try:
            payload = fetch_nodeinfo(domain.name)
            nodeinfo = {"status": "ok", "payload": payload}
        except (requests.RequestException, serializers.serializers.ValidationError) as exc:
            nodeinfo = {"status": "error", "error": str(exc)}

        service_actor_id = common_utils.recursive_getattr(
            nodeinfo, "payload.metadata.actorId", permissive=True
        )
        try:
            if service_actor_id:
                domain.service_actor = utils.retrieve_ap_object(
                    service_actor_id,
                    queryset=models.Actor,
                    serializer_class=serializers.ActorSerializer,
                )
            else:
                domain.service_actor = None
        except (serializers.serializers.ValidationError, RequestException) as exc:
            logger.warning(
                "Cannot fetch system actor for domain %s: %s", domain.name, str(exc)
            )

        domain.nodeinfo_fetch_date = now
        domain.nodeinfo = nodeinfo
        domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"])
    
    
    
    def delete_qs(qs):
        """
        Delete every object matched by ``qs`` and log what was removed.

        ``qs.delete()`` returns the total number of deleted objects together
        with a per-model breakdown keyed by model label. The log line reports
        the count for the queryset's own model separately from the objects of
        other models removed alongside it.
        """
        label = qs.model._meta.label
        total, per_model = qs.delete()
        # ``total`` already includes the queryset's own model, so split it
        # instead of reporting the same figure twice.
        own = per_model.get(label, 0)
        related = total - own
        logger.info(
            "Purged %s %s objects (and %s related entities)", own, label, related
        )
    
    
    
    def handle_purge_actors(ids, only=None):
        """
        Delete the data attached to the actors whose primary keys are in ``ids``.

        Empty (or missing) ``only`` means we purge everything.
        Otherwise, we purge only the requested bits: media
        """
        purge_all = not only

        # purge follows (received emitted)
        if purge_all:
            delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
            delete_qs(models.Follow.objects.filter(actor_id__in=ids))

        # purge uploads and libraries, plus the follow rows filtered below
        if purge_all or "media" in only:
            delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
            delete_qs(models.Follow.objects.filter(target_id__in=ids))
            delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
            delete_qs(music_models.Library.objects.filter(actor_id__in=ids))

        # purge remaining activities / deliveries
        if purge_all:
            delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
            delete_qs(models.Activity.objects.filter(actor_id__in=ids))
    
    
    
    @celery.app.task(name="federation.purge_actors")
    def purge_actors(ids=None, domains=None, only=None):
        """
        Purge the data of the actors matching ``ids`` or belonging to ``domains``.

        ``ids`` are actor primary keys and ``domains`` are matched against
        ``domain_id``; an actor matching either is selected. ``only`` is passed
        through to ``handle_purge_actors``. Omitted arguments behave like
        empty lists.
        """
        # ``None`` defaults avoid sharing one mutable list between calls.
        ids = [] if ids is None else ids
        domains = [] if domains is None else domains
        only = [] if only is None else only

        actors = models.Actor.objects.filter(
            Q(id__in=ids) | Q(domain_id__in=domains)
        ).order_by("id")
        found_ids = list(actors.values_list("id", flat=True))
        logger.info("Starting purging %s accounts", len(found_ids))
        handle_purge_actors(ids=found_ids, only=only)
    
    
    
    @celery.app.task(name="federation.rotate_actor_key")
    @celery.require_instance(models.Actor.objects.local(), "actor")
    def rotate_actor_key(actor):
        """
        Replace the key pair stored on a local actor.

        A new pair is obtained from ``keys.get_key_pair()``; its first item is
        decoded into ``actor.private_key`` and its second into
        ``actor.public_key``. Only those two fields are saved.
        """
        key_pair = keys.get_key_pair()
        private_part, public_part = key_pair[0], key_pair[1]
        actor.private_key = private_part.decode()
        actor.public_key = public_part.decode()
        actor.save(update_fields=["private_key", "public_key"])