Newer
Older
Eliot Berriot
committed
import os
from django.db.models import Q, F
from dynamic_preferences.registries import global_preferences_registry
from requests.exceptions import RequestException
from funkwhale_api.common import preferences
from funkwhale_api.common import session
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery
logger = logging.getLogger(__name__)
def clean_music_cache():
preferences = global_preferences_registry.manager()
if delay < 1:
return # cache clearing disabled
Eliot Berriot
committed
limit = timezone.now() - datetime.timedelta(minutes=delay)
& (Q(accessed_date__lt=limit) | Q(accessed_date=None)),
# library__actor__user=None,
Eliot Berriot
committed
)
Eliot Berriot
committed
# we also delete orphaned files, if any
storage = models.LibraryTrack._meta.get_field("audio_file").storage
files = get_files(storage, "federation_cache/tracks")
existing = music_models.Upload.objects.filter(audio_file__in=files)
missing = set(files) - set(existing.values_list("audio_file", flat=True))
Eliot Berriot
committed
for m in missing:
storage.delete(m)
def get_files(storage, *parts):
"""
This is a recursive function that return all files available
in a given directory using django's storage.
"""
if not parts:
try:
dirs, files = storage.listdir(os.path.join(*parts))
except FileNotFoundError:
return []
Eliot Berriot
committed
for dir in dirs:
files += get_files(storage, *(list(parts) + [dir]))
return [os.path.join(parts[-1], path) for path in files]
@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity):
"""
Given an activity instance, triggers our internal delivery logic (follow
creation, etc.)
"""
routes.inbox.dispatch(
activity.payload,
context={
"activity": activity,
"actor": activity.actor,
"inbox_items": activity.inbox_items.filter(is_read=False),
},
)
@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
"""
Deliver a local activity to its recipients, both locally and remotely
inbox_items = activity.inbox_items.filter(is_read=False).select_related()
if inbox_items.exists():
dispatch_inbox.delay(activity_id=activity.pk)
if not preferences.get("federation__enabled"):
# federation is disabled, we only deliver to local recipients
return
deliveries = activity.deliveries.filter(is_delivered=False)
for id in deliveries.values_list("pk", flat=True):
deliver_to_remote.delay(delivery_id=id)
@celery.app.task(
name="federation.deliver_to_remote_inbox",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(
models.Delivery.objects.filter(is_delivered=False).select_related(
"activity__actor"
),
"delivery",
)
def deliver_to_remote(delivery):
if not preferences.get("federation__enabled"):
# federation is disabled, we only deliver to local recipients
return
actor = delivery.activity.actor
logger.info("Preparing activity delivery to %s", delivery.inbox_url)
auth = signing.get_auth(actor.private_key, actor.private_key_id)
try:
response = session.get_session().post(
auth=auth,
json=delivery.activity.payload,
url=delivery.inbox_url,
timeout=5,
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
headers={"Content-Type": "application/activity+json"},
)
logger.debug("Remote answered with %s", response.status_code)
response.raise_for_status()
except Exception:
delivery.last_attempt_date = timezone.now()
delivery.attempts = F("attempts") + 1
delivery.save(update_fields=["last_attempt_date", "attempts"])
delivery.last_attempt_date = timezone.now()
delivery.attempts = F("attempts") + 1
delivery.is_delivered = True
delivery.save(update_fields=["last_attempt_date", "attempts", "is_delivered"])
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def fetch_nodeinfo(domain_name):
s = session.get_session()
wellknown_url = "https://{}/.well-known/nodeinfo".format(domain_name)
response = s.get(
url=wellknown_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
)
response.raise_for_status()
serializer = serializers.NodeInfoSerializer(data=response.json())
serializer.is_valid(raise_exception=True)
nodeinfo_url = None
for link in serializer.validated_data["links"]:
if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
nodeinfo_url = link["href"]
break
response = s.get(
url=nodeinfo_url, timeout=5, verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL
)
response.raise_for_status()
return response.json()
@celery.app.task(name="federation.update_domain_nodeinfo")
@celery.require_instance(
models.Domain.objects.external(), "domain", id_kwarg_name="domain_name"
)
def update_domain_nodeinfo(domain):
now = timezone.now()
try:
nodeinfo = {"status": "ok", "payload": fetch_nodeinfo(domain.name)}
except (requests.RequestException, serializers.serializers.ValidationError) as e:
nodeinfo = {"status": "error", "error": str(e)}
domain.nodeinfo_fetch_date = now
domain.nodeinfo = nodeinfo
domain.save(update_fields=["nodeinfo", "nodeinfo_fetch_date"])
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def delete_qs(qs):
label = qs.model._meta.label
result = qs.delete()
related = sum(result[1].values())
logger.info(
"Purged %s %s objects (and %s related entities)", result[0], label, related
)
def handle_purge_actors(ids):
# purge follows (received emitted)
delete_qs(models.LibraryFollow.objects.filter(target__actor_id__in=ids))
delete_qs(models.LibraryFollow.objects.filter(actor_id__in=ids))
delete_qs(models.Follow.objects.filter(target_id__in=ids))
delete_qs(models.Follow.objects.filter(actor_id__in=ids))
# purge audio content
delete_qs(music_models.Upload.objects.filter(library__actor_id__in=ids))
delete_qs(music_models.Library.objects.filter(actor_id__in=ids))
# purge remaining activities / deliveries
delete_qs(models.InboxItem.objects.filter(actor_id__in=ids))
delete_qs(models.Activity.objects.filter(actor_id__in=ids))
@celery.app.task(name="federation.purge_actors")
def purge_actors(ids=[], domains=[]):
actors = models.Actor.objects.filter(
Q(id__in=ids) | Q(domain_id__in=domains)
).order_by("id")
found_ids = list(actors.values_list("id", flat=True))
logger.info("Starting purging %s accounts", len(found_ids))
handle_purge_actors(ids=found_ids)