import logging
import time
from django.conf import settings
from django.db import transaction
from rest_framework.decorators import action
from rest_framework import exceptions
from rest_framework import mixins
from rest_framework import permissions
from rest_framework import response
from rest_framework import views
from rest_framework import viewsets
from funkwhale_api.users.oauth import permissions as oauth_permissions
from . import filters
from . import models
from . import mutations
from . import serializers
from . import signals
from . import tasks
from . import throttling
logger = logging.getLogger(__name__)
class SkipFilterForGetObject:
    """Mixin that bypasses queryset filtering once a single object is fetched.

    ``get_object`` flags the current request with ``_skip_filters``; from then
    on ``filter_queryset`` hands the queryset back untouched instead of
    delegating to the next class in the MRO.
    """

    def get_object(self, *args, **kwargs):
        """Flag the request as filter-exempt, then defer to the parent lookup."""
        self.request._skip_filters = True
        return super().get_object(*args, **kwargs)

    def filter_queryset(self, queryset):
        """Return ``queryset`` as-is when the request carries the skip flag."""
        if not getattr(self.request, "_skip_filters", False):
            return super().filter_queryset(queryset)
        return queryset
class MutationViewSet(
    mixins.ListModelMixin,
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    viewsets.GenericViewSet,
):
    """List, retrieve, delete, approve and reject mutations, looked up by UUID.

    Only authenticated users may access these endpoints. Deleting requires
    being the mutation's creator or holding the "approve" permission on its
    target; approving and rejecting require the "approve" permission.
    """

    lookup_field = "uuid"
    # Newest mutations first; related actors and the target are loaded eagerly.
    queryset = (
        models.Mutation.objects.all()
        .order_by("-creation_date")
        .select_related("created_by", "approved_by")
        .prefetch_related("target")
    )
    serializer_class = serializers.APIMutationSerializer
    permission_classes = [permissions.IsAuthenticated]
    ordering_fields = ("creation_date",)
    filterset_class = filters.MutationFilter

    @staticmethod
    def _can_approve(instance, actor):
        """Tell whether ``actor`` holds the "approve" permission on ``instance``."""
        return mutations.registry.has_perm(
            perm="approve", type=instance.type, obj=instance.target, actor=actor
        )

    def _set_approval(self, is_approved):
        """Shared implementation of the ``approve`` and ``reject`` actions.

        Returns a 403 response when the mutation was already applied and
        raises ``PermissionDenied`` when the requesting actor lacks the
        "approve" permission. Otherwise records the reviewing actor and the
        decision, then registers follow-up work through ``utils.on_commit``
        (presumably executed once the surrounding transaction commits).
        """
        instance = self.get_object()
        if instance.is_applied:
            return response.Response(
                {"error": "This mutation was already applied"}, status=403
            )

        actor = self.request.user.actor
        if not self._can_approve(instance, actor):
            raise exceptions.PermissionDenied()

        previous_is_approved = instance.is_approved
        instance.approved_by = actor
        instance.is_approved = is_approved
        instance.save(update_fields=["approved_by", "is_approved"])

        if is_approved:
            # Only an approval triggers the actual application of the mutation.
            utils.on_commit(tasks.apply_mutation.delay, mutation_id=instance.id)
        utils.on_commit(
            signals.mutation_updated.send,
            sender=None,
            mutation=instance,
            old_is_approved=previous_is_approved,
            new_is_approved=instance.is_approved,
        )
        return response.Response({}, status=200)

    def perform_destroy(self, instance):
        """Delete ``instance`` if it is not applied and the actor is allowed to.

        Raises:
            PermissionDenied: the mutation is already applied, or the
                requesting actor is neither its creator nor allowed to
                approve it.
        """
        if instance.is_applied:
            raise exceptions.PermissionDenied("You cannot delete an applied mutation")

        actor = self.request.user.actor
        is_owner = actor == instance.created_by
        # Short-circuit: the registry is only consulted for non-owners.
        if not is_owner and not self._can_approve(instance, actor):
            raise exceptions.PermissionDenied()
        return super().perform_destroy(instance)

    @action(detail=True, methods=["post"])
    @transaction.atomic
    def approve(self, request, *args, **kwargs):
        """Mark the mutation as approved and schedule its application."""
        return self._set_approval(True)

    @action(detail=True, methods=["post"])
    @transaction.atomic
    def reject(self, request, *args, **kwargs):
        """Mark the mutation as rejected."""
        return self._set_approval(False)
class RateLimitView(views.APIView):
    """Report the throttling status that applies to the requesting client."""

    # No permission checks and no throttling are applied to this endpoint.
    permission_classes = []
    throttle_classes = []

    def get(self, request, *args, **kwargs):
        """Respond with the throttling switch, the caller's ident and scopes."""
        requester = getattr(request, "user", None)
        ident = throttling.get_ident(requester, request)
        enabled = settings.THROTTLING_ENABLED
        scopes = throttling.get_status(ident, time.time())
        return response.Response(
            {"enabled": enabled, "ident": ident, "scopes": scopes},
            status=200,
        )
class AttachmentViewSet(
    mixins.RetrieveModelMixin,
    mixins.CreateModelMixin,
    mixins.DestroyModelMixin,
    viewsets.GenericViewSet,
):
    """Retrieve, create and delete attachments, plus a redirecting proxy action."""

    lookup_field = "uuid"
    queryset = models.Attachment.objects.all()
    serializer_class = serializers.AttachmentSerializer
    permission_classes = [oauth_permissions.ScopePermission]
    required_scope = "libraries"
    anonymous_policy = "setting"

    @action(
        detail=True, methods=["get"], permission_classes=[], authentication_classes=[]
    )
    @transaction.atomic
    def proxy(self, request, *args, **kwargs):
        """Redirect (302) the client to the attachment's media.

        When ``settings.EXTERNAL_MEDIA_PROXY_ENABLED`` is falsy the redirect
        targets ``instance.url`` directly. Otherwise the remote attachment is
        fetched via ``tasks.fetch_remote_attachment`` and the redirect targets
        the serialized URL for the requested size. A failure while fetching is
        logged and answered with a 500.
        """
        instance = self.get_object()
        if not settings.EXTERNAL_MEDIA_PROXY_ENABLED:
            r = response.Response(status=302)
            r["Location"] = instance.url
            return r

        # NOTE(review): the query parameter carrying the requested size is not
        # visible in this file; it is assumed to be ``next`` - TODO confirm
        # against the clients that build proxy URLs.
        size = request.query_params.get("next", "original").lower()
        if size not in ["original", "medium_square_crop", "large_square_crop"]:
            # Unknown sizes fall back to the original file.
            size = "original"

        try:
            tasks.fetch_remote_attachment(instance)
        except Exception:
            logger.exception("Error while fetching attachment %s", instance.url)
            return response.Response(status=500)

        data = self.serializer_class(instance).data
        redirect = response.Response(status=302)
        redirect["Location"] = data["urls"][size]
        return redirect

    def perform_create(self, serializer):
        """Save the new attachment with the requesting user's actor attached."""
        return serializer.save(actor=self.request.user.actor)

    def perform_destroy(self, instance):
        """Delete ``instance`` only when it belongs to the requesting actor.

        Raises:
            PermissionDenied: the attachment has no actor, or its actor is
                not the requesting user's actor.
        """
        if instance.actor is None or instance.actor != self.request.user.actor:
            raise exceptions.PermissionDenied()
        instance.delete()
class TextPreviewView(views.APIView):
    """Render submitted markdown text to HTML for previewing."""

    permission_classes = []

    def post(self, request, *args, **kwargs):
        """Return the rendered form of ``text``; answer 400 when it is missing.

        An optional ``permissive`` entry in the payload (default ``False``) is
        forwarded to ``utils.render_html``.
        """
        body = request.data
        if "text" in body:
            permissive = body.get("permissive", False)
            rendered = utils.render_html(
                body["text"], "text/markdown", permissive=permissive
            )
            return response.Response({"rendered": rendered}, status=200)
        return response.Response({"detail": "Invalid input"}, status=400)
class PluginViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
    """Inspect, configure, enable/disable and scan with user-level plugins.

    Plugins are addressed by name through the ``pk`` URL keyword argument.
    """

    required_scope = "plugins"
    serializer_class = serializers.serializers.Serializer
    queryset = models.PluginConfiguration.objects.none()

    @staticmethod
    def _get_user_plugin(name):
        """Return the first user-level plugin named ``name``, or ``None``."""
        for plugin in plugins._plugins.values():
            if plugin["user"] is True and plugin["name"] == name:
                return plugin
        return None

    @staticmethod
    def _set_enabled(request, name, enabled):
        """Enable or disable plugin ``name`` for the requesting user.

        Answers 404 when no plugin is registered under ``name``.
        """
        if name not in plugins._plugins:
            return response.Response(status=404)
        plugins.enable_conf(name, enabled, request.user)
        return response.Response({}, status=200)

    def list(self, request, *args, **kwargs):
        """Return every user-level plugin serialized with the user's confs."""
        # The confs do not depend on the plugin being serialized, so they are
        # looked up once rather than once per plugin.
        confs = plugins.get_confs(user=request.user)
        return response.Response(
            [
                plugins.serialize_plugin(plugin, confs=confs)
                for plugin in plugins._plugins.values()
                if plugin["user"] is True
            ]
        )

    def retrieve(self, request, *args, **kwargs):
        """Return one user-level plugin, or 404 when the name is unknown."""
        plugin = self._get_user_plugin(kwargs["pk"])
        if plugin is None:
            return response.Response(status=404)
        return response.Response(
            plugins.serialize_plugin(
                plugin, confs=plugins.get_confs(user=request.user)
            )
        )

    def post(self, request, *args, **kwargs):
        """Alias of ``create``."""
        return self.create(request, *args, **kwargs)

    def create(self, request, *args, **kwargs):
        """Store the request payload as the user's configuration for a plugin.

        Answers 404 when the plugin has no entry in the user's confs or is not
        a user-level plugin; otherwise returns the plugin serialized with the
        freshly reloaded confs.
        """
        user = request.user
        name = kwargs["pk"]
        if name not in plugins.get_confs(user=user):
            return response.Response(status=404)
        plugin = self._get_user_plugin(name)
        if plugin is None:
            # Guard against indexing an empty match list (previously a 500).
            return response.Response(status=404)

        plugins.set_conf(name, request.data, user)
        return response.Response(
            plugins.serialize_plugin(plugin, confs=plugins.get_confs(user=user))
        )

    def delete(self, request, *args, **kwargs):
        """Remove the user's stored configuration rows for a plugin.

        Answers 404 when the plugin has no entry in the user's confs, 204 on
        success.
        """
        user = request.user
        confs = plugins.get_confs(user=user)
        if kwargs["pk"] not in confs:
            return response.Response(status=404)
        user.plugins.filter(code=kwargs["pk"]).delete()
        return response.Response(status=204)

    @action(detail=True, methods=["post"])
    def enable(self, request, *args, **kwargs):
        """Enable the plugin for the requesting user."""
        return self._set_enabled(request, kwargs["pk"], True)

    @action(detail=True, methods=["post"])
    def disable(self, request, *args, **kwargs):
        """Disable the plugin for the requesting user."""
        return self._set_enabled(request, kwargs["pk"], False)

    @action(detail=True, methods=["post"])
    def scan(self, request, *args, **kwargs):
        """Run the plugin's scan hook against the configured library.

        Answers 404 when the plugin is unknown, and 405 when the user's conf
        is not enabled or the plugin registered no scan hook.
        """
        user = request.user
        name = kwargs["pk"]
        if name not in plugins._plugins:
            return response.Response(status=404)

        conf = plugins.get_conf(name, user=user)
        if not conf["enabled"]:
            return response.Response(status=405)

        library = request.user.actor.libraries.get(uuid=conf["conf"]["library"])
        scan_hooks = [
            hook
            for plugin_name, hook in plugins._hooks.get(plugins.SCAN, [])
            if plugin_name == name
        ]
        if not scan_hooks:
            return response.Response(status=405)

        scan_hooks[0](library=library, conf=conf["conf"])
        return response.Response({}, status=200)