Commit 44a5034d authored by Agate

Merge branch '170-fetch-api' into 'develop'

See #170: fetching remote objects

See merge request !1043
parents 65097f62 c2eeee5e
......@@ -833,6 +833,10 @@ THROTTLING_RATES = {
"rate": THROTTLING_USER_RATES.get("password-reset-confirm", "20/h"),
"description": "Password reset confirmation",
},
"fetch": {
"rate": THROTTLING_USER_RATES.get("fetch", "200/d"),
"description": "Fetch remote objects",
},
}
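For context, the "200/d" rate string follows DRF's throttling syntax; a minimal sketch of how such a value is interpreted (this mirrors rest_framework.throttling.SimpleRateThrottle.parse_rate and is not part of this merge request):

def parse_rate(rate):
    # "200/d" -> (200, 86400): at most 200 fetch creations per rolling day
    if rate is None:
        return (None, None)
    num, period = rate.split("/")
    duration = {"s": 1, "m": 60, "h": 3600, "d": 86400}[period[0]]
    return (int(num), duration)

assert parse_rate("200/d") == (200, 86400)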
......@@ -906,7 +910,7 @@ ACCOUNT_USERNAME_BLACKLIST = [
] + env.list("ACCOUNT_USERNAME_BLACKLIST", default=[])
EXTERNAL_REQUESTS_VERIFY_SSL = env.bool("EXTERNAL_REQUESTS_VERIFY_SSL", default=True)
EXTERNAL_REQUESTS_TIMEOUT = env.int("EXTERNAL_REQUESTS_TIMEOUT", default=5)
EXTERNAL_REQUESTS_TIMEOUT = env.int("EXTERNAL_REQUESTS_TIMEOUT", default=10)
# XXX: deprecated, see #186
API_AUTHENTICATION_REQUIRED = env.bool("API_AUTHENTICATION_REQUIRED", True)
......@@ -955,7 +959,11 @@ FEDERATION_OBJECT_FETCH_DELAY = env.int(
MODERATION_EMAIL_NOTIFICATIONS_ENABLED = env.bool(
"MODERATION_EMAIL_NOTIFICATIONS_ENABLED", default=True
)
FEDERATION_AUTHENTIFY_FETCHES = True
FEDERATION_SYNCHRONOUS_FETCH = env.bool("FEDERATION_SYNCHRONOUS_FETCH", default=True)
FEDERATION_DUPLICATE_FETCH_DELAY = env.int(
"FEDERATION_DUPLICATE_FETCH_DELAY", default=60 * 50
)
# Delay in days after signup before we show the "support us" messages
INSTANCE_SUPPORT_MESSAGE_DELAY = env.int("INSTANCE_SUPPORT_MESSAGE_DELAY", default=15)
FUNKWHALE_SUPPORT_MESSAGE_DELAY = env.int("FUNKWHALE_SUPPORT_MESSAGE_DELAY", default=15)
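Spelling out the new and changed defaults from this settings hunk (sketch only; the env.* ones can be overridden through environment variables of the same name):

FEDERATION_AUTHENTIFY_FETCHES = True        # sign outgoing fetches with the requesting actor's key
FEDERATION_SYNCHRONOUS_FETCH = True         # run the fetch inline during the API request
FEDERATION_DUPLICATE_FETCH_DELAY = 60 * 50  # 3000 seconds, i.e. a 50-minute dedup window
EXTERNAL_REQUESTS_TIMEOUT = 10              # seconds, raised from 5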
......
......@@ -234,9 +234,11 @@ def get_updated_fields(conf, data, obj):
data_value = data[data_field]
except KeyError:
continue
obj_value = getattr(obj, obj_field)
if obj_value != data_value:
if obj.pk:
obj_value = getattr(obj, obj_field)
if obj_value != data_value:
final_data[obj_field] = data_value
else:
final_data[obj_field] = data_value
return final_data
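In plain terms: a saved object only receives fields whose value actually changed, while an unsaved object (obj.pk is None) receives every mapped field, since there is nothing to diff against yet. A standalone sketch of that behaviour, with invented field names:

def get_updated_fields_sketch(conf, data, obj):
    # conf maps incoming data keys to model attributes, e.g. [("name", "title")]
    final_data = {}
    for data_field, obj_field in conf:
        if data_field not in data:
            continue
        if obj.pk:
            # existing row: keep only values that actually differ
            if getattr(obj, obj_field) != data[data_field]:
                final_data[obj_field] = data[data_field]
        else:
            # unsaved instance: keep everything
            final_data[obj_field] = data[data_field]
    return final_data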
......
import datetime
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core import validators
from django.utils import timezone
from rest_framework import serializers
from funkwhale_api.common import fields as common_fields
from funkwhale_api.common import serializers as common_serializers
from funkwhale_api.music import models as music_models
from funkwhale_api.users import serializers as users_serializers
......@@ -158,8 +164,21 @@ class InboxItemActionSerializer(common_serializers.ActionSerializer):
return objects.update(is_read=True)
FETCH_OBJECT_CONFIG = {
"artist": {"queryset": music_models.Artist.objects.all()},
"album": {"queryset": music_models.Album.objects.all()},
"track": {"queryset": music_models.Track.objects.all()},
"library": {"queryset": music_models.Library.objects.all(), "id_attr": "uuid"},
"upload": {"queryset": music_models.Upload.objects.all(), "id_attr": "uuid"},
"account": {"queryset": models.Actor.objects.all(), "id_attr": "full_username"},
}
FETCH_OBJECT_FIELD = common_fields.GenericRelation(FETCH_OBJECT_CONFIG)
class FetchSerializer(serializers.ModelSerializer):
actor = federation_serializers.APIActorSerializer()
actor = federation_serializers.APIActorSerializer(read_only=True)
object = serializers.CharField(write_only=True)
force = serializers.BooleanField(default=False, required=False, write_only=True)
class Meta:
model = models.Fetch
......@@ -171,7 +190,63 @@ class FetchSerializer(serializers.ModelSerializer):
"detail",
"creation_date",
"fetch_date",
"object",
"force",
]
read_only_fields = [
"id",
"url",
"actor",
"status",
"detail",
"creation_date",
"fetch_date",
]
def validate_object(self, value):
# if value is a webfinger lookup, we craft a special URL
if value.startswith("@"):
value = value.lstrip("@")
validator = validators.EmailValidator()
try:
validator(value)
except validators.ValidationError:
return value
return "webfinger://{}".format(value)
def create(self, validated_data):
check_duplicates = not validated_data.get("force", False)
if check_duplicates:
# first we check for duplicates
duplicate = (
validated_data["actor"]
.fetches.filter(
status="finished",
url=validated_data["object"],
creation_date__gte=timezone.now()
- datetime.timedelta(
seconds=settings.FEDERATION_DUPLICATE_FETCH_DELAY
),
)
.order_by("-creation_date")
.first()
)
if duplicate:
return duplicate
fetch = models.Fetch.objects.create(
actor=validated_data["actor"], url=validated_data["object"]
)
return fetch
def to_representation(self, obj):
repr = super().to_representation(obj)
object_data = None
if obj.object:
object_data = FETCH_OBJECT_FIELD.to_representation(obj.object)
repr["object"] = object_data
return repr
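To make the write/read split concrete, here is roughly what a request payload and a finished fetch could serialize to (all values invented; the exact shape of "object" is defined by common_fields.GenericRelation and FETCH_OBJECT_CONFIG above):

# Submitted payload (write-only fields); validate_object turns the handle into
# url="webfinger://artist@other-pod.example":
{"object": "@artist@other-pod.example", "force": False}

# Representation once the fetch has run:
{
    "id": 42,
    "url": "webfinger://artist@other-pod.example",
    "actor": {"full_username": "alice@pod.example"},
    "status": "finished",
    "detail": {},
    "creation_date": "2020-01-01T12:00:00Z",
    "fetch_date": "2020-01-01T12:00:02Z",
    "object": {"type": "account", "full_username": "artist@other-pod.example"},
}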
class FullActorSerializer(serializers.Serializer):
......
import requests.exceptions
from django.conf import settings
from django.db import transaction
from django.db.models import Count
......@@ -10,6 +11,7 @@ from rest_framework import response
from rest_framework import viewsets
from funkwhale_api.common import preferences
from funkwhale_api.common import utils as common_utils
from funkwhale_api.common.permissions import ConditionalAuthentication
from funkwhale_api.music import models as music_models
from funkwhale_api.music import views as music_views
......@@ -22,6 +24,7 @@ from . import filters
from . import models
from . import routes
from . import serializers
from . import tasks
from . import utils
......@@ -195,11 +198,28 @@ class InboxItemViewSet(
return response.Response(result, status=200)
class FetchViewSet(mixins.RetrieveModelMixin, viewsets.GenericViewSet):
class FetchViewSet(
mixins.CreateModelMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet
):
queryset = models.Fetch.objects.select_related("actor")
serializer_class = api_serializers.FetchSerializer
permission_classes = [permissions.IsAuthenticated]
throttling_scopes = {"create": {"authenticated": "fetch"}}
def get_queryset(self):
return super().get_queryset().filter(actor=self.request.user.actor)
def perform_create(self, serializer):
fetch = serializer.save(actor=self.request.user.actor)
if fetch.status == "finished":
# a duplicate was returned, no need to fetch again
return
if settings.FEDERATION_SYNCHRONOUS_FETCH:
tasks.fetch(fetch_id=fetch.pk)
fetch.refresh_from_db()
else:
common_utils.on_commit(tasks.fetch.delay, fetch_id=fetch.pk)
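A hedged end-to-end sketch of the new endpoint from a client's point of view (the /api/v1/federation/fetches/ path and the token are assumptions, not taken from this diff):

import requests

resp = requests.post(
    "https://pod.example/api/v1/federation/fetches/",
    json={"object": "@artist@other-pod.example"},
    headers={"Authorization": "Bearer <token>"},
)
fetch = resp.json()
# With FEDERATION_SYNCHRONOUS_FETCH=True the task already ran, so the status is final:
print(fetch["status"])  # "finished", "errored" or "skipped"
# Re-posting the same object within FEDERATION_DUPLICATE_FETCH_DELAY returns the earlier
# finished fetch; pass {"force": True} to bypass the duplicate check.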
class DomainViewSet(
......
......@@ -21,7 +21,7 @@ class SignatureAuthFactory(factory.Factory):
key = factory.LazyFunction(lambda: keys.get_key_pair()[0])
key_id = factory.Faker("url")
use_auth_header = False
headers = ["(request-target)", "user-agent", "host", "date", "content-type"]
headers = ["(request-target)", "user-agent", "host", "date", "accept"]
class Meta:
model = requests_http_signature.HTTPSignatureAuth
......@@ -42,7 +42,7 @@ class SignedRequestFactory(factory.Factory):
"User-Agent": "Test",
"Host": "test.host",
"Date": http_date(timezone.now().timestamp()),
"Content-Type": "application/activity+json",
"Accept": "application/activity+json",
}
if extracted:
default_headers.update(extracted)
......
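The switch from Content-Type to Accept (here and in the requests below) reflects that remote fetches are body-less GET requests: what matters is advertising the ActivityPub media type we want back, and signing that header. An illustration of the resulting request headers, with invented values (Signature parameters follow the HTTP Signatures draft):

headers = {
    "User-Agent": "python-requests",
    "Host": "other-pod.example",
    "Date": "Sat, 04 Jan 2020 12:00:00 GMT",
    "Accept": "application/activity+json",
    "Signature": 'keyId="https://pod.example/federation/actors/service#main-key",'
    'algorithm="rsa-sha256",'
    'headers="(request-target) user-agent host date accept",'
    'signature="<base64>"',
}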
......@@ -9,9 +9,7 @@ def get_library_data(library_url, actor):
auth = signing.get_auth(actor.private_key, actor.private_key_id)
try:
response = session.get_session().get(
library_url,
auth=auth,
headers={"Content-Type": "application/activity+json"},
library_url, auth=auth, headers={"Accept": "application/activity+json"},
)
except requests.ConnectionError:
return {"errors": ["This library is not reachable"]}
......@@ -32,7 +30,7 @@ def get_library_data(library_url, actor):
def get_library_page(library, page_url, actor):
auth = signing.get_auth(actor.private_key, actor.private_key_id)
response = session.get_session().get(
page_url, auth=auth, headers={"Content-Type": "application/activity+json"},
page_url, auth=auth, headers={"Accept": "application/activity+json"},
)
serializer = serializers.CollectionPageSerializer(
data=response.json(),
......
......@@ -372,7 +372,7 @@ class Fetch(models.Model):
objects = FetchQuerySet.as_manager()
def save(self, **kwargs):
if not self.url and self.object:
if not self.url and self.object and hasattr(self.object, "fid"):
self.url = self.object.fid
super().save(**kwargs)
......@@ -388,6 +388,11 @@ class Fetch(models.Model):
contexts.FW.Track: serializers.TrackSerializer,
contexts.AS.Audio: serializers.UploadSerializer,
contexts.FW.Library: serializers.LibrarySerializer,
contexts.AS.Group: serializers.ActorSerializer,
contexts.AS.Person: serializers.ActorSerializer,
contexts.AS.Organization: serializers.ActorSerializer,
contexts.AS.Service: serializers.ActorSerializer,
contexts.AS.Application: serializers.ActorSerializer,
}
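Every ActivityPub actor type now resolves to ActorSerializer, so fetching a profile works the same way as fetching music objects. A rough illustration of how tasks.fetch (further down in this diff) consults this mapping (expanded JSON-LD URIs, values approximate):

doc_type = "https://www.w3.org/ns/activitystreams#Person"
serializer_class = fetch_obj.serializers[doc_type]  # -> serializers.ActorSerializer
# a type missing from the mapping leaves the fetch in the "skipped" state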
......@@ -568,7 +573,7 @@ class LibraryTrack(models.Model):
auth=auth,
stream=True,
timeout=20,
headers={"Content-Type": "application/activity+json"},
headers={"Accept": "application/activity+json"},
)
with remote_response as r:
remote_response.raise_for_status()
......
......@@ -151,6 +151,10 @@ class ActorSerializer(jsonld.JsonLdSerializer):
)
class Meta:
# not strictly necessary because it's not a model serializer
# but used by tasks.py/fetch
model = models.Actor
jsonld_mapping = {
"outbox": jsonld.first_id(contexts.AS.outbox),
"inbox": jsonld.first_id(contexts.LDP.inbox),
......@@ -765,6 +769,10 @@ class LibrarySerializer(PaginatedCollectionSerializer):
)
class Meta:
# not strictly necessary because it's not a model serializer
# but used by tasks.py/fetch
model = music_models.Library
jsonld_mapping = common_utils.concat_dicts(
PAGINATED_COLLECTION_JSONLD_MAPPING,
{
......@@ -795,12 +803,15 @@ class LibrarySerializer(PaginatedCollectionSerializer):
return r
def create(self, validated_data):
actor = utils.retrieve_ap_object(
validated_data["attributedTo"],
actor=self.context.get("fetch_actor"),
queryset=models.Actor,
serializer_class=ActorSerializer,
)
if self.instance:
actor = self.instance.actor
else:
actor = utils.retrieve_ap_object(
validated_data["attributedTo"],
actor=self.context.get("fetch_actor"),
queryset=models.Actor,
serializer_class=ActorSerializer,
)
privacy = {"": "me", "./": "me", None: "me", contexts.AS.Public: "everyone"}
library, created = music_models.Library.objects.update_or_create(
fid=validated_data["id"],
......@@ -815,6 +826,9 @@ class LibrarySerializer(PaginatedCollectionSerializer):
)
return library
def update(self, instance, validated_data):
return self.create(validated_data)
class CollectionPageSerializer(jsonld.JsonLdSerializer):
type = serializers.ChoiceField(choices=[contexts.AS.CollectionPage])
......@@ -968,8 +982,13 @@ class MusicEntitySerializer(jsonld.JsonLdSerializer):
allow_null=True,
)
@transaction.atomic
def update(self, instance, validated_data):
return self.update_or_create(validated_data)
@transaction.atomic
def update_or_create(self, validated_data):
instance = self.instance or self.Meta.model(fid=validated_data["id"])
creating = instance.pk is None
attributed_to_fid = validated_data.get("attributedTo")
if attributed_to_fid:
validated_data["attributedTo"] = actors.get_actor(attributed_to_fid)
......@@ -977,8 +996,11 @@ class MusicEntitySerializer(jsonld.JsonLdSerializer):
self.updateable_fields, validated_data, instance
)
updated_fields = self.validate_updated_data(instance, updated_fields)
if updated_fields:
if creating:
instance, created = self.Meta.model.objects.get_or_create(
fid=validated_data["id"], defaults=updated_fields
)
else:
music_tasks.update_library_entity(instance, updated_fields)
tags = [t["name"] for t in validated_data.get("tags", []) or []]
......@@ -1064,6 +1086,8 @@ class ArtistSerializer(MusicEntitySerializer):
d["@context"] = jsonld.get_default_context()
return d
create = MusicEntitySerializer.update_or_create
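Aliasing create to update_or_create works because of how DRF dispatches save(); a paraphrase of rest_framework's BaseSerializer.save() logic (not Funkwhale code):

def save_dispatch(serializer, validated_data):
    # DRF calls update() when an instance was passed to the serializer,
    # and create() otherwise; both now funnel into update_or_create above.
    if serializer.instance is not None:
        return serializer.update(serializer.instance, validated_data)
    return serializer.create(validated_data)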
class AlbumSerializer(MusicEntitySerializer):
released = serializers.DateField(allow_null=True, required=False)
......@@ -1074,10 +1098,11 @@ class AlbumSerializer(MusicEntitySerializer):
)
updateable_fields = [
("name", "title"),
("cover", "attachment_cover"),
("musicbrainzId", "mbid"),
("attributedTo", "attributed_to"),
("released", "release_date"),
("cover", "attachment_cover"),
("_artist", "artist"),
]
class Meta:
......@@ -1124,6 +1149,20 @@ class AlbumSerializer(MusicEntitySerializer):
d["@context"] = jsonld.get_default_context()
return d
def validate(self, data):
validated_data = super().validate(data)
if not self.parent:
validated_data["_artist"] = utils.retrieve_ap_object(
validated_data["artists"][0]["id"],
actor=self.context.get("fetch_actor"),
queryset=music_models.Artist,
serializer_class=ArtistSerializer,
)
return validated_data
create = MusicEntitySerializer.update_or_create
class TrackSerializer(MusicEntitySerializer):
position = serializers.IntegerField(min_value=0, allow_null=True, required=False)
......@@ -1293,39 +1332,66 @@ class UploadSerializer(jsonld.JsonLdSerializer):
return lb
actor = self.context.get("actor")
kwargs = {}
if actor:
kwargs["actor"] = actor
try:
return music_models.Library.objects.get(fid=v, **kwargs)
except music_models.Library.DoesNotExist:
library = utils.retrieve_ap_object(
v,
actor=self.context.get("fetch_actor"),
queryset=music_models.Library,
serializer_class=LibrarySerializer,
)
except Exception:
raise serializers.ValidationError("Invalid library")
if actor and library.actor != actor:
raise serializers.ValidationError("Invalid library")
return library
def create(self, validated_data):
try:
return music_models.Upload.objects.get(fid=validated_data["id"])
except music_models.Upload.DoesNotExist:
pass
def update(self, instance, validated_data):
return self.create(validated_data)
track = TrackSerializer(
context={"activity": self.context.get("activity")}
).create(validated_data["track"])
@transaction.atomic
def create(self, validated_data):
instance = self.instance or None
if not self.instance:
try:
instance = music_models.Upload.objects.get(fid=validated_data["id"])
except music_models.Upload.DoesNotExist:
pass
data = {
"fid": validated_data["id"],
"mimetype": validated_data["url"]["mediaType"],
"source": validated_data["url"]["href"],
"creation_date": validated_data["published"],
"modification_date": validated_data.get("updated"),
"track": track,
"duration": validated_data["duration"],
"size": validated_data["size"],
"bitrate": validated_data["bitrate"],
"library": validated_data["library"],
"from_activity": self.context.get("activity"),
"import_status": "finished",
}
return music_models.Upload.objects.create(**data)
if instance:
data = {
"mimetype": validated_data["url"]["mediaType"],
"source": validated_data["url"]["href"],
"creation_date": validated_data["published"],
"modification_date": validated_data.get("updated"),
"duration": validated_data["duration"],
"size": validated_data["size"],
"bitrate": validated_data["bitrate"],
"import_status": "finished",
}
return music_models.Upload.objects.update_or_create(
fid=validated_data["id"], defaults=data
)[0]
else:
track = TrackSerializer(
context={"activity": self.context.get("activity")}
).create(validated_data["track"])
data = {
"fid": validated_data["id"],
"mimetype": validated_data["url"]["mediaType"],
"source": validated_data["url"]["href"],
"creation_date": validated_data["published"],
"modification_date": validated_data.get("updated"),
"track": track,
"duration": validated_data["duration"],
"size": validated_data["size"],
"bitrate": validated_data["bitrate"],
"library": validated_data["library"],
"from_activity": self.context.get("activity"),
"import_status": "finished",
}
return music_models.Upload.objects.create(**data)
def to_representation(self, instance):
track = instance.track
......
......@@ -14,6 +14,7 @@ from requests.exceptions import RequestException
from funkwhale_api.common import preferences
from funkwhale_api.common import session
from funkwhale_api.common import utils as common_utils
from funkwhale_api.moderation import mrf
from funkwhale_api.music import models as music_models
from funkwhale_api.taskapp import celery
......@@ -24,6 +25,7 @@ from . import models, signing
from . import serializers
from . import routes
from . import utils
from . import webfinger
logger = logging.getLogger(__name__)
......@@ -285,24 +287,45 @@ def rotate_actor_key(actor):
@celery.app.task(name="federation.fetch")
@transaction.atomic
@celery.require_instance(
models.Fetch.objects.filter(status="pending").select_related("actor"), "fetch"
models.Fetch.objects.filter(status="pending").select_related("actor"),
"fetch_obj",
"fetch_id",
)
def fetch(fetch):
actor = fetch.actor
auth = signing.get_auth(actor.private_key, actor.private_key_id)
def fetch(fetch_obj):
def error(code, **kwargs):
fetch.status = "errored"
fetch.fetch_date = timezone.now()
fetch.detail = {"error_code": code}
fetch.detail.update(kwargs)
fetch.save(update_fields=["fetch_date", "status", "detail"])
fetch_obj.status = "errored"
fetch_obj.fetch_date = timezone.now()
fetch_obj.detail = {"error_code": code}
fetch_obj.detail.update(kwargs)
fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
url = fetch_obj.url
mrf_check_url = url
if not mrf_check_url.startswith("webfinger://"):
payload, updated = mrf.inbox.apply({"id": mrf_check_url})
if not payload:
return error("blocked", message="Blocked by MRF")
actor = fetch_obj.actor
if settings.FEDERATION_AUTHENTIFY_FETCHES:
auth = signing.get_auth(actor.private_key, actor.private_key_id)
else:
auth = None
try:
if url.startswith("webfinger://"):
# we first grab the corresponding webfinger representation
# to get the ActivityPub actor ID
webfinger_data = webfinger.get_resource(
"acct:" + url.replace("webfinger://", "")
)
url = webfinger.get_ap_url(webfinger_data["links"])
if not url:
return error("webfinger", message="Invalid or missing webfinger data")
payload, updated = mrf.inbox.apply({"id": url})
if not payload:
return error("blocked", message="Blocked by MRF")
response = session.get_session().get(
auth=auth,
url=fetch.url,
headers={"Content-Type": "application/activity+json"},
auth=auth, url=url, headers={"Accept": "application/activity+json"},
)
logger.debug("Remote answered with %s", response.status_code)
response.raise_for_status()
......@@ -320,8 +343,19 @@ def fetch(fetch):
try:
payload = response.json()
except json.decoder.JSONDecodeError:
# we attempt to extract a <link rel=alternate> that points
# to an ActivityPub resource, if possible, and retry with this URL
alternate_url = utils.find_alternate(response.text)
if alternate_url:
fetch_obj.url = alternate_url
fetch_obj.save(update_fields=["url"])
return fetch(fetch_id=fetch_obj.pk)
return error("invalid_json")
payload, updated = mrf.inbox.apply(payload)
if not payload:
return error("blocked", message="Blocked by MRF")
try:
doc = jsonld.expand(payload)
except ValueError:
......@@ -332,13 +366,13 @@ def fetch(fetch):
except IndexError:
return error("missing_jsonld_type")
try:
serializer_class = fetch.serializers[type]
serializer_class = fetch_obj.serializers[type]
model = serializer_class.Meta.model
except (KeyError, AttributeError):
fetch.status = "skipped"
fetch.fetch_date = timezone.now()
fetch.detail = {"reason": "unhandled_type", "type": type}
return fetch.save(update_fields=["fetch_date", "status", "detail"])
fetch_obj.status = "skipped"
fetch_obj.fetch_date = timezone.now()
fetch_obj.detail = {"reason": "unhandled_type", "type": type}
return fetch_obj.save(update_fields=["fetch_date", "status", "detail"])
try:
id = doc.get("@id")
except IndexError:
......@@ -350,11 +384,14 @@ def fetch(fetch):
if not serializer.is_valid():
return error("validation", validation_errors=serializer.errors)
try:
serializer.save()
obj = serializer.save()
except Exception as e:
error("save", message=str(e))
raise
fetch.status = "finished"
fetch.fetch_date = timezone.now()
return fetch.save(update_fields=["fetch_date", "status"])
fetch_obj.object = obj
fetch_obj.status = "finished"
fetch_obj.fetch_date = timezone.now()
return fetch_obj.save(
update_fields=["fetch_date", "status", "object_id", "object_content_type"]
)
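Putting the pieces together, a hedged sketch of the task's lifecycle as driven by the API view earlier in this diff (identifiers from the diff, data invented):

fetch = models.Fetch.objects.create(
    actor=actor, url="webfinger://artist@other-pod.example"
)
tasks.fetch(fetch_id=fetch.pk)  # MRF check, webfinger resolution, signed GET, serializer.save()
fetch.refresh_from_db()
assert fetch.status in {"finished", "errored", "skipped"}
if fetch.status == "finished":
    print(fetch.object)  # the Actor/Track/... instance stored on the generic relation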
import html.parser
import unicodedata
import re
from django.conf import settings
......@@ -164,3 +165,39 @@ def get_actor_from_username_data_query(field, data):
"domain__name__iexact": data["domain"],
}
)
class StopParsing(Exception):
pass
class AlternateLinkParser(html.parser.HTMLParser):
def __init__(self, *args, **kwargs):
self.result = None
super().__init__(*args, **kwargs)
def handle_starttag(self, tag, attrs):
if tag != "link":
return
attrs_dict = dict(attrs)
if attrs_dict.get("rel") == "alternate" and attrs_dict.get(
"type", "application/activity+json"
):
self.result = attrs_dict.get("href")
raise StopParsing()
def handle_endtag(self, tag):
# once </head> is closed nothing below can carry the alternate link, stop early
if tag == "head":
raise StopParsing()
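A usage sketch for the parser above (tasks.fetch reaches it through a find_alternate() helper that falls outside this hunk):

parser = AlternateLinkParser()
try:
    parser.feed(
        '<html><head><link rel="alternate" type="application/activity+json" '
        'href="https://pod.example/federation/music/tracks/42"></head></html>'
    )
except StopParsing:
    pass
print(parser.result)  # https://pod.example/federation/music/tracks/42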