Commit e49a4602 authored by Eliot Berriot's avatar Eliot Berriot 💬

Audio federation

parent 6992c567
......@@ -249,6 +249,7 @@ Then, in separate terminals, you can setup as many different instances as you
need::
export COMPOSE_PROJECT_NAME=node2
export VUE_PORT=1234 # this has to be unique for each instance
docker-compose -f dev.yml run --rm api python manage.py migrate
docker-compose -f dev.yml run --rm api python manage.py createsuperuser
docker-compose -f dev.yml up nginx api front nginx api celeryworker
......
......@@ -14,7 +14,7 @@ router.register(r"settings", GlobalPreferencesViewSet, base_name="settings")
router.register(r"activity", activity_views.ActivityViewSet, "activity")
router.register(r"tags", views.TagViewSet, "tags")
router.register(r"tracks", views.TrackViewSet, "tracks")
router.register(r"track-files", views.TrackFileViewSet, "trackfiles")
router.register(r"uploads", views.UploadViewSet, "uploads")
router.register(r"libraries", views.LibraryViewSet, "libraries")
router.register(r"listen", views.ListenViewSet, "listen")
router.register(r"artists", views.ArtistViewSet, "artists")
......
......@@ -514,8 +514,14 @@ ACCOUNT_USERNAME_BLACKLIST = [
"me",
"ghost",
"_",
"-",
"hello",
"contact",
"inbox",
"outbox",
"shared-inbox",
"shared_inbox",
"actor",
] + env.list("ACCOUNT_USERNAME_BLACKLIST", default=[])
EXTERNAL_REQUESTS_VERIFY_SSL = env.bool("EXTERNAL_REQUESTS_VERIFY_SSL", default=True)
......
......@@ -9,7 +9,9 @@ from funkwhale_api.common import preferences
class ConditionalAuthentication(BasePermission):
def has_permission(self, request, view):
if preferences.get("common__api_authentication_required"):
return request.user and request.user.is_authenticated
return (request.user and request.user.is_authenticated) or (
hasattr(request, "actor") and request.actor
)
return True
......
......@@ -5,6 +5,12 @@ visibility.
Files without any import job will be bounded to a "default" library on the first
superuser account found. This should now happen though.
XXX TODO:
- add followers url on actor
- shared inbox url on actor
- compute hash from files
"""
from funkwhale_api.music import models
......@@ -19,7 +25,7 @@ def main(command, **kwargs):
command.stdout.write(
"* {} users imported music on this instance".format(len(importers))
)
files = models.TrackFile.objects.filter(
files = models.Upload.objects.filter(
library__isnull=True, jobs__isnull=False
).distinct()
command.stdout.write(
......@@ -39,7 +45,7 @@ def main(command, **kwargs):
)
user_files.update(library=library)
files = models.TrackFile.objects.filter(
files = models.Upload.objects.filter(
library__isnull=True, jobs__isnull=True
).distinct()
command.stdout.write(
......
......@@ -64,3 +64,46 @@ class ChunkedPath(object):
new_filename = "".join(chunks[3:]) + ".{}".format(ext)
parts = chunks[:3] + [new_filename]
return os.path.join(self.root, *parts)
def chunk_queryset(source_qs, chunk_size):
"""
From https://github.com/peopledoc/django-chunkator/blob/master/chunkator/__init__.py
"""
pk = None
# In django 1.9, _fields is always present and `None` if 'values()' is used
# In Django 1.8 and below, _fields will only be present if using `values()`
has_fields = hasattr(source_qs, "_fields") and source_qs._fields
if has_fields:
if "pk" not in source_qs._fields:
raise ValueError("The values() call must include the `pk` field")
field = source_qs.model._meta.pk
# set the correct field name:
# for ForeignKeys, we want to use `model_id` field, and not `model`,
# to bypass default ordering on related model
order_by_field = field.attname
source_qs = source_qs.order_by(order_by_field)
queryset = source_qs
while True:
if pk:
queryset = source_qs.filter(pk__gt=pk)
page = queryset[:chunk_size]
page = list(page)
nb_items = len(page)
if nb_items == 0:
return
last_item = page[-1]
# source_qs._fields exists *and* is not none when using "values()"
if has_fields:
pk = last_item["pk"]
else:
pk = last_item.pk
yield page
if nb_items < chunk_size:
return
This diff is collapsed.
......@@ -4,23 +4,21 @@ from . import models
from . import tasks
def redeliver_inbox_items(modeladmin, request, queryset):
for id in set(
queryset.filter(activity__actor__user__isnull=False).values_list(
"activity", flat=True
)
):
tasks.dispatch_outbox.delay(activity_id=id)
def redeliver_deliveries(modeladmin, request, queryset):
queryset.update(is_delivered=False)
for delivery in queryset:
tasks.deliver_to_remote.delay(delivery_id=delivery.pk)
redeliver_inbox_items.short_description = "Redeliver"
redeliver_deliveries.short_description = "Redeliver"
def redeliver_activities(modeladmin, request, queryset):
for id in set(
queryset.filter(actor__user__isnull=False).values_list("id", flat=True)
):
tasks.dispatch_outbox.delay(activity_id=id)
for activity in queryset.select_related("actor__user"):
if activity.actor.is_local:
tasks.dispatch_outbox.delay(activity_id=activity.pk)
else:
tasks.dispatch_inbox.delay(activity_id=activity.pk)
redeliver_activities.short_description = "Redeliver"
......@@ -67,14 +65,22 @@ class LibraryFollowAdmin(admin.ModelAdmin):
@admin.register(models.InboxItem)
class InboxItemAdmin(admin.ModelAdmin):
list_display = ["actor", "activity", "type", "is_read"]
list_filter = ["type", "activity__type", "is_read"]
search_fields = ["actor__fid", "activity__fid"]
list_select_related = True
@admin.register(models.Delivery)
class DeliveryAdmin(admin.ModelAdmin):
list_display = [
"actor",
"inbox_url",
"activity",
"type",
"last_delivery_date",
"delivery_attempts",
"last_attempt_date",
"attempts",
"is_delivered",
]
list_filter = ["type"]
search_fields = ["actor__fid", "activity__fid"]
list_filter = ["activity__type", "is_delivered"]
search_fields = ["inbox_url"]
list_select_related = True
actions = [redeliver_inbox_items]
actions = [redeliver_deliveries]
......@@ -16,7 +16,7 @@ class NestedLibraryFollowSerializer(serializers.ModelSerializer):
class LibrarySerializer(serializers.ModelSerializer):
actor = federation_serializers.APIActorSerializer()
files_count = serializers.SerializerMethodField()
uploads_count = serializers.SerializerMethodField()
follow = serializers.SerializerMethodField()
class Meta:
......@@ -28,13 +28,13 @@ class LibrarySerializer(serializers.ModelSerializer):
"name",
"description",
"creation_date",
"files_count",
"uploads_count",
"privacy_level",
"follow",
]
def get_files_count(self, o):
return max(getattr(o, "_files_count", 0), o.files_count)
def get_uploads_count(self, o):
return max(getattr(o, "_uploads_count", 0), o.uploads_count)
def get_follow(self, o):
try:
......
......@@ -87,7 +87,7 @@ class LibraryViewSet(mixins.RetrieveModelMixin, viewsets.GenericViewSet):
music_models.Library.objects.all()
.order_by("-creation_date")
.select_related("actor")
.annotate(_files_count=Count("files"))
.annotate(_uploads_count=Count("uploads"))
)
serializer_class = api_serializers.LibrarySerializer
permission_classes = [permissions.IsAuthenticated]
......
......@@ -76,6 +76,9 @@ class ActorFactory(factory.DjangoModelFactory):
fid = factory.LazyAttribute(
lambda o: "https://{}/users/{}".format(o.domain, o.preferred_username)
)
followers_url = factory.LazyAttribute(
lambda o: "https://{}/users/{}followers".format(o.domain, o.preferred_username)
)
inbox_url = factory.LazyAttribute(
lambda o: "https://{}/users/{}/inbox".format(o.domain, o.preferred_username)
)
......@@ -134,19 +137,12 @@ class MusicLibraryFactory(factory.django.DjangoModelFactory):
privacy_level = "me"
name = factory.Faker("sentence")
description = factory.Faker("sentence")
files_count = 0
uploads_count = 0
fid = factory.Faker("federation_url")
class Meta:
model = "music.Library"
@factory.post_generation
def fid(self, create, extracted, **kwargs):
if not create:
# Simple build, do nothing.
return
self.fid = extracted or self.get_federation_id()
@factory.post_generation
def followers_url(self, create, extracted, **kwargs):
if not create:
......@@ -160,7 +156,7 @@ class MusicLibraryFactory(factory.django.DjangoModelFactory):
class LibraryScan(factory.django.DjangoModelFactory):
library = factory.SubFactory(MusicLibraryFactory)
actor = factory.SubFactory(ActorFactory)
total_files = factory.LazyAttribute(lambda o: o.library.files_count)
total_files = factory.LazyAttribute(lambda o: o.library.uploads_count)
class Meta:
model = "music.LibraryScan"
......@@ -169,7 +165,7 @@ class LibraryScan(factory.django.DjangoModelFactory):
@registry.register
class ActivityFactory(factory.django.DjangoModelFactory):
actor = factory.SubFactory(ActorFactory)
url = factory.Faker("url")
url = factory.Faker("federation_url")
payload = factory.LazyFunction(lambda: {"type": "Create"})
class Meta:
......@@ -178,7 +174,7 @@ class ActivityFactory(factory.django.DjangoModelFactory):
@registry.register
class InboxItemFactory(factory.django.DjangoModelFactory):
actor = factory.SubFactory(ActorFactory)
actor = factory.SubFactory(ActorFactory, local=True)
activity = factory.SubFactory(ActivityFactory)
type = "to"
......@@ -186,6 +182,15 @@ class InboxItemFactory(factory.django.DjangoModelFactory):
model = "federation.InboxItem"
@registry.register
class DeliveryFactory(factory.django.DjangoModelFactory):
activity = factory.SubFactory(ActivityFactory)
inbox_url = factory.Faker("url")
class Meta:
model = "federation.Delivery"
@registry.register
class LibraryFollowFactory(factory.DjangoModelFactory):
target = factory.SubFactory(MusicLibraryFactory)
......@@ -269,9 +274,9 @@ class AudioMetadataFactory(factory.Factory):
@registry.register(name="federation.Audio")
class AudioFactory(factory.Factory):
type = "Audio"
id = factory.Faker("url")
id = factory.Faker("federation_url")
published = factory.LazyFunction(lambda: timezone.now().isoformat())
actor = factory.Faker("url")
actor = factory.Faker("federation_url")
url = factory.SubFactory(LinkFactory, audio=True)
metadata = factory.SubFactory(LibraryTrackMetadataFactory)
......
......@@ -108,7 +108,7 @@ def get_library_page(library, page_url, actor):
)
serializer = serializers.CollectionPageSerializer(
data=response.json(),
context={"library": library, "item_serializer": serializers.AudioSerializer},
context={"library": library, "item_serializer": serializers.UploadSerializer},
)
serializer.is_valid(raise_exception=True)
return serializer.validated_data
# Generated by Django 2.0.8 on 2018-09-20 18:03
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('federation', '0011_auto_20180910_1902'),
]
operations = [
migrations.CreateModel(
name='Delivery',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('is_delivered', models.BooleanField(default=False)),
('last_attempt_date', models.DateTimeField(blank=True, null=True)),
('attempts', models.PositiveIntegerField(default=0)),
('inbox_url', models.URLField(max_length=500)),
('activity', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='deliveries', to='federation.Activity')),
],
),
migrations.RemoveField(
model_name='inboxitem',
name='delivery_attempts',
),
migrations.RemoveField(
model_name='inboxitem',
name='is_delivered',
),
migrations.RemoveField(
model_name='inboxitem',
name='last_delivery_date',
),
]
......@@ -48,8 +48,8 @@ class ActorQuerySet(models.QuerySet):
qs = qs.annotate(
**{
"_usage_{}".format(s): models.Sum(
"libraries__files__size",
filter=models.Q(libraries__files__import_status=s),
"libraries__uploads__size",
filter=models.Q(libraries__uploads__import_status=s),
)
}
)
......@@ -72,8 +72,8 @@ class Actor(models.Model):
domain = models.CharField(max_length=1000)
summary = models.CharField(max_length=500, null=True, blank=True)
preferred_username = models.CharField(max_length=200, null=True, blank=True)
public_key = models.CharField(max_length=5000, null=True, blank=True)
private_key = models.CharField(max_length=5000, null=True, blank=True)
public_key = models.TextField(max_length=5000, null=True, blank=True)
private_key = models.TextField(max_length=5000, null=True, blank=True)
creation_date = models.DateTimeField(default=timezone.now)
last_fetch_date = models.DateTimeField(default=timezone.now)
manually_approves_followers = models.NullBooleanField(default=None)
......@@ -159,25 +159,34 @@ class Actor(models.Model):
return data
class InboxItemQuerySet(models.QuerySet):
def local(self, include=True):
return self.exclude(actor__user__isnull=include)
class InboxItem(models.Model):
"""
Store activities binding to local actors, with read/unread status.
"""
actor = models.ForeignKey(
Actor, related_name="inbox_items", on_delete=models.CASCADE
)
activity = models.ForeignKey(
"Activity", related_name="inbox_items", on_delete=models.CASCADE
)
is_delivered = models.BooleanField(default=False)
type = models.CharField(max_length=10, choices=[("to", "to"), ("cc", "cc")])
last_delivery_date = models.DateTimeField(null=True, blank=True)
delivery_attempts = models.PositiveIntegerField(default=0)
is_read = models.BooleanField(default=False)
objects = InboxItemQuerySet.as_manager()
class Delivery(models.Model):
"""
Store deliveries attempt to remote inboxes
"""
is_delivered = models.BooleanField(default=False)
last_attempt_date = models.DateTimeField(null=True, blank=True)
attempts = models.PositiveIntegerField(default=0)
inbox_url = models.URLField(max_length=500)
activity = models.ForeignKey(
"Activity", related_name="deliveries", on_delete=models.CASCADE
)
class Activity(models.Model):
......
import logging
from funkwhale_api.music import models as music_models
from . import activity
from . import serializers
......@@ -90,3 +92,109 @@ def outbox_follow(context):
"object": follow.target,
"related_object": follow,
}
@outbox.register({"type": "Create", "object.type": "Audio"})
def outbox_create_audio(context):
upload = context["upload"]
serializer = serializers.ActivitySerializer(
{
"type": "Create",
"actor": upload.library.actor.fid,
"object": serializers.UploadSerializer(upload).data,
}
)
yield {
"type": "Create",
"actor": upload.library.actor,
"payload": with_recipients(
serializer.data, to=[{"type": "followers", "target": upload.library}]
),
"object": upload,
"target": upload.library,
}
@inbox.register({"type": "Create", "object.type": "Audio"})
def inbox_create_audio(payload, context):
serializer = serializers.UploadSerializer(
data=payload["object"],
context={"activity": context.get("activity"), "actor": context["actor"]},
)
if not serializer.is_valid(raise_exception=context.get("raise_exception", False)):
logger.warn("Discarding invalid audio create")
return
upload = serializer.save()
return {"object": upload, "target": upload.library}
@inbox.register({"type": "Delete", "object.type": "Library"})
def inbox_delete_library(payload, context):
actor = context["actor"]
library_id = payload["object"].get("id")
if not library_id:
logger.debug("Discarding deletion of empty library")
return
try:
library = actor.libraries.get(fid=library_id)
except music_models.Library.DoesNotExist:
logger.debug("Discarding deletion of unkwnown library %s", library_id)
return
library.delete()
@outbox.register({"type": "Delete", "object.type": "Library"})
def outbox_delete_library(context):
library = context["library"]
serializer = serializers.ActivitySerializer(
{"type": "Delete", "object": {"type": "Library", "id": library.fid}}
)
yield {
"type": "Delete",
"actor": library.actor,
"payload": with_recipients(
serializer.data, to=[{"type": "followers", "target": library}]
),
}
@inbox.register({"type": "Delete", "object.type": "Audio"})
def inbox_delete_audio(payload, context):
actor = context["actor"]
try:
upload_fids = [i for i in payload["object"]["id"]]
except TypeError:
# we did not receive a list of Ids, so we can probably use the value directly
upload_fids = [payload["object"]["id"]]
candidates = music_models.Upload.objects.filter(
library__actor=actor, fid__in=upload_fids
)
total = candidates.count()
logger.info("Deleting %s uploads with ids %s", total, upload_fids)
candidates.delete()
@outbox.register({"type": "Delete", "object.type": "Audio"})
def outbox_delete_audio(context):
uploads = context["uploads"]
library = uploads[0].library
serializer = serializers.ActivitySerializer(
{
"type": "Delete",
"object": {"type": "Audio", "id": [u.get_federation_id() for u in uploads]},
}
)
yield {
"type": "Delete",
"actor": library.actor,
"payload": with_recipients(
serializer.data, to=[{"type": "followers", "target": library}]
),
}
This diff is collapsed.
......@@ -27,7 +27,7 @@ def clean_music_cache():
limit = timezone.now() - datetime.timedelta(minutes=delay)
candidates = (
music_models.TrackFile.objects.filter(
music_models.Upload.objects.filter(
Q(audio_file__isnull=False)
& (Q(accessed_date__lt=limit) | Q(accessed_date=None))
)
......@@ -36,13 +36,13 @@ def clean_music_cache():
.only("audio_file", "id")
.order_by("id")
)
for tf in candidates:
tf.audio_file.delete()
for upload in candidates:
upload.audio_file.delete()
# we also delete orphaned files, if any
storage = models.LibraryTrack._meta.get_field("audio_file").storage
files = get_files(storage, "federation_cache/tracks")
existing = music_models.TrackFile.objects.filter(audio_file__in=files)
existing = music_models.Upload.objects.filter(audio_file__in=files)
missing = set(files) - set(existing.values_list("audio_file", flat=True))
for m in missing:
storage.delete(m)
......@@ -70,61 +70,30 @@ def dispatch_inbox(activity):
creation, etc.)
"""
try:
routes.inbox.dispatch(
activity.payload,
context={
"activity": activity,
"actor": activity.actor,
"inbox_items": (
activity.inbox_items.local()
.select_related()
.select_related("actor__user")
.prefetch_related("activity__object", "activity__target")
),
},
)
except Exception:
activity.inbox_items.local().update(
delivery_attempts=F("delivery_attempts") + 1,
last_delivery_date=timezone.now(),
)
raise
else:
activity.inbox_items.local().update(
delivery_attempts=F("delivery_attempts") + 1,
last_delivery_date=timezone.now(),
is_delivered=True,
)
routes.inbox.dispatch(
activity.payload,
context={
"activity": activity,
"actor": activity.actor,
"inbox_items": activity.inbox_items.filter(is_read=False),
},
)
@celery.app.task(name="federation.dispatch_outbox")
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def dispatch_outbox(activity):
"""
Deliver a local activity to its recipients
Deliver a local activity to its recipients, both locally and remotely
"""
inbox_items = activity.inbox_items.all().select_related("actor")
local_recipients_items = [ii for ii in inbox_items if ii.actor.is_local]
if local_recipients_items:
dispatch_inbox.delay(activity_id=activity.pk)
remote_recipients_items = [ii for ii in inbox_items if not ii.actor.is_local]
inbox_items = activity.inbox_items.filter(is_read=False).select_related()
deliveries = activity.deliveries.filter(is_delivered=False)
shared_inbox_urls = {
ii.actor.shared_inbox_url
for ii in remote_recipients_items
if ii.actor.shared_inbox_url
}
inbox_urls = {
ii.actor.inbox_url
for ii in remote_recipients_items
if not ii.actor.shared_inbox_url
}
for url in shared_inbox_urls:
deliver_to_remote_inbox.delay(activity_id=activity.pk, shared_inbox_url=url)
if inbox_items.exists():
dispatch_inbox.delay(activity_id=activity.pk)
for url in inbox_urls:
deliver_to_remote_inbox.delay(activity_id=activity.pk, inbox_url=url)
for id in deliveries.values_list("pk", flat=True):
deliver_to_remote.delay(delivery_id=id)
@celery.app.task(
......@@ -133,22 +102,21 @@ def dispatch_outbox(activity):
retry_backoff=30,
max_retries=5,
)
@celery.require_instance(models.Activity.objects.select_related(), "activity")
def deliver_to_remote_inbox(activity, inbox_url=None, shared_inbox_url=None):
url = inbox_url or shared_inbox_url
actor = activity.actor
inbox_items = activity.inbox_items.filter(is_delivered=False)
if inbox_url:
inbox_items = inbox_items.filter(actor__inbox_url=inbox_url)
else:
inbox_items = inbox_items.filter(actor__shared_inbox_url=shared_inbox_url)
logger.info("Preparing activity delivery to %s", url)
@celery.require_instance(
models.Delivery.objects.filter(is_delivered=False).select_related(
"activity__actor"
),
"delivery",
)
def deliver_to_remote(delivery):
actor = delivery.activity.actor
logger.info("Preparing activity delivery to %s", delivery.inbox_url)
auth = signing.get_auth(actor.private_key, actor.private_key_id)
try: