Newer
Older
Eliot Berriot
committed
import os
from django.db.models import Q, F
from dynamic_preferences.registries import global_preferences_registry
from funkwhale_api.common import session
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery
logger = logging.getLogger(__name__)
def clean_music_cache():
preferences = global_preferences_registry.manager()
if delay < 1:
return # cache clearing disabled
Eliot Berriot
committed
limit = timezone.now() - datetime.timedelta(minutes=delay)
music_models.TrackFile.objects.filter(
& (Q(accessed_date__lt=limit) | Q(accessed_date=None))
Eliot Berriot
committed
)
for tf in candidates:
tf.audio_file.delete()
Eliot Berriot
committed
# we also delete orphaned files, if any
storage = models.LibraryTrack._meta.get_field("audio_file").storage
files = get_files(storage, "federation_cache/tracks")
existing = music_models.TrackFile.objects.filter(audio_file__in=files)
missing = set(files) - set(existing.values_list("audio_file", flat=True))
Eliot Berriot
committed
for m in missing:
storage.delete(m)
def get_files(storage, *parts):
"""
This is a recursive function that return all files available
in a given directory using django's storage.
"""
if not parts:
Eliot Berriot
committed
dirs, files = storage.listdir(os.path.join(*parts))
for dir in dirs:
files += get_files(storage, *(list(parts) + [dir]))
return [os.path.join(parts[-1], path) for path in files]
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@celery.app.task(name="federation.dispatch_inbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_inbox(activity):
"""
Given an activity instance, triggers our internal delivery logic (follow
creation, etc.)
"""
try:
routes.inbox.dispatch(
activity.payload,
context={
"actor": activity.actor,
"inbox_items": list(activity.inbox_items.local().select_related()),
},
)
except Exception:
activity.inbox_items.local().update(
delivery_attempts=F("delivery_attempts") + 1,
last_delivery_date=timezone.now(),
)
raise
else:
activity.inbox_items.local().update(
delivery_attempts=F("delivery_attempts") + 1,
last_delivery_date=timezone.now(),
is_delivered=True,
)
@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
"""
Deliver a local activity to its recipients
"""
inbox_items = activity.inbox_items.all().select_related("actor")
local_recipients_items = [ii for ii in inbox_items if ii.actor.is_local]
if local_recipients_items:
dispatch_inbox.delay(activity_id=activity.pk)
remote_recipients_items = [ii for ii in inbox_items if not ii.actor.is_local]
shared_inbox_urls = {
ii.actor.shared_inbox_url
for ii in remote_recipients_items
if ii.actor.shared_inbox_url
}
inbox_urls = {
ii.actor.inbox_url
for ii in remote_recipients_items
if not ii.actor.shared_inbox_url
}
for url in shared_inbox_urls:
deliver_to_remote_inbox.delay(activity_id=activity.pk, shared_inbox_url=url)
for url in inbox_urls:
deliver_to_remote_inbox.delay(activity_id=activity.pk, inbox_url=url)
@celery.app.task(
name="federation.deliver_to_remote_inbox",
autoretry_for=[RequestException],
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def deliver_to_remote_inbox(activity, inbox_url=None, shared_inbox_url=None):
url = inbox_url or shared_inbox_url
actor = activity.actor
inbox_items = activity.inbox_items.filter(is_delivered=False)
if inbox_url:
inbox_items = inbox_items.filter(actor__inbox_url=inbox_url)
else:
inbox_items = inbox_items.filter(actor__shared_inbox_url=shared_inbox_url)
logger.info("Preparing activity delivery to %s", url)
auth = signing.get_auth(actor.private_key, actor.private_key_id)
try:
response = session.get_session().post(
auth=auth,
json=activity.payload,
url=url,
timeout=5,
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
headers={"Content-Type": "application/activity+json"},
)
logger.debug("Remote answered with %s", response.status_code)
response.raise_for_status()
except Exception:
inbox_items.update(
last_delivery_date=timezone.now(),
delivery_attempts=F("delivery_attempts") + 1,
)
raise
else:
inbox_items.update(last_delivery_date=timezone.now(), is_delivered=True)