Skip to content
Snippets Groups Projects
tasks.py 9.39 KiB
Newer Older
import datetime
import logging
import requests

from django.conf import settings
from django.db.models import Q, F
from django.utils import timezone
from dynamic_preferences.registries import global_preferences_registry
Eliot Berriot's avatar
Eliot Berriot committed
from requests.exceptions import RequestException
from funkwhale_api.common import preferences
from funkwhale_api.common import session
Eliot Berriot's avatar
Eliot Berriot committed
from funkwhale_api.common import utils as common_utils
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery

Eliot Berriot's avatar
Eliot Berriot committed
from . import models, signing
from . import serializers
from . import routes
Eliot Berriot's avatar
Eliot Berriot committed
from . import utils

logger = logging.getLogger(__name__)


Eliot Berriot's avatar
Eliot Berriot committed
@celery.app.task(name="federation.clean_music_cache")
def clean_music_cache():
    preferences = global_preferences_registry.manager()
Eliot Berriot's avatar
Eliot Berriot committed
    delay = preferences["federation__music_cache_duration"]
    if delay < 1:
        return  # cache clearing disabled
    limit = timezone.now() - datetime.timedelta(minutes=delay)
Eliot Berriot's avatar
Eliot Berriot committed
    candidates = (
Eliot Berriot's avatar
Eliot Berriot committed
        music_models.Upload.objects.filter(
Eliot Berriot's avatar
Eliot Berriot committed
            Q(audio_file__isnull=False)
            & (Q(accessed_date__lt=limit) | Q(accessed_date=None)),
            # library__actor__user=None,
Eliot Berriot's avatar
Eliot Berriot committed
        .exclude(audio_file="")
        .only("audio_file", "id")
Eliot Berriot's avatar
Eliot Berriot committed
    )
Eliot Berriot's avatar
Eliot Berriot committed
    for upload in candidates:
        upload.audio_file.delete()
Eliot Berriot's avatar
Eliot Berriot committed
    storage = models.LibraryTrack._meta.get_field("audio_file").storage
    files = get_files(storage, "federation_cache/tracks")
Eliot Berriot's avatar
Eliot Berriot committed
    existing = music_models.Upload.objects.filter(audio_file__in=files)
Eliot Berriot's avatar
Eliot Berriot committed
    missing = set(files) - set(existing.values_list("audio_file", flat=True))
    for m in missing:
        storage.delete(m)


def get_files(storage, *parts):
    """
    This is a recursive function that return all files available
    in a given directory using django's storage.
    """
    if not parts:
Eliot Berriot's avatar
Eliot Berriot committed
        raise ValueError("Missing path")
    try:
        dirs, files = storage.listdir(os.path.join(*parts))
    except FileNotFoundError:
        return []
    for dir in dirs:
        files += get_files(storage, *(list(parts) + [dir]))
Eliot Berriot's avatar
Eliot Berriot committed
    return [os.path.join(parts[-1], path) for path in files]


@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity, call_handlers=True):
    """
    Given an activity instance, triggers our internal delivery logic (follow
    creation, etc.)
    """

Eliot Berriot's avatar
Eliot Berriot committed
    routes.inbox.dispatch(
        activity.payload,
        context={
            "activity": activity,
            "actor": activity.actor,
            "inbox_items": activity.inbox_items.filter(is_read=False),
        },
Eliot Berriot's avatar
Eliot Berriot committed
    )


@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
    """
Eliot Berriot's avatar
Eliot Berriot committed
    Deliver a local activity to its recipients, both locally and remotely
Eliot Berriot's avatar
Eliot Berriot committed
    inbox_items = activity.inbox_items.filter(is_read=False).select_related()
Eliot Berriot's avatar
Eliot Berriot committed
    if inbox_items.exists():
        dispatch_inbox.delay(activity_id=activity.pk, call_handlers=False)
    if not preferences.get("federation__enabled"):
        # federation is disabled, we only deliver to local recipients
        return

    deliveries = activity.deliveries.filter(is_delivered=False)

Eliot Berriot's avatar
Eliot Berriot committed
    for id in deliveries.values_list("pk", flat=True):
        deliver_to_remote.delay(delivery_id=id)


@celery.app.task(
    name="federation.deliver_to_remote_inbox",
    autoretry_for=[RequestException],
    retry_backoff=30,
    max_retries=5,
)
Eliot Berriot's avatar
Eliot Berriot committed
@celery.require_instance(
    models.Delivery.objects.filter(is_delivered=False).select_related(
        "activity__actor"
    ),
    "delivery",
)
def deliver_to_remote(delivery):

    if not preferences.get("federation__enabled"):
        # federation is disabled, we only deliver to local recipients
        return

Eliot Berriot's avatar
Eliot Berriot committed
    actor = delivery.activity.actor
    logger.info("Preparing activity delivery to %s", delivery.inbox_url)
    auth = signing.get_auth(actor.private_key, actor.private_key_id)
    try:
        response = session.get_session().post(
            auth=auth,
Eliot Berriot's avatar
Eliot Berriot committed
            json=delivery.activity.payload,
            url=delivery.inbox_url,
            timeout=5,
            verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
            headers={"Content-Type": "application/activity+json"},
        )
        logger.debug("Remote answered with %s", response.status_code)
        response.raise_for_status()
    except Exception:
Eliot Berriot's avatar
Eliot Berriot committed
        delivery.last_attempt_date = timezone.now()
        delivery.attempts = F("attempts") + 1
        delivery.save(update_fields=["last_attempt_date", "attempts"])
Eliot Berriot's avatar
Eliot Berriot committed
        delivery.last_attempt_date = timezone.now()
        delivery.attempts = F("attempts") + 1
        delivery.is_delivered = True
        delivery.save(update_fields=["last_attempt_date", "attempts", "is_delivered"])


def fetch_nodeinfo(domain_name):
    s = session.get_session()
    wellknown_url = "https://{}/.well-known/nodeinfo".format(domain_name)
    response = s.get(
        url=wellknown_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
    )
    response.raise_for_status()
    serializer = serializers.NodeInfoSerializer(data=response.json())
    serializer.is_valid(raise_exception=True)
    nodeinfo_url = None
    for link in serializer.validated_data["links"]:
        if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
            nodeinfo_url = link["href"]
            break

    response = s.get(
        url=nodeinfo_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
    )
    response.raise_for_status()
    return response.json()


@celery.app.task(name="federation.update_domain_nodeinfo")
@celery.require_instance(
    models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
)
def update_domain_nodeinfo(domain):
    now = timezone.now()
    try:
        nodeinfo = {"status": "ok", "payload": fetch_nodeinfo(domain.name)}
    except (requests.RequestException, serializers.serializers.ValidationError) as e:
        nodeinfo = {"status": "error", "error": str(e)}
Eliot Berriot's avatar
Eliot Berriot committed

    service_actor_id = common_utils.recursive_getattr(
        nodeinfo, "payload.metadata.actorId", permissive=True
    )
    try:
        domain.service_actor = (
            utils.retrieve_ap_object(
                service_actor_id,
                actor=actors.get_service_actor(),
Eliot Berriot's avatar
Eliot Berriot committed
                queryset=models.Actor,
                serializer_class=serializers.ActorSerializer,
            )
            if service_actor_id
            else None
        )
    except (serializers.serializers.ValidationError, RequestException) as e:
        logger.warning(
            "Cannot fetch system actor for domain %s: %s", domain.name, str(e)
        )
    domain.nodeinfo_fetch_date = now
    domain.nodeinfo = nodeinfo
Eliot Berriot's avatar
Eliot Berriot committed
    domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"])
@celery.app.task(name="federation.refresh_nodeinfo_known_nodes")
def refresh_nodeinfo_known_nodes():
    """
    Trigger a node info refresh on all nodes that weren't refreshed since
    settings.NODEINFO_REFRESH_DELAY
    """
    limit = timezone.now() - datetime.timedelta(seconds=settings.NODEINFO_REFRESH_DELAY)
    candidates = models.Domain.objects.external().exclude(
        nodeinfo_fetch_date__gte=limit
    )
    names = candidates.values_list("name", flat=True)
    logger.info("Launching periodic nodeinfo refresh on %s domains", len(names))
    for domain_name in names:
        update_domain_nodeinfo.delay(domain_name=domain_name)


def delete_qs(qs):
    label = qs.model._meta.label
    result = qs.delete()
    related = sum(result[1].values())

    logger.info(
        "Purged %s %s objects (and %s related entities)", result[0], label, related
    )


def handle_purge_actors(ids, only=[]):
    """
    Empty only means we purge everything
    Otherwise, we purge only the requested bits: media
    """
    # purge follows (received emitted)
    if not only:
        delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
        delete_qs(models.Follow.objects.filter(actor_id__in=ids))
    if not only or "media" in only:
        delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
        delete_qs(models.Follow.objects.filter(target_id__in=ids))
        delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
        delete_qs(music_models.Library.objects.filter(actor_id__in=ids))

    # purge remaining activities / deliveries
    if not only:
        delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
        delete_qs(models.Activity.objects.filter(actor_id__in=ids))


@celery.app.task(name="federation.purge_actors")
def purge_actors(ids=[], domains=[], only=[]):
    actors = models.Actor.objects.filter(
        Q(id__in=ids) | Q(domain_id__in=domains)
    ).order_by("id")
    found_ids = list(actors.values_list("id", flat=True))
    logger.info("Starting purging %s accounts", len(found_ids))
    handle_purge_actors(ids=found_ids, only=only)


@celery.app.task(name="federation.rotate_actor_key")
@celery.require_instance(models.Actor.objects.local(), "actor")
def rotate_actor_key(actor):
    pair = keys.get_key_pair()
    actor.private_key = pair[0].decode()
    actor.public_key = pair[1].decode()
    actor.save(update_fields=["private_key", "public_key"])