Skip to content
Snippets Groups Projects
serializers.py 11.6 KiB
Newer Older
import collections
from rest_framework import serializers

from django.core.exceptions import ObjectDoesNotExist
from django.core.files.uploadedfile import SimpleUploadedFile
from django.utils.encoding import smart_text
from django.utils.translation import ugettext_lazy as _

from . import models

class RelatedField(serializers.RelatedField):
    default_error_messages = {
        "does_not_exist": _("Object with {related_field_name}={value} does not exist."),
        "invalid": _("Invalid value."),
    }

    def __init__(self, related_field_name, serializer, **kwargs):
        self.related_field_name = related_field_name
        self.serializer = serializer
        self.filters = kwargs.pop("filters", None)
Eliot Berriot's avatar
Eliot Berriot committed
        self.queryset_filter = kwargs.pop("queryset_filter", None)
        try:
            kwargs["queryset"] = kwargs.pop("queryset")
        except KeyError:
            kwargs["queryset"] = self.serializer.Meta.model.objects.all()
        super().__init__(**kwargs)

    def get_filters(self, data):
        filters = {self.related_field_name: data}
        if self.filters:
            filters.update(self.filters(self.context))
        return filters

Eliot Berriot's avatar
Eliot Berriot committed
    def filter_queryset(self, queryset):
        if self.queryset_filter:
            queryset = self.queryset_filter(queryset, self.context)
        return queryset

    def to_internal_value(self, data):
        try:
            queryset = self.get_queryset()
            filters = self.get_filters(data)
Eliot Berriot's avatar
Eliot Berriot committed
            queryset = self.filter_queryset(queryset)
            return queryset.get(**filters)
        except ObjectDoesNotExist:
            self.fail(
                "does_not_exist",
                related_field_name=self.related_field_name,
                value=smart_text(data),
            )
        except (TypeError, ValueError):
            self.fail("invalid")

    def to_representation(self, obj):
        return self.serializer.to_representation(obj)

    def get_choices(self, cutoff=None):
        queryset = self.get_queryset()
        if queryset is None:
            # Ensure that field.choices returns something sensible
            # even when accessed with a read-only field.
            return {}

        if cutoff is not None:
            queryset = queryset[:cutoff]

        return collections.OrderedDict(
            [
                (
                    self.to_representation(item)[self.related_field_name],
                    self.display_value(item),
                )
                for item in queryset
class Action(object):
    def __init__(self, name, allow_all=False, qs_filter=None):
        self.name = name
        self.allow_all = allow_all
        self.qs_filter = qs_filter

    def __repr__(self):
        return "<Action {}>".format(self.name)


class ActionSerializer(serializers.Serializer):
    """
    A special serializer that can operate on a list of objects
    and apply actions on it.
    """

    action = serializers.CharField(required=True)
    objects = serializers.JSONField(required=True)
    filters = serializers.DictField(required=False)
    actions = None

    def __init__(self, *args, **kwargs):
        self.actions_by_name = {a.name: a for a in self.actions}
Eliot Berriot's avatar
Eliot Berriot committed
        self.queryset = kwargs.pop("queryset")
        if self.actions is None:
            raise ValueError(
Eliot Berriot's avatar
Eliot Berriot committed
                "You must declare a list of actions on " "the serializer class"
            )
        for action in self.actions_by_name.keys():
Eliot Berriot's avatar
Eliot Berriot committed
            handler_name = "handle_{}".format(action)
            assert hasattr(self, handler_name), "{} miss a {} method".format(
                self.__class__.__name__, handler_name
            )
        super().__init__(self, *args, **kwargs)

    def validate_action(self, value):
        try:
            return self.actions_by_name[value]
        except KeyError:
            raise serializers.ValidationError(
Eliot Berriot's avatar
Eliot Berriot committed
                "{} is not a valid action. Pick one of {}.".format(
                    value, ", ".join(self.actions_by_name.keys())
                )
            )

    def validate_objects(self, value):
Eliot Berriot's avatar
Eliot Berriot committed
        if value == "all":
            return self.queryset.all().order_by("id")
        if type(value) in [list, tuple]:
            return self.queryset.filter(
                **{"{}__in".format(self.pk_field): value}
            ).order_by(self.pk_field)

        raise serializers.ValidationError(
Eliot Berriot's avatar
Eliot Berriot committed
            "{} is not a valid value for objects. You must provide either a "
            'list of identifiers or the string "all".'.format(value)
        )

    def validate(self, data):
        allow_all = data["action"].allow_all
        if not allow_all and self.initial_data["objects"] == "all":
                "You cannot apply this action on all objects"
Eliot Berriot's avatar
Eliot Berriot committed
            )
        final_filters = data.get("filters", {}) or {}
        if self.filterset_class and final_filters:
            qs_filterset = self.filterset_class(final_filters, queryset=data["objects"])
            try:
                assert qs_filterset.form.is_valid()
            except (AssertionError, TypeError):
Eliot Berriot's avatar
Eliot Berriot committed
                raise serializers.ValidationError("Invalid filters")
            data["objects"] = qs_filterset.qs
        if data["action"].qs_filter:
            data["objects"] = data["action"].qs_filter(data["objects"])

Eliot Berriot's avatar
Eliot Berriot committed
        data["count"] = data["objects"].count()
        if data["count"] < 1:
            raise serializers.ValidationError("No object matching your request")
        return data

    def save(self):
        handler_name = "handle_{}".format(self.validated_data["action"].name)
        handler = getattr(self, handler_name)
Eliot Berriot's avatar
Eliot Berriot committed
        result = handler(self.validated_data["objects"])
        payload = {
Eliot Berriot's avatar
Eliot Berriot committed
            "updated": self.validated_data["count"],
            "action": self.validated_data["action"].name,
Eliot Berriot's avatar
Eliot Berriot committed
            "result": result,
        }
        return payload


def track_fields_for_update(*fields):
    """
    Apply this decorator to serializer to call function when specific values
    are updated on an object:

    .. code-block:: python

        @track_fields_for_update('privacy_level')
        class LibrarySerializer(serializers.ModelSerializer):
            def on_updated_privacy_level(self, obj, old_value, new_value):
                print('Do someting')
    """

    def decorator(serializer_class):
        original_update = serializer_class.update

        def new_update(self, obj, validated_data):
            tracked_fields_before = {f: getattr(obj, f) for f in fields}
            obj = original_update(self, obj, validated_data)
            tracked_fields_after = {f: getattr(obj, f) for f in fields}

            if tracked_fields_before != tracked_fields_after:
                self.on_updated_fields(obj, tracked_fields_before, tracked_fields_after)
            return obj

        serializer_class.update = new_update
        return serializer_class

    return decorator


class StripExifImageField(serializers.ImageField):
    def to_internal_value(self, data):
        file_obj = super().to_internal_value(data)

        image = PIL.Image.open(file_obj)
        data = list(image.getdata())
        image_without_exif = PIL.Image.new(image.mode, image.size)
        image_without_exif.putdata(data)

        with io.BytesIO() as output:
            image_without_exif.save(
                output,
                format=PIL.Image.EXTENSION[os.path.splitext(file_obj.name)[-1].lower()],
                quality=100,
            )
            content = output.getvalue()

        return SimpleUploadedFile(
            file_obj.name, content, content_type=file_obj.content_type
        )


from funkwhale_api.federation import serializers as federation_serializers  # noqa

TARGET_ID_TYPE_MAPPING = {
    "music.Track": ("id", "track"),
    "music.Artist": ("id", "artist"),
    "music.Album": ("id", "album"),
}


class APIMutationSerializer(serializers.ModelSerializer):
    created_by = federation_serializers.APIActorSerializer(read_only=True)
    target = serializers.SerializerMethodField()

    class Meta:
        model = models.Mutation
        fields = [
            "fid",
            "uuid",
            "type",
            "creation_date",
            "applied_date",
            "is_approved",
            "is_applied",
            "created_by",
            "approved_by",
            "summary",
            "payload",
            "previous_state",
            "target",
        ]
        read_only_fields = [
            "uuid",
            "creation_date",
            "fid",
            "is_applied",
            "created_by",
            "approved_by",
            "previous_state",
        ]

    def get_target(self, obj):
        target = obj.target
        if not target:
            return

        id_field, type = TARGET_ID_TYPE_MAPPING[target._meta.label]
        return {"type": type, "id": getattr(target, id_field), "repr": str(target)}

    def validate_type(self, value):
        if value not in self.context["registry"]:
            raise serializers.ValidationError("Invalid mutation type {}".format(value))
        return value
Eliot Berriot's avatar
Eliot Berriot committed


class AttachmentSerializer(serializers.Serializer):
    uuid = serializers.UUIDField(read_only=True)
    size = serializers.IntegerField(read_only=True)
    mimetype = serializers.CharField(read_only=True)
    creation_date = serializers.DateTimeField(read_only=True)
    file = StripExifImageField(write_only=True)
    urls = serializers.SerializerMethodField()

    def get_urls(self, o):
        urls = {}
        urls["source"] = o.url
        urls["original"] = o.download_url_original
        urls["medium_square_crop"] = o.download_url_medium_square_crop
        return urls

    def to_representation(self, o):
        repr = super().to_representation(o)
        # XXX: BACKWARD COMPATIBILITY
        # having the attachment urls in a nested JSON obj is better,
        # but we can't do this without breaking clients
        # So we extract the urls and include these in the parent payload
        repr.update({k: v for k, v in repr["urls"].items() if k != "source"})
        # also, our legacy images had lots of variations (400x400, 200x200, 50x50)
        # but we removed some of these, so we emulate these by hand (by redirecting)
        # to actual, existing attachment variations
        repr["square_crop"] = repr["medium_square_crop"]
        repr["small_square_crop"] = repr["medium_square_crop"]
        return repr

    def create(self, validated_data):
        return models.Attachment.objects.create(
            file=validated_data["file"], actor=validated_data["actor"]
        )


class ContentSerializer(serializers.Serializer):
    text = serializers.CharField(max_length=models.CONTENT_TEXT_MAX_LENGTH)
    content_type = serializers.ChoiceField(choices=models.CONTENT_TEXT_SUPPORTED_TYPES,)
    html = serializers.SerializerMethodField()

    def get_html(self, o):
        return utils.render_html(o.text, o.content_type)
Eliot Berriot's avatar
Eliot Berriot committed


class NullToEmptDict(object):
    def get_attribute(self, o):
        attr = super().get_attribute(o)
        if attr is None:
            return {}
        return attr

    def to_representation(self, v):
        if not v:
            return v
        return super().to_representation(v)
Agate's avatar
Agate committed


class PodPluginSerializer(serializers.Serializer):
    code = serializers.CharField(read_only=True)
    enabled = serializers.BooleanField()
    conf = serializers.JSONField()
    label = serializers.SerializerMethodField()

    class Meta:
        fields = [
            "code",
            "label",
            "enabled",
            "conf",
        ]

    def get_label(self, o):
        return o.plugin.verbose_name