# funkwhale_api/federation/tasks.py — celery tasks for ActivityPub federation
import datetime
import json
import logging
import os

import requests
from django.conf import settings
from django.db import transaction
from django.db.models import Q, F
from django.db.models.deletion import Collector
from django.utils import timezone
from dynamic_preferences.registries import global_preferences_registry
from requests.exceptions import RequestException

from funkwhale_api.audio import models as audio_models
from funkwhale_api.common import preferences
from funkwhale_api.common import models as common_models
from funkwhale_api.common import session
from funkwhale_api.common import utils as common_utils
from funkwhale_api.moderation import mrf
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery

from . import activity
from . import actors
from . import exceptions
from . import jsonld
from . import keys
from . import models, signing
from . import serializers
from . import routes
from . import utils
from . import webfinger
    
    
    logger = logging.getLogger(__name__)
    
    
    
    @celery.app.task(name="federation.clean_music_cache")
    
    def clean_music_cache():
        preferences = global_preferences_registry.manager()
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        delay = preferences["federation__music_cache_duration"]
    
        if delay < 1:
            return  # cache clearing disabled
    
        limit = timezone.now() - datetime.timedelta(minutes=delay)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        candidates = (
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            music_models.Upload.objects.filter(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                Q(audio_file__isnull=False)
    
                & (Q(accessed_date__lt=limit) | Q(accessed_date=None)),
                # library__actor__user=None,
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            .exclude(audio_file="")
    
            .filter(Q(source__startswith="http://") | Q(source__startswith="https://"))
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            .only("audio_file", "id")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        for upload in candidates:
            upload.audio_file.delete()
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        storage = models.LibraryTrack._meta.get_field("audio_file").storage
    
        files = get_files(storage, "federation_cache/tracks")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        existing = music_models.Upload.objects.filter(audio_file__in=files)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        missing = set(files) - set(existing.values_list("audio_file", flat=True))
    
        for m in missing:
            storage.delete(m)
    
    
    def get_files(storage, *parts):
        """
        This is a recursive function that return all files available
        in a given directory using django's storage.
        """
        if not parts:
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            raise ValueError("Missing path")
    
        try:
            dirs, files = storage.listdir(os.path.join(*parts))
        except FileNotFoundError:
            return []
    
        for dir in dirs:
            files += get_files(storage, *(list(parts) + [dir]))
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        return [os.path.join(parts[-1], path) for path in files]
    
    
    
    @celery.app.task(name="federation.dispatch_inbox")
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    
    def dispatch_inbox(activity, call_handlers=True):
    
        """
        Given an activity instance, triggers our internal delivery logic (follow
        creation, etc.)
        """
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        routes.inbox.dispatch(
            activity.payload,
            context={
                "activity": activity,
                "actor": activity.actor,
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "inbox_items": activity.inbox_items.filter(is_read=False).order_by("id"),
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            },
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        )
    
    
    
    @celery.app.task(name="federation.dispatch_outbox")
    @celery.require_instance(models.Activity.objects.select_related(), "activity")
    def dispatch_outbox(activity):
        """
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        Deliver a local activity to its recipients, both locally and remotely
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        inbox_items = activity.inbox_items.filter(is_read=False).select_related()
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        if inbox_items.exists():
    
            call_handlers = activity.type in ["Follow"]
            dispatch_inbox.delay(activity_id=activity.pk, call_handlers=call_handlers)
    
        if not preferences.get("federation__enabled"):
            # federation is disabled, we only deliver to local recipients
            return
    
        deliveries = activity.deliveries.filter(is_delivered=False)
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        for id in deliveries.values_list("pk", flat=True):
            deliver_to_remote.delay(delivery_id=id)
    
    
    
    @celery.app.task(
        name="federation.deliver_to_remote_inbox",
        autoretry_for=[RequestException],
        retry_backoff=30,
        max_retries=5,
    )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    @celery.require_instance(
        models.Delivery.objects.filter(is_delivered=False).select_related(
            "activity__actor"
        ),
        "delivery",
    )
    def deliver_to_remote(delivery):
    
    
        if not preferences.get("federation__enabled"):
            # federation is disabled, we only deliver to local recipients
            return
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        actor = delivery.activity.actor
        logger.info("Preparing activity delivery to %s", delivery.inbox_url)
    
        auth = signing.get_auth(actor.private_key, actor.private_key_id)
        try:
            response = session.get_session().post(
                auth=auth,
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                json=delivery.activity.payload,
                url=delivery.inbox_url,
    
                headers={"Content-Type": "application/activity+json"},
            )
            logger.debug("Remote answered with %s", response.status_code)
            response.raise_for_status()
        except Exception:
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            delivery.last_attempt_date = timezone.now()
            delivery.attempts = F("attempts") + 1
            delivery.save(update_fields=["last_attempt_date", "attempts"])
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            delivery.last_attempt_date = timezone.now()
            delivery.attempts = F("attempts") + 1
            delivery.is_delivered = True
            delivery.save(update_fields=["last_attempt_date", "attempts", "is_delivered"])
    
    
    
    def fetch_nodeinfo(domain_name):
        s = session.get_session()
        wellknown_url = "https://{}/.well-known/nodeinfo".format(domain_name)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        response = s.get(url=wellknown_url)
    
        response.raise_for_status()
        serializer = serializers.NodeInfoSerializer(data=response.json())
        serializer.is_valid(raise_exception=True)
        nodeinfo_url = None
        for link in serializer.validated_data["links"]:
            if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
                nodeinfo_url = link["href"]
                break
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        response = s.get(url=nodeinfo_url)
    
        response.raise_for_status()
        return response.json()
    
    
    @celery.app.task(name="federation.update_domain_nodeinfo")
    @celery.require_instance(
        models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
    )
    def update_domain_nodeinfo(domain):
        now = timezone.now()
        try:
            nodeinfo = {"status": "ok", "payload": fetch_nodeinfo(domain.name)}
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        except (
            requests.RequestException,
            serializers.serializers.ValidationError,
            ValueError,
        ) as e:
    
            nodeinfo = {"status": "error", "error": str(e)}
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    
        service_actor_id = common_utils.recursive_getattr(
            nodeinfo, "payload.metadata.actorId", permissive=True
        )
        try:
            domain.service_actor = (
                utils.retrieve_ap_object(
                    service_actor_id,
    
                    actor=actors.get_service_actor(),
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    queryset=models.Actor,
                    serializer_class=serializers.ActorSerializer,
                )
                if service_actor_id
                else None
            )
    
    Agate's avatar
    Agate committed
        except (
            serializers.serializers.ValidationError,
            RequestException,
            exceptions.BlockedActorOrDomain,
        ) as e:
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            logger.warning(
                "Cannot fetch system actor for domain %s: %s", domain.name, str(e)
            )
    
        domain.nodeinfo_fetch_date = now
        domain.nodeinfo = nodeinfo
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"])
    
    @celery.app.task(name="federation.refresh_nodeinfo_known_nodes")
    def refresh_nodeinfo_known_nodes():
        """
        Trigger a node info refresh on all nodes that weren't refreshed since
        settings.NODEINFO_REFRESH_DELAY
        """
        limit = timezone.now() - datetime.timedelta(seconds=settings.NODEINFO_REFRESH_DELAY)
        candidates = models.Domain.objects.external().exclude(
            nodeinfo_fetch_date__gte=limit
        )
        names = candidates.values_list("name", flat=True)
        logger.info("Launching periodic nodeinfo refresh on %s domains", len(names))
        for domain_name in names:
            update_domain_nodeinfo.delay(domain_name=domain_name)
    
    
    
    def delete_qs(qs):
        label = qs.model._meta.label
        result = qs.delete()
        related = sum(result[1].values())
    
        logger.info(
            "Purged %s %s objects (and %s related entities)", result[0], label, related
        )
    
    
    
    def handle_purge_actors(ids, only=[]):
        """
        Empty only means we purge everything
        Otherwise, we purge only the requested bits: media
        """
    
        # purge follows (received emitted)
    
        if not only:
            delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
            delete_qs(models.Follow.objects.filter(actor_id__in=ids))
    
            delete_qs(common_models.Attachment.objects.filter(actor__in=ids))
    
            delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
            delete_qs(models.Follow.objects.filter(target_id__in=ids))
    
            delete_qs(audio_models.Channel.objects.filter(attributed_to__in=ids))
            delete_qs(audio_models.Channel.objects.filter(actor__in=ids))
    
            delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
            delete_qs(music_models.Library.objects.filter(actor_id__in=ids))
    
    
        # purge remaining activities / deliveries
    
        if not only:
            delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
            delete_qs(models.Activity.objects.filter(actor_id__in=ids))
    
    
    
    @celery.app.task(name="federation.purge_actors")
    
    def purge_actors(ids=[], domains=[], only=[]):
    
        actors = models.Actor.objects.filter(
            Q(id__in=ids) | Q(domain_id__in=domains)
        ).order_by("id")
        found_ids = list(actors.values_list("id", flat=True))
        logger.info("Starting purging %s accounts", len(found_ids))
    
        handle_purge_actors(ids=found_ids, only=only)
    
    
    
    @celery.app.task(name="federation.rotate_actor_key")
    @celery.require_instance(models.Actor.objects.local(), "actor")
    def rotate_actor_key(actor):
        pair = keys.get_key_pair()
        actor.private_key = pair[0].decode()
        actor.public_key = pair[1].decode()
        actor.save(update_fields=["private_key", "public_key"])
    
    
    
    @celery.app.task(name="federation.fetch")
    @transaction.atomic
    @celery.require_instance(
    
        models.Fetch.objects.filter(status="pending").select_related("actor"),
        "fetch_obj",
        "fetch_id",
    
    def fetch(fetch_obj):
    
        def error(code, **kwargs):
    
            fetch_obj.status = "errored"
            fetch_obj.fetch_date = timezone.now()
            fetch_obj.detail = {"error_code": code}
            fetch_obj.detail.update(kwargs)
            fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
    
        url = fetch_obj.url
        mrf_check_url = url
        if not mrf_check_url.startswith("webfinger://"):
            payload, updated = mrf.inbox.apply({"id": mrf_check_url})
            if not payload:
                return error("blocked", message="Blocked by MRF")
    
        actor = fetch_obj.actor
        if settings.FEDERATION_AUTHENTIFY_FETCHES:
            auth = signing.get_auth(actor.private_key, actor.private_key_id)
        else:
            auth = None
    
            if url.startswith("webfinger://"):
                # we first grab the correpsonding webfinger representation
                # to get the ActivityPub actor ID
                webfinger_data = webfinger.get_resource(
                    "acct:" + url.replace("webfinger://", "")
                )
                url = webfinger.get_ap_url(webfinger_data["links"])
                if not url:
                    return error("webfinger", message="Invalid or missing webfinger data")
                payload, updated = mrf.inbox.apply({"id": url})
                if not payload:
                    return error("blocked", message="Blocked by MRF")
    
            response = session.get_session().get(
    
                auth=auth, url=url, headers={"Accept": "application/activity+json"},
    
    Agate's avatar
    Agate committed
            logger.debug("Remote answered with %s: %s", response.status_code, response.text)
    
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
    
    Agate's avatar
    Agate committed
            return error(
                "http",
                status_code=e.response.status_code if e.response else None,
                message=response.text,
            )
    
        except requests.exceptions.Timeout:
            return error("timeout")
        except requests.exceptions.ConnectionError as e:
            return error("connection", message=str(e))
        except requests.RequestException as e:
            return error("request", message=str(e))
        except Exception as e:
            return error("unhandled", message=str(e))
    
        try:
            payload = response.json()
        except json.decoder.JSONDecodeError:
    
            # we attempt to extract a <link rel=alternate> that points
            # to an activity pub resource, if possible, and retry with this URL
            alternate_url = utils.find_alternate(response.text)
            if alternate_url:
                fetch_obj.url = alternate_url
                fetch_obj.save(update_fields=["url"])
                return fetch(fetch_id=fetch_obj.pk)
    
            return error("invalid_json")
    
    
        payload, updated = mrf.inbox.apply(payload)
        if not payload:
            return error("blocked", message="Blocked by MRF")
    
    
        try:
            doc = jsonld.expand(payload)
        except ValueError:
            return error("invalid_jsonld")
    
        try:
            type = doc.get("@type", [])[0]
        except IndexError:
            return error("missing_jsonld_type")
        try:
    
            serializer_classes = fetch_obj.serializers[type]
            model = serializer_classes[0].Meta.model
    
        except (KeyError, AttributeError):
    
            fetch_obj.status = "skipped"
            fetch_obj.fetch_date = timezone.now()
            fetch_obj.detail = {"reason": "unhandled_type", "type": type}
            return fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
    
        try:
            id = doc.get("@id")
        except IndexError:
            existing = None
        else:
            existing = model.objects.filter(fid=id).first()
    
    
        serializer = None
        for serializer_class in serializer_classes:
    
            serializer = serializer_class(
                existing, data=payload, context={"fetch_actor": actor}
            )
    
            if not serializer.is_valid():
                continue
            else:
                break
        if serializer.errors:
    
            return error("validation", validation_errors=serializer.errors)
        try:
    
            obj = serializer.save()
    
        except Exception as e:
            error("save", message=str(e))
            raise
    
    
        # special case for channels
        # when obj is an actor, we check if the actor has a channel associated with it
        # if it is the case, we consider the fetch obj to be a channel instead
    
        # and also trigger a fetch on the channel outbox
    
        if isinstance(obj, models.Actor) and obj.get_channel():
            obj = obj.get_channel()
    
            if obj.actor.outbox_url:
    
                try:
                    # first page fetch is synchronous, so that at least some data is available
                    # in the UI after subscription
                    result = fetch_collection(
                        obj.actor.outbox_url, channel_id=obj.pk, max_pages=1,
                    )
                except Exception:
                    logger.exception(
    
                        "Error while fetching actor outbox: %s", obj.actor.outbox_url
    
                else:
                    if result.get("next_page"):
                        # additional pages are fetched in the background
                        result = fetch_collection.delay(
                            result["next_page"],
                            channel_id=obj.pk,
                            max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1,
                            is_page=True,
                        )
    
        fetch_obj.object = obj
        fetch_obj.status = "finished"
        fetch_obj.fetch_date = timezone.now()
        return fetch_obj.save(
            update_fields=["fetch_date", "status", "object_id", "object_content_type"]
        )
    
    
    
class PreserveSomeDataCollector(Collector):
    """
    We need to delete everything related to an actor. Well… Almost everything.
    But definitely not the Delete Activity we send to announce the actor is deleted.
    """

    def __init__(self, *args, **kwargs):
        # remember when collection started, so we can spare Delete
        # activities created after this point
        self.creation_date = timezone.now()
        super().__init__(*args, **kwargs)

    def related_objects(self, related, *args, **kwargs):
        # Return the related objects to collect for deletion, excluding
        # fresh Delete activities in the actor's outbox so the Delete can
        # still be broadcasted properly after the purge.
        qs = super().related_objects(related, *args, **kwargs)
        if related.name == "outbox_activities":
            # exclude the Delete activity so it can be broadcasted properly
            qs = qs.exclude(type="Delete", creation_date__gte=self.creation_date)

        return qs
    
    
    @celery.app.task(name="federation.remove_actor")
    @transaction.atomic
    @celery.require_instance(
        models.Actor.objects.all(), "actor",
    )
    def remove_actor(actor):
        # Then we broadcast the info over federation. We do this *before* deleting objects
        # associated with the actor, otherwise follows are removed and we don't know where
        # to broadcast
        logger.info("Broadcasting deletion to federation…")
        collector = PreserveSomeDataCollector(using="default")
        routes.outbox.dispatch(
            {"type": "Delete", "object": {"type": actor.type}}, context={"actor": actor}
        )
    
        # then we delete any object associated with the actor object, but *not* the actor
        # itself. We keep it for auditability and sending the Delete ActivityPub message
        logger.info(
            "Prepare deletion of objects associated with account %s…",
            actor.preferred_username,
        )
        collector.collect([actor])
        for model, instances in collector.data.items():
            if issubclass(model, actor.__class__):
                # we skip deletion of the actor itself
                continue
    
            to_delete = model.objects.filter(pk__in=[instance.pk for instance in instances])
            logger.info(
                "Deleting %s objects associated with account %s…",
                len(instances),
                actor.preferred_username,
            )
            to_delete.delete()
    
        # Finally, we update the actor itself and mark it as removed
        logger.info("Marking actor as Tombsone…")
        actor.type = "Tombstone"
        actor.name = None
        actor.summary = None
        actor.save(update_fields=["type", "name", "summary"])
    
    
    
# Route-matcher -> serializer pairs used by fetch_collection to persist
# items found while paginating through a remote collection.
COLLECTION_ACTIVITY_SERIALIZERS = [
    (
        {"type": "Create", "object.type": "Audio"},
        serializers.ChannelCreateUploadSerializer,
    )
]
    
    
    def match_serializer(payload, conf):
        return [
            serializer_class
            for route, serializer_class in conf
            if activity.match_route(route, payload)
        ]
    
    
    @celery.app.task(name="federation.fetch_collection")
    @celery.require_instance(
        audio_models.Channel.objects.all(), "channel", allow_null=True,
    )
    def fetch_collection(url, max_pages, channel, is_page=False):
        actor = actors.get_service_actor()
        results = {
            "items": [],
            "skipped": 0,
            "errored": 0,
            "seen": 0,
            "total": 0,
        }
        if is_page:
            # starting immediatly from a page, no need to fetch the wrapping collection
            logger.debug("Fetch collection page immediatly at %s", url)
            results["next_page"] = url
        else:
            logger.debug("Fetching collection object at %s", url)
            collection = utils.retrieve_ap_object(
                url,
                actor=actor,
                serializer_class=serializers.PaginatedCollectionSerializer,
            )
            results["next_page"] = collection["first"]
            results["total"] = collection.get("totalItems")
    
        seen_pages = 0
        context = {}
        if channel:
            context["channel"] = channel
    
        for i in range(max_pages):
            page_url = results["next_page"]
            logger.debug("Handling page %s on max %s, at %s", i + 1, max_pages, page_url)
            page = utils.retrieve_ap_object(page_url, actor=actor, serializer_class=None,)
            try:
                items = page["orderedItems"]
            except KeyError:
                try:
                    items = page["items"]
                except KeyError:
                    logger.error("Invalid collection page at %s", page_url)
                    break
    
            for item in items:
                results["seen"] += 1
    
                matching_serializer = match_serializer(
                    item, COLLECTION_ACTIVITY_SERIALIZERS
                )
                if not matching_serializer:
                    results["skipped"] += 1
                    logger.debug("Skipping unhandled activity %s", item.get("type"))
                    continue
    
                s = matching_serializer[0](data=item, context=context)
                if not s.is_valid():
                    logger.warn("Skipping invalid activity: %s", s.errors)
                    results["errored"] += 1
                    continue
    
                results["items"].append(s.save())
    
            seen_pages += 1
            results["next_page"] = page.get("next", None) or None
            if not results["next_page"]:
                logger.debug("No more pages to fetch")
                break
    
        logger.info(
            "Finished fetch of collection pages at %s. Results:\n"
            "  Total in collection: %s\n"
            "  Seen: %s\n"
            "  Handled: %s\n"
            "  Skipped: %s\n"
            "  Errored: %s",
            url,
            results.get("total"),
            results["seen"],
            len(results["items"]),
            results["skipped"],
            results["errored"],
        )
        return results