Skip to content
Snippets Groups Projects
models.py 12.8 KiB
Newer Older
Eliot Berriot's avatar
Eliot Berriot committed
from __future__ import absolute_import, unicode_literals
import datetime
import random
import string
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.dispatch import receiver
Eliot Berriot's avatar
Eliot Berriot committed
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from django_auth_ldap.backend import populate_user as ldap_populate_user
from oauth2_provider import models as oauth2_models
from oauth2_provider import validators as oauth2_validators
from versatileimagefield.fields import VersatileImageField

Eliot Berriot's avatar
Eliot Berriot committed
from funkwhale_api.common import fields, preferences
from funkwhale_api.common import utils as common_utils
from funkwhale_api.common import validators as common_validators
from funkwhale_api.federation import keys
from funkwhale_api.federation import models as federation_models
from funkwhale_api.federation import utils as federation_utils
Eliot Berriot's avatar
Eliot Berriot committed
    return binascii.b2a_hex(os.urandom(15)).decode("utf-8")
PERMISSIONS_CONFIGURATION = {
    "moderation": {
        "label": "Moderation",
        "help_text": "Block/mute/remove domains, users and content",
        "scopes": {
            "read:instance:policies",
            "write:instance:policies",
            "read:instance:accounts",
            "write:instance:accounts",
            "read:instance:domains",
            "write:instance:domains",
            "read:instance:reports",
            "write:instance:reports",
            "read:instance:notes",
            "write:instance:notes",
Eliot Berriot's avatar
Eliot Berriot committed
    "library": {
        "label": "Manage library",
        "help_text": "Manage library, delete files, tracks, artists, albums...",
        "scopes": {
            "read:instance:edits",
            "write:instance:edits",
            "read:instance:libraries",
            "write:instance:libraries",
        },
    },
    "settings": {
        "label": "Manage instance-level settings",
        "help_text": "",
        "scopes": {
            "read:instance:settings",
            "write:instance:settings",
            "read:instance:users",
            "write:instance:users",
            "read:instance:invitations",
            "write:instance:invitations",
        },
    },
}

PERMISSIONS = sorted(PERMISSIONS_CONFIGURATION.keys())
get_file_path = common_utils.ChunkedPath("users/avatars", preserve_file_name=False)


def get_default_instance_support_message_display_date():
    return timezone.now() + datetime.timedelta(
        days=settings.INSTANCE_SUPPORT_MESSAGE_DELAY
    )


def get_default_funkwhale_support_message_display_date():
    return timezone.now() + datetime.timedelta(
        days=settings.FUNKWHALE_SUPPORT_MESSAGE_DELAY
    )


class User(AbstractUser):

    # First Name and Last Name do not cover name patterns
    # around the globe.
    name = models.CharField(_("Name of User"), blank=True, max_length=255)

    # updated on logout or password change, to invalidate JWT
    secret_key = models.UUIDField(default=uuid.uuid4, null=True)
    privacy_level = fields.get_privacy_field()

    # Unfortunately, Subsonic API assumes a MD5/password authentication
    # scheme, which is weak in terms of security, and not achievable
    # anyway since django use stronger schemes for storing passwords.
    # Users that want to use the subsonic API from external client
    # should set this token and use it as their password in such clients
Eliot Berriot's avatar
Eliot Berriot committed
    subsonic_api_token = models.CharField(blank=True, null=True, max_length=255)
    permission_moderation = models.BooleanField(
        PERMISSIONS_CONFIGURATION["moderation"]["label"],
        help_text=PERMISSIONS_CONFIGURATION["moderation"]["help_text"],
Eliot Berriot's avatar
Eliot Berriot committed
        default=False,
    )
    permission_library = models.BooleanField(
Eliot Berriot's avatar
Eliot Berriot committed
        PERMISSIONS_CONFIGURATION["library"]["label"],
        help_text=PERMISSIONS_CONFIGURATION["library"]["help_text"],
        default=False,
    )
    permission_settings = models.BooleanField(
Eliot Berriot's avatar
Eliot Berriot committed
        PERMISSIONS_CONFIGURATION["settings"]["label"],
        help_text=PERMISSIONS_CONFIGURATION["settings"]["help_text"],
        default=False,
    )
    last_activity = models.DateTimeField(default=None, null=True, blank=True)

    invitation = models.ForeignKey(
        "Invitation",
        related_name="users",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )
    avatar = VersatileImageField(
        upload_to=get_file_path,
        null=True,
        blank=True,
        max_length=150,
        validators=[
            common_validators.ImageDimensionsValidator(min_width=50, min_height=50),
            common_validators.FileValidator(
                allowed_extensions=["png", "jpg", "jpeg", "gif"],
                max_size=1024 * 1024 * 2,
            ),
        ],
    )
    actor = models.OneToOneField(
        "federation.Actor",
        related_name="user",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    upload_quota = models.PositiveIntegerField(null=True, blank=True)

    instance_support_message_display_date = models.DateTimeField(
        default=get_default_instance_support_message_display_date, null=True, blank=True
    )
    funkwhale_support_message_display_date = models.DateTimeField(
        default=get_default_funkwhale_support_message_display_date,
        null=True,
        blank=True,
    )

Eliot Berriot's avatar
Eliot Berriot committed
    def get_permissions(self, defaults=None):
        defaults = defaults or preferences.get("users__default_permissions")
Eliot Berriot's avatar
Eliot Berriot committed
                self.is_superuser
                or getattr(self, "permission_{}".format(p))
                or p in defaults
    @property
    def all_permissions(self):
        return self.get_permissions()

    def has_permissions(self, *perms, **kwargs):
        operator = kwargs.pop("operator", "and")
Eliot Berriot's avatar
Eliot Berriot committed
        if operator not in ["and", "or"]:
            raise ValueError("Invalid operator {}".format(operator))
Eliot Berriot's avatar
Eliot Berriot committed
        checker = all if operator == "and" else any
        return checker([permissions[p] for p in perms])
Eliot Berriot's avatar
Eliot Berriot committed
        return reverse("users:detail", kwargs={"username": self.username})

    def update_secret_key(self):
        self.secret_key = uuid.uuid4()
        return self.secret_key

    def update_subsonic_api_token(self):
        self.subsonic_api_token = get_token()
        return self.subsonic_api_token

    def set_password(self, raw_password):
        super().set_password(raw_password)
        self.update_secret_key()
        if self.subsonic_api_token:
            self.update_subsonic_api_token()
Eliot Berriot's avatar
Eliot Berriot committed
        return settings.FUNKWHALE_URL + "/@{}".format(self.username)

    def record_activity(self):
        """
        Simply update the last_activity field if current value is too old
        than a threshold. This is useful to keep a track of inactive accounts.
        """
        current = self.last_activity
        delay = 60 * 15  # fifteen minutes
        now = timezone.now()

        if current is None or current < now - datetime.timedelta(seconds=delay):
            self.last_activity = now
            self.save(update_fields=["last_activity"])
    def create_actor(self, **kwargs):
        self.actor = create_actor(self, **kwargs)
        self.save(update_fields=["actor"])
        return self.actor

    def get_upload_quota(self):
        return self.upload_quota or preferences.get("users__upload_quota")

    def get_quota_status(self):
        data = self.actor.get_current_usage()
        max_ = self.get_upload_quota()
        return {
            "max": max_,
            "remaining": max(max_ - (data["total"] / 1000 / 1000), 0),
            "current": data["total"] / 1000 / 1000,
            "draft": data["draft"] / 1000 / 1000,
            "skipped": data["skipped"] / 1000 / 1000,
            "pending": data["pending"] / 1000 / 1000,
            "finished": data["finished"] / 1000 / 1000,
            "errored": data["errored"] / 1000 / 1000,
        }

    def get_channels_groups(self):
        groups = ["imports", "inbox"]
        groups = ["user.{}.{}".format(self.pk, g) for g in groups]
        for permission, value in self.all_permissions.items():
            if value:
                groups.append("admin.{}".format(permission))

        return groups
    def full_username(self):
        return "{}@{}".format(self.username, settings.FEDERATION_HOSTNAME)

    def get_avatar(self):
        if not self.actor:
            return
        return self.actor.attachment_icon

def generate_code(length=10):
    return "".join(
        random.SystemRandom().choice(string.ascii_uppercase) for _ in range(length)
    )


class InvitationQuerySet(models.QuerySet):
        now = timezone.now()
        qs = self.annotate(_users=models.Count("users"))
        query = models.Q(_users=0, expiration_date__gt=now)
        if include:
            return qs.filter(query)
        return qs.exclude(query)


class Invitation(models.Model):
    creation_date = models.DateTimeField(default=timezone.now)
    expiration_date = models.DateTimeField()
    owner = models.ForeignKey(
        User, related_name="invitations", on_delete=models.CASCADE
    )
    code = models.CharField(max_length=50, unique=True)

    objects = InvitationQuerySet.as_manager()

    def save(self, **kwargs):
        if not self.code:
            self.code = generate_code()
        if not self.expiration_date:
            self.expiration_date = self.creation_date + datetime.timedelta(
                days=settings.USERS_INVITATION_EXPIRATION_DAYS
            )

        return super().save(**kwargs)
class Application(oauth2_models.AbstractApplication):
    scope = models.TextField(blank=True)

    @property
    def normalized_scopes(self):
        from .oauth import permissions

        raw_scopes = set(self.scope.split(" ") if self.scope else [])
        return permissions.normalize(*raw_scopes)


# oob schemes are not supported yet in oauth toolkit
# (https://github.com/jazzband/django-oauth-toolkit/issues/235)
# so in the meantime, we override their validation to add support
OOB_SCHEMES = ["urn:ietf:wg:oauth:2.0:oob", "urn:ietf:wg:oauth:2.0:oob:auto"]


class CustomRedirectURIValidator(oauth2_validators.RedirectURIValidator):
    def __call__(self, value):
        if value in OOB_SCHEMES:
            return value
        return super().__call__(value)


oauth2_models.RedirectURIValidator = CustomRedirectURIValidator


class Grant(oauth2_models.AbstractGrant):
    pass


class AccessToken(oauth2_models.AbstractAccessToken):
    pass


class RefreshToken(oauth2_models.AbstractRefreshToken):
    pass


Eliot Berriot's avatar
Eliot Berriot committed
def get_actor_data(username, **kwargs):
Eliot Berriot's avatar
Eliot Berriot committed
    slugified_username = federation_utils.slugify_username(username)
Eliot Berriot's avatar
Eliot Berriot committed
    domain = kwargs.get("domain")
    if not domain:
        domain = federation_models.Domain.objects.get_or_create(
            name=settings.FEDERATION_HOSTNAME
        )[0]
Eliot Berriot's avatar
Eliot Berriot committed
        "preferred_username": slugified_username,
Eliot Berriot's avatar
Eliot Berriot committed
        "domain": domain,
        "type": "Person",
        "name": kwargs.get("name", username),
        "summary": kwargs.get("summary"),
        "manually_approves_followers": False,
        "fid": federation_utils.full_url(
Eliot Berriot's avatar
Eliot Berriot committed
            reverse(
                "federation:actors-detail",
                kwargs={"preferred_username": slugified_username},
            )
        "shared_inbox_url": federation_models.get_shared_inbox_url(),
        "inbox_url": federation_utils.full_url(
Eliot Berriot's avatar
Eliot Berriot committed
            reverse(
                "federation:actors-inbox",
                kwargs={"preferred_username": slugified_username},
            )
        ),
        "outbox_url": federation_utils.full_url(
Eliot Berriot's avatar
Eliot Berriot committed
            reverse(
                "federation:actors-outbox",
                kwargs={"preferred_username": slugified_username},
            )
Eliot Berriot's avatar
Eliot Berriot committed
        "followers_url": federation_utils.full_url(
            reverse(
Eliot Berriot's avatar
Eliot Berriot committed
                "federation:actors-followers",
                kwargs={"preferred_username": slugified_username},
Eliot Berriot's avatar
Eliot Berriot committed
            )
        ),
        "following_url": federation_utils.full_url(
            reverse(
Eliot Berriot's avatar
Eliot Berriot committed
                "federation:actors-following",
                kwargs={"preferred_username": slugified_username},
Eliot Berriot's avatar
Eliot Berriot committed
            )
        ),
def create_actor(user, **kwargs):
Eliot Berriot's avatar
Eliot Berriot committed
    args = get_actor_data(user.username)
    args.update(kwargs)
    private, public = keys.get_key_pair()
    args["private_key"] = private.decode("utf-8")
    args["public_key"] = public.decode("utf-8")

    return federation_models.Actor.objects.create(user=user, **args)
@receiver(ldap_populate_user)
def init_ldap_user(sender, user, ldap_user, **kwargs):
    if not user.actor:
        user.actor = create_actor(user)