Verified Commit 74351673 authored by Eliot Berriot's avatar Eliot Berriot
Browse files

See #170: fetch channel outbox on discovery/detail

parent 0eeead34
......@@ -146,6 +146,8 @@ FEDERATION_ACTOR_FETCH_DELAY = env.int("FEDERATION_ACTOR_FETCH_DELAY", default=6
FEDERATION_SERVICE_ACTOR_USERNAME = env(
"FEDERATION_SERVICE_ACTOR_USERNAME", default="service"
)
# How many pages to fetch when crawling outboxes and third-party collections
FEDERATION_COLLECTION_MAX_PAGES = env.int("FEDERATION_COLLECTION_MAX_PAGES", default=5)
ALLOWED_HOSTS = env.list("DJANGO_ALLOWED_HOSTS", default=[]) + [FUNKWHALE_HOSTNAME]
# APP CONFIGURATION
......
import uuid
from django.contrib.contenttypes.fields import GenericRelation
from django.contrib.postgres.fields import JSONField
from django.core.serializers.json import DjangoJSONEncoder
from django.db import models
......@@ -67,6 +68,11 @@ class Channel(models.Model):
default=empty_dict, max_length=50000, encoder=DjangoJSONEncoder, blank=True
)
fetches = GenericRelation(
"federation.Fetch",
content_type_field="object_content_type",
object_id_field="object_id",
)
objects = ChannelQuerySet.as_manager()
@property
......@@ -74,6 +80,10 @@ class Channel(models.Model):
if not self.is_external_rss:
return self.actor.fid
@property
def is_local(self):
return self.actor.is_local
@property
def is_external_rss(self):
return self.actor.preferred_username.startswith("rssfeed-")
......
......@@ -110,6 +110,15 @@ class ChannelViewSet(
else:
return super().list(request, *args, **kwargs)
def get_object(self):
obj = super().get_object()
if (
self.action == "retrieve"
and self.request.GET.get("refresh", "").lower() == "true"
):
obj = music_views.refetch_obj(obj, self.get_queryset())
return obj
@decorators.action(
detail=True,
methods=["post"],
......
......@@ -19,7 +19,7 @@ from funkwhale_api.music import models as music_models
from funkwhale_api.music import tasks as music_tasks
from funkwhale_api.tags import models as tags_models
from . import activity, actors, contexts, jsonld, models, tasks, utils
from . import activity, actors, contexts, jsonld, models, utils
logger = logging.getLogger(__name__)
......@@ -391,6 +391,8 @@ class ActorSerializer(jsonld.JsonLdSerializer):
domain = urllib.parse.urlparse(kwargs["fid"]).netloc
domain, domain_created = models.Domain.objects.get_or_create(pk=domain)
if domain_created and not domain.is_local:
from . import tasks
# first time we see the domain, we trigger nodeinfo fetching
tasks.update_domain_nodeinfo(domain_name=domain.name)
......@@ -896,8 +898,6 @@ def get_additional_fields(data):
PAGINATED_COLLECTION_JSONLD_MAPPING = {
"totalItems": jsonld.first_val(contexts.AS.totalItems),
"actor": jsonld.first_id(contexts.AS.actor),
"attributedTo": jsonld.first_id(contexts.AS.attributedTo),
"first": jsonld.first_id(contexts.AS.first),
"last": jsonld.first_id(contexts.AS.last),
"partOf": jsonld.first_id(contexts.AS.partOf),
......@@ -905,10 +905,10 @@ PAGINATED_COLLECTION_JSONLD_MAPPING = {
class PaginatedCollectionSerializer(jsonld.JsonLdSerializer):
type = serializers.ChoiceField(choices=[contexts.AS.Collection])
type = serializers.ChoiceField(
choices=[contexts.AS.Collection, contexts.AS.OrderedCollection]
)
totalItems = serializers.IntegerField(min_value=0)
actor = serializers.URLField(max_length=500, required=False)
attributedTo = serializers.URLField(max_length=500, required=False)
id = serializers.URLField(max_length=500)
first = serializers.URLField(max_length=500)
last = serializers.URLField(max_length=500)
......@@ -916,18 +916,6 @@ class PaginatedCollectionSerializer(jsonld.JsonLdSerializer):
class Meta:
jsonld_mapping = PAGINATED_COLLECTION_JSONLD_MAPPING
def validate(self, validated_data):
d = super().validate(validated_data)
actor = d.get("actor")
attributed_to = d.get("attributedTo")
if not actor and not attributed_to:
raise serializers.ValidationError(
"You need to provide at least actor or attributedTo"
)
d["attributedTo"] = attributed_to or actor
return d
def to_representation(self, conf):
paginator = Paginator(conf["items"], conf.get("page_size", 20))
first = common_utils.set_query_parameter(conf["id"], page=1)
......@@ -954,6 +942,8 @@ class LibrarySerializer(PaginatedCollectionSerializer):
type = serializers.ChoiceField(
choices=[contexts.AS.Collection, contexts.FW.Library]
)
actor = serializers.URLField(max_length=500, required=False)
attributedTo = serializers.URLField(max_length=500, required=False)
name = serializers.CharField()
summary = serializers.CharField(allow_blank=True, allow_null=True, required=False)
followers = serializers.URLField(max_length=500)
......@@ -976,9 +966,23 @@ class LibrarySerializer(PaginatedCollectionSerializer):
"summary": jsonld.first_val(contexts.AS.summary),
"audience": jsonld.first_id(contexts.AS.audience),
"followers": jsonld.first_id(contexts.AS.followers),
"actor": jsonld.first_id(contexts.AS.actor),
"attributedTo": jsonld.first_id(contexts.AS.attributedTo),
},
)
def validate(self, validated_data):
d = super().validate(validated_data)
actor = d.get("actor")
attributed_to = d.get("attributedTo")
if not actor and not attributed_to:
raise serializers.ValidationError(
"You need to provide at least actor or attributedTo"
)
d["attributedTo"] = attributed_to or actor
return d
def to_representation(self, library):
conf = {
"id": library.fid,
......@@ -1934,7 +1938,15 @@ class ChannelUploadSerializer(jsonld.JsonLdSerializer):
return self.update_or_create(validated_data)
class ChannelCreateUploadSerializer(serializers.Serializer):
class ChannelCreateUploadSerializer(jsonld.JsonLdSerializer):
type = serializers.ChoiceField(choices=[contexts.AS.Create])
object = serializers.DictField()
class Meta:
jsonld_mapping = {
"object": jsonld.first_obj(contexts.AS.object),
}
def to_representation(self, upload):
return {
"@context": jsonld.get_default_context(),
......@@ -1944,3 +1956,13 @@ class ChannelCreateUploadSerializer(serializers.Serializer):
upload, context={"include_ap_context": False}
).data,
}
def validate(self, validated_data):
serializer = ChannelUploadSerializer(
data=validated_data["object"], context=self.context, jsonld_expand=False
)
serializer.is_valid(raise_exception=True)
return {"audio_serializer": serializer}
def save(self, **kwargs):
return self.validated_data["audio_serializer"].save(**kwargs)
......@@ -21,6 +21,7 @@ from funkwhale_api.moderation import mrf
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery
from . import activity
from . import actors
from . import jsonld
from . import keys
......@@ -399,8 +400,24 @@ def fetch(fetch_obj):
# special case for channels
# when obj is an actor, we check if the actor has a channel associated with it
# if it is the case, we consider the fetch obj to be a channel instead
# and also trigger a fetch on the channel outbox
if isinstance(obj, models.Actor) and obj.get_channel():
obj = obj.get_channel()
if obj.actor.outbox_url:
# first page fetch is synchronous, so that at least some data is available
# in the UI after subscription
result = fetch_collection(
obj.actor.outbox_url, channel_id=obj.pk, max_pages=1,
)
if result.get("next_page"):
# additional pages are fetched in the background
result = fetch_collection.delay(
result["next_page"],
channel_id=obj.pk,
max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1,
is_page=True,
)
fetch_obj.object = obj
fetch_obj.status = "finished"
fetch_obj.fetch_date = timezone.now()
......@@ -469,3 +486,106 @@ def remove_actor(actor):
actor.name = None
actor.summary = None
actor.save(update_fields=["type", "name", "summary"])
COLLECTION_ACTIVITY_SERIALIZERS = [
(
{"type": "Create", "object.type": "Audio"},
serializers.ChannelCreateUploadSerializer,
)
]
def match_serializer(payload, conf):
return [
serializer_class
for route, serializer_class in conf
if activity.match_route(route, payload)
]
@celery.app.task(name="federation.fetch_collection")
@celery.require_instance(
audio_models.Channel.objects.all(), "channel", allow_null=True,
)
def fetch_collection(url, max_pages, channel, is_page=False):
actor = actors.get_service_actor()
results = {
"items": [],
"skipped": 0,
"errored": 0,
"seen": 0,
"total": 0,
}
if is_page:
# starting immediatly from a page, no need to fetch the wrapping collection
logger.debug("Fetch collection page immediatly at %s", url)
results["next_page"] = url
else:
logger.debug("Fetching collection object at %s", url)
collection = utils.retrieve_ap_object(
url,
actor=actor,
serializer_class=serializers.PaginatedCollectionSerializer,
)
results["next_page"] = collection["first"]
results["total"] = collection.get("totalItems")
seen_pages = 0
context = {}
if channel:
context["channel"] = channel
for i in range(max_pages):
page_url = results["next_page"]
logger.debug("Handling page %s on max %s, at %s", i + 1, max_pages, page_url)
page = utils.retrieve_ap_object(page_url, actor=actor, serializer_class=None,)
try:
items = page["orderedItems"]
except KeyError:
try:
items = page["items"]
except KeyError:
logger.error("Invalid collection page at %s", page_url)
break
for item in items:
results["seen"] += 1
matching_serializer = match_serializer(
item, COLLECTION_ACTIVITY_SERIALIZERS
)
if not matching_serializer:
results["skipped"] += 1
logger.debug("Skipping unhandled activity %s", item.get("type"))
continue
s = matching_serializer[0](data=item, context=context)
if not s.is_valid():
logger.warn("Skipping invalid activity: %s", s.errors)
results["errored"] += 1
continue
results["items"].append(s.save())
seen_pages += 1
results["next_page"] = page.get("next", None) or None
if not results["next_page"]:
logger.debug("No more pages to fetch")
break
logger.info(
"Finished fetch of collection pages at %s. Results:\n"
" Total in collection: %s\n"
" Seen: %s\n"
" Handled: %s\n"
" Skipped: %s\n"
" Errored: %s",
url,
results.get("total"),
results["seen"],
len(results["items"]),
results["skipped"],
results["errored"],
)
return results
......@@ -107,7 +107,10 @@ def retrieve_ap_object(
return data
serializer = serializer_class(data=data, context={"fetch_actor": actor})
serializer.is_valid(raise_exception=True)
return serializer.save()
try:
return serializer.save()
except NotImplementedError:
return serializer.validated_data
def get_domain_query_from_url(domain, url_field="fid"):
......
......@@ -37,16 +37,22 @@ class CeleryConfig(AppConfig):
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)
def require_instance(model_or_qs, parameter_name, id_kwarg_name=None):
def require_instance(model_or_qs, parameter_name, id_kwarg_name=None, allow_null=False):
def decorator(function):
@functools.wraps(function)
def inner(*args, **kwargs):
kw = id_kwarg_name or "_".join([parameter_name, "id"])
pk = kwargs.pop(kw)
try:
instance = model_or_qs.get(pk=pk)
except AttributeError:
instance = model_or_qs.objects.get(pk=pk)
pk = kwargs.pop(kw)
except KeyError:
if not allow_null:
raise
instance = None
else:
try:
instance = model_or_qs.get(pk=pk)
except AttributeError:
instance = model_or_qs.objects.get(pk=pk)
kwargs[parameter_name] = instance
return function(*args, **kwargs)
......
......@@ -420,3 +420,18 @@ def test_subscribe_to_rss_creates_channel(factories, logged_in_api_client, mocke
assert response.data["channel"]["uuid"] == channel.uuid
get_channel_from_rss_url.assert_called_once_with(rss_url)
def test_refresh_channel_when_param_is_true(
factories, mocker, logged_in_api_client, queryset_equal_queries,
):
obj = factories["audio.Channel"]()
refetch_obj = mocker.patch(
"funkwhale_api.music.views.refetch_obj", return_value=obj
)
url = reverse("api:v1:channels-detail", kwargs={"composite": obj.uuid})
response = logged_in_api_client.get(url, {"refresh": "true"})
assert response.status_code == 200
assert refetch_obj.call_count == 1
assert refetch_obj.call_args[0][0] == obj
......@@ -418,7 +418,6 @@ def test_paginated_collection_serializer_validation():
assert serializer.is_valid(raise_exception=True) is True
assert serializer.validated_data["totalItems"] == 5
assert serializer.validated_data["id"] == data["id"]
assert serializer.validated_data["attributedTo"] == data["actor"]
def test_collection_page_serializer_validation():
......
......@@ -491,10 +491,16 @@ def test_fetch_url(factory_name, serializer_class, factories, r_mock, mocker):
assert save.call_count == 1
def test_fetch_channel_actor_returns_channel(factories, r_mock):
def test_fetch_channel_actor_returns_channel_and_fetch_outbox(
factories, r_mock, settings, mocker
):
obj = factories["audio.Channel"]()
fetch = factories["federation.Fetch"](url=obj.actor.fid)
payload = serializers.ActorSerializer(obj.actor).data
fetch_collection = mocker.patch.object(
tasks, "fetch_collection", return_value={"next_page": "http://outbox.url/page2"}
)
fetch_collection_delayed = mocker.patch.object(tasks.fetch_collection, "delay")
r_mock.get(obj.fid, json=payload)
......@@ -504,6 +510,15 @@ def test_fetch_channel_actor_returns_channel(factories, r_mock):
assert fetch.status == "finished"
assert fetch.object == obj
fetch_collection.assert_called_once_with(
obj.actor.outbox_url, channel_id=obj.pk, max_pages=1,
)
fetch_collection_delayed.assert_called_once_with(
"http://outbox.url/page2",
max_pages=settings.FEDERATION_COLLECTION_MAX_PAGES - 1,
is_page=True,
channel_id=obj.pk,
)
def test_fetch_honor_instance_policy_domain(factories):
......@@ -563,3 +578,71 @@ def test_fetch_honor_instance_policy_different_url_and_id(r_mock, factories):
assert fetch.status == "errored"
assert fetch.detail["error_code"] == "blocked"
def test_fetch_collection(mocker, r_mock):
class DummySerializer(serializers.serializers.Serializer):
def validate(self, validated_data):
validated_data = self.initial_data
if "id" not in validated_data["object"]:
raise serializers.serializers.ValidationError()
return validated_data
def save(self):
return self.initial_data
mocker.patch.object(
tasks,
"COLLECTION_ACTIVITY_SERIALIZERS",
[({"type": "Create", "object.type": "Audio"}, DummySerializer)],
)
payloads = {
"outbox": {
"id": "https://actor.url/outbox",
"@context": jsonld.get_default_context(),
"type": "OrderedCollection",
"totalItems": 27094,
"first": "https://actor.url/outbox?page=1",
"last": "https://actor.url/outbox?page=3",
},
"page1": {
"@context": jsonld.get_default_context(),
"type": "OrderedCollectionPage",
"next": "https://actor.url/outbox?page=2",
"orderedItems": [
{"type": "Unhandled"},
{"type": "Unhandled"},
{
"type": "Create",
"object": {"type": "Audio", "id": "https://actor.url/audio1"},
},
],
},
"page2": {
"@context": jsonld.get_default_context(),
"type": "OrderedCollectionPage",
"next": "https://actor.url/outbox?page=3",
"orderedItems": [
{"type": "Unhandled"},
{
"type": "Create",
"object": {"type": "Audio", "id": "https://actor.url/audio2"},
},
{"type": "Unhandled"},
{"type": "Create", "object": {"type": "Audio"}},
],
},
}
r_mock.get(payloads["outbox"]["id"], json=payloads["outbox"])
r_mock.get(payloads["outbox"]["first"], json=payloads["page1"])
r_mock.get(payloads["page1"]["next"], json=payloads["page2"])
result = tasks.fetch_collection(payloads["outbox"]["id"], max_pages=2,)
assert result["items"] == [
payloads["page1"]["orderedItems"][2],
payloads["page2"]["orderedItems"][1],
]
assert result["skipped"] == 4
assert result["errored"] == 1
assert result["seen"] == 7
assert result["total"] == 27094
assert result["next_page"] == payloads["page2"]["next"]
......@@ -272,7 +272,7 @@ export default {
this.showEditModal = false
this.edit.isLoading = false
this.isLoading = true
let channelPromise = axios.get(`channels/${this.id}`).then(response => {
let channelPromise = axios.get(`channels/${this.id}`, {params: {refresh: 'true'}}).then(response => {
self.object = response.data
if ((self.id == response.data.uuid) && response.data.actor) {
// replace with the pretty channel url if possible
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment