Newer
Older
Eliot Berriot
committed
import os
from django.db.models import Q, F
from dynamic_preferences.registries import global_preferences_registry
from requests.exceptions import RequestException
from funkwhale_api.common import preferences
from funkwhale_api.common import session
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery
from . import actors
from . import keys
logger = logging.getLogger(__name__)
def clean_music_cache():
preferences = global_preferences_registry.manager()
if delay < 1:
return # cache clearing disabled
Eliot Berriot
committed
limit = timezone.now() - datetime.timedelta(minutes=delay)
& (Q(accessed_date__lt=limit) | Q(accessed_date=None)),
# library__actor__user=None,
Eliot Berriot
committed
)
Eliot Berriot
committed
# we also delete orphaned files, if any
storage = models.LibraryTrack._meta.get_field("audio_file").storage
files = get_files(storage, "federation_cache/tracks")
existing = music_models.Upload.objects.filter(audio_file__in=files)
missing = set(files) - set(existing.values_list("audio_file", flat=True))
Eliot Berriot
committed
for m in missing:
storage.delete(m)
def get_files(storage, *parts):
"""
This is a recursive function that return all files available
in a given directory using django's storage.
"""
if not parts:
try:
dirs, files = storage.listdir(os.path.join(*parts))
except FileNotFoundError:
return []
Eliot Berriot
committed
for dir in dirs:
files += get_files(storage, *(list(parts) + [dir]))
return [os.path.join(parts[-1], path) for path in files]
@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
Eliot Berriot
committed
def dispatch_inbox(activity, call_handlers=True):
"""
Given an activity instance, triggers our internal delivery logic (follow
creation, etc.)
"""
routes.inbox.dispatch(
activity.payload,
context={
"activity": activity,
"actor": activity.actor,
"inbox_items": activity.inbox_items.filter(is_read=False),
},
Eliot Berriot
committed
call_handlers=call_handlers,
@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
"""
Deliver a local activity to its recipients, both locally and remotely
inbox_items = activity.inbox_items.filter(is_read=False).select_related()
Eliot Berriot
committed
dispatch_inbox.delay(activity_id=activity.pk, call_handlers=False)
if not preferences.get("federation__enabled"):
# federation is disabled, we only deliver to local recipients
return
deliveries = activity.deliveries.filter(is_delivered=False)
for id in deliveries.values_list("pk", flat=True):
deliver_to_remote.delay(delivery_id=id)
@celery.app.task(
name="federation.deliver_to_remote_inbox",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(
models.Delivery.objects.filter(is_delivered=False).select_related(
"activity__actor"
),
"delivery",
)
def deliver_to_remote(delivery):
if not preferences.get("federation__enabled"):
# federation is disabled, we only deliver to local recipients
return
actor = delivery.activity.actor
logger.info("Preparing activity delivery to %s", delivery.inbox_url)
auth = signing.get_auth(actor.private_key, actor.private_key_id)
try:
response = session.get_session().post(
auth=auth,
json=delivery.activity.payload,
url=delivery.inbox_url,
timeout=5,
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
headers={"Content-Type": "application/activity+json"},
)
logger.debug("Remote answered with %s", response.status_code)
response.raise_for_status()
except Exception:
delivery.last_attempt_date = timezone.now()
delivery.attempts = F("attempts") + 1
delivery.save(update_fields=["last_attempt_date", "attempts"])
delivery.last_attempt_date = timezone.now()
delivery.attempts = F("attempts") + 1
delivery.is_delivered = True
delivery.save(update_fields=["last_attempt_date", "attempts", "is_delivered"])
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def fetch_nodeinfo(domain_name):
s = session.get_session()
wellknown_url = "https://{}/.well-known/nodeinfo".format(domain_name)
response = s.get(
url=wellknown_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
)
response.raise_for_status()
serializer = serializers.NodeInfoSerializer(data=response.json())
serializer.is_valid(raise_exception=True)
nodeinfo_url = None
for link in serializer.validated_data["links"]:
if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
nodeinfo_url = link["href"]
break
response = s.get(
url=nodeinfo_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
)
response.raise_for_status()
return response.json()
@celery.app.task(name="federation.update_domain_nodeinfo")
@celery.require_instance(
models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
)
def update_domain_nodeinfo(domain):
now = timezone.now()
try:
nodeinfo = {"status": "ok", "payload": fetch_nodeinfo(domain.name)}
except (requests.RequestException, serializers.serializers.ValidationError) as e:
nodeinfo = {"status": "error", "error": str(e)}
service_actor_id = common_utils.recursive_getattr(
nodeinfo, "payload.metadata.actorId", permissive=True
)
try:
domain.service_actor = (
utils.retrieve_ap_object(
service_actor_id,
actor=actors.get_service_actor(),
queryset=models.Actor,
serializer_class=serializers.ActorSerializer,
)
if service_actor_id
else None
)
except (serializers.serializers.ValidationError, RequestException) as e:
logger.warning(
"Cannot fetch system actor for domain %s: %s", domain.name, str(e)
)
domain.nodeinfo_fetch_date = now
domain.nodeinfo = nodeinfo
domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date", "service_actor"])
Eliot Berriot
committed
@celery.app.task(name="federation.refresh_nodeinfo_known_nodes")
def refresh_nodeinfo_known_nodes():
"""
Trigger a node info refresh on all nodes that weren't refreshed since
settings.NODEINFO_REFRESH_DELAY
"""
limit = timezone.now() - datetime.timedelta(seconds=settings.NODEINFO_REFRESH_DELAY)
candidates = models.Domain.objects.external().exclude(
nodeinfo_fetch_date__gte=limit
)
names = candidates.values_list("name", flat=True)
logger.info("Launching periodic nodeinfo refresh on %s domains", len(names))
for domain_name in names:
update_domain_nodeinfo.delay(domain_name=domain_name)
def delete_qs(qs):
label = qs.model._meta.label
result = qs.delete()
related = sum(result[1].values())
logger.info(
"Purged %s %s objects (and %s related entities)", result[0], label, related
)
def handle_purge_actors(ids, only=[]):
"""
Empty only means we purge everything
Otherwise, we purge only the requested bits: media
"""
# purge follows (received emitted)
if not only:
delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
delete_qs(models.Follow.objects.filter(actor_id__in=ids))
# purge audio content
if not only or "media" in only:
delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
delete_qs(models.Follow.objects.filter(target_id__in=ids))
delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
delete_qs(music_models.Library.objects.filter(actor_id__in=ids))
# purge remaining activities / deliveries
if not only:
delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
delete_qs(models.Activity.objects.filter(actor_id__in=ids))
@celery.app.task(name="federation.purge_actors")
def purge_actors(ids=[], domains=[], only=[]):
actors = models.Actor.objects.filter(
Q(id__in=ids) | Q(domain_id__in=domains)
).order_by("id")
found_ids = list(actors.values_list("id", flat=True))
logger.info("Starting purging %s accounts", len(found_ids))
handle_purge_actors(ids=found_ids, only=only)
@celery.app.task(name="federation.rotate_actor_key")
@celery.require_instance(models.Actor.objects.local(), "actor")
def rotate_actor_key(actor):
pair = keys.get_key_pair()
actor.private_key = pair[0].decode()
actor.public_key = pair[1].decode()
actor.save(update_fields=["private_key", "public_key"])