Newer
Older
Eliot Berriot
committed
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
Eliot Berriot
committed
Eliot Berriot
committed
import uuid
from django.conf import settings
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
from django.contrib.postgres.fields import JSONField
from django.db import models, transaction
from django.dispatch import receiver
from django.utils import timezone
Eliot Berriot
committed
from django.utils.translation import ugettext_lazy as _
from allauth.account.models import EmailAddress
from django_auth_ldap.backend import populate_user as ldap_populate_user
from oauth2_provider import models as oauth2_models
from oauth2_provider import validators as oauth2_validators
from versatileimagefield.fields import VersatileImageField
from funkwhale_api.common import fields, preferences
from funkwhale_api.common import utils as common_utils
from funkwhale_api.common import validators as common_validators
from funkwhale_api.federation import keys
from funkwhale_api.federation import models as federation_models
from funkwhale_api.federation import utils as federation_utils
Eliot Berriot
committed
def get_token(length=5):
    """Build a dash-separated passphrase made of ``length`` random words.

    Words are read from ``wordlist.txt``, located in the same directory as
    this module, one word per line. Blank lines are ignored.

    :param length: number of words in the returned phrase.
    :returns: the chosen words joined with ``-`` (empty string if
        ``length`` is 0).
    :raises IndexError: if ``length`` is positive and the word list holds
        no usable word.
    """
    wordlist_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "wordlist.txt"
    )
    with open(wordlist_path, "r") as f:
        # Strip line endings up front so that a final line without a trailing
        # newline (or a CRLF file) cannot fuse two words together.
        words = [word for word in (line.strip() for line in f) if word]
    # The phrase is used as an API password (see update_subsonic_api_token),
    # so draw from the OS-backed generator, like generate_code() does.
    rng = random.SystemRandom()
    return "-".join(rng.choice(words) for _ in range(length))
# Admin permissions that can be granted to a user. Each entry maps a
# permission name to its human-readable label, a help text and the set of
# scopes the permission covers.
PERMISSIONS_CONFIGURATION = {
    "moderation": {
        "label": "Moderation",
        "help_text": "Block/mute/remove domains, users and content",
        "scopes": {
            "read:instance:policies",
            "write:instance:policies",
            "read:instance:accounts",
            "write:instance:accounts",
            "read:instance:domains",
            "write:instance:domains",
            "read:instance:reports",
            "write:instance:reports",
            "read:instance:requests",
            "write:instance:requests",
            "read:instance:notes",
            "write:instance:notes",
        },
    },
    "library": {
        "label": "Manage library",
        "help_text": "Manage library, delete files, tracks, artists, albums...",
        "scopes": {
            "read:instance:edits",
            "write:instance:edits",
            "read:instance:libraries",
            "write:instance:libraries",
        },
    },
    "settings": {
        "label": "Manage instance-level settings",
        "help_text": "",
        "scopes": {
            "read:instance:settings",
            "write:instance:settings",
            "read:instance:users",
            "write:instance:users",
            "read:instance:invitations",
            "write:instance:invitations",
        },
    },
}

# Permission names in a stable (alphabetical) order.
PERMISSIONS = sorted(PERMISSIONS_CONFIGURATION.keys())
Eliot Berriot
committed
# Callable passed as ``upload_to`` for the user avatar field: a ChunkedPath
# rooted at "users/avatars", configured not to preserve the uploaded file name.
get_file_path = common_utils.ChunkedPath("users/avatars", preserve_file_name=False)
def get_default_instance_support_message_display_date():
    """Return now plus ``settings.INSTANCE_SUPPORT_MESSAGE_DELAY`` days.

    Used as the default of ``User.instance_support_message_display_date``.
    """
    delay = datetime.timedelta(days=settings.INSTANCE_SUPPORT_MESSAGE_DELAY)
    return timezone.now() + delay
def get_default_funkwhale_support_message_display_date():
    """Return now plus ``settings.FUNKWHALE_SUPPORT_MESSAGE_DELAY`` days.

    Used as the default of ``User.funkwhale_support_message_display_date``.
    """
    delay = datetime.timedelta(days=settings.FUNKWHALE_SUPPORT_MESSAGE_DELAY)
    return timezone.now() + delay
class UserQuerySet(models.QuerySet):
    """QuerySet for users, with a shortcut tailored to authentication."""

    def for_auth(self):
        """Optimization to avoid additional queries during authentication.

        Joins the actor and its domain, prefetches ``plugins`` and annotates
        each row with ``has_verified_primary_email``: the ``verified`` flag
        of the user's primary ``EmailAddress``, fetched through a subquery.
        """
        primary_email_verified = models.Subquery(
            EmailAddress.objects.filter(
                user=models.OuterRef("id"), primary=True
            ).values("verified")[:1]
        )
        return (
            self.select_related("actor__domain")
            .annotate(has_verified_primary_email=primary_email_verified)
            .prefetch_related("plugins")
        )
class UserManager(BaseUserManager):
    """Manager that serves ``UserQuerySet`` instances."""

    def get_queryset(self):
        """Return a ``UserQuerySet`` bound to this manager's model and database."""
        return UserQuerySet(self.model, using=self._db)

    def get_by_natural_key(self, key):
        """Look a user up by natural key on the ``for_auth()`` queryset.

        The parent implementation is invoked unbound, with the optimized
        queryset standing in for ``self``.
        """
        auth_queryset = self.all().for_auth()
        return BaseUserManager.get_by_natural_key(auth_queryset, key)
Eliot Berriot
committed
class User(AbstractUser):
    """User account, extending Django's ``AbstractUser``.

    On top of the inherited fields it stores per-user secrets
    (``secret_key``, ``subsonic_api_token``), one boolean flag per entry of
    ``PERMISSIONS_CONFIGURATION``, an optional upload quota, a free-form
    ``settings`` JSON value and an optional link to a federation actor.
    """

    # First Name and Last Name do not cover name patterns
    # around the globe.
    name = models.CharField(_("Name of User"), blank=True, max_length=255)

    # updated on logout or password change, to invalidate JWT
    secret_key = models.UUIDField(default=uuid.uuid4, null=True)
    privacy_level = fields.get_privacy_field()

    # Unfortunately, Subsonic API assumes a MD5/password authentication
    # scheme, which is weak in terms of security, and not achievable
    # anyway since django use stronger schemes for storing passwords.
    # Users that want to use the subsonic API from external client
    # should set this token and use it as their password in such clients
    subsonic_api_token = models.CharField(blank=True, null=True, max_length=255)

    # permissions
    permission_moderation = models.BooleanField(
        PERMISSIONS_CONFIGURATION["moderation"]["label"],
        help_text=PERMISSIONS_CONFIGURATION["moderation"]["help_text"],
        default=False,
    )
    permission_library = models.BooleanField(
        PERMISSIONS_CONFIGURATION["library"]["label"],
        help_text=PERMISSIONS_CONFIGURATION["library"]["help_text"],
        default=False,
    )
    permission_settings = models.BooleanField(
        PERMISSIONS_CONFIGURATION["settings"]["label"],
        help_text=PERMISSIONS_CONFIGURATION["settings"]["help_text"],
        default=False,
    )

    last_activity = models.DateTimeField(default=None, null=True, blank=True)

    invitation = models.ForeignKey(
        "Invitation",
        related_name="users",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
    )
    avatar = VersatileImageField(
        upload_to=get_file_path,
        null=True,
        blank=True,
        max_length=150,
        validators=[
            common_validators.ImageDimensionsValidator(min_width=50, min_height=50),
            common_validators.FileValidator(
                allowed_extensions=["png", "jpg", "jpeg", "gif"],
                max_size=1024 * 1024 * 2,
            ),
        ],
    )
    actor = models.OneToOneField(
        "federation.Actor",
        related_name="user",
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    # A null quota means "use the instance-wide preference"
    # (see get_upload_quota).
    upload_quota = models.PositiveIntegerField(null=True, blank=True)

    instance_support_message_display_date = models.DateTimeField(
        default=get_default_instance_support_message_display_date, null=True, blank=True
    )
    funkwhale_support_message_display_date = models.DateTimeField(
        default=get_default_funkwhale_support_message_display_date,
        null=True,
        blank=True,
    )
    # Note: this class attribute shares its name with the module-level Django
    # ``settings`` import; inside methods the bare name ``settings`` still
    # resolves to the module-level object.
    settings = JSONField(default=None, null=True, blank=True, max_length=50000)

    objects = UserManager()

    def __str__(self):
        return self.username

    def get_permissions(self, defaults=None):
        """Return a ``{permission_name: granted}`` mapping.

        A permission is granted when the user is a superuser, when the
        matching ``permission_<name>`` flag is set, or when its name is in
        ``defaults``.

        :param defaults: container of permission names granted to everyone.
            When falsy, the ``users__default_permissions`` preference is used.
        :returns: dict with one key per entry of ``PERMISSIONS``.
        """
        defaults = defaults or preferences.get("users__default_permissions")
        perms = {}
        for p in PERMISSIONS:
            v = (
                self.is_superuser
                or getattr(self, "permission_{}".format(p))
                or p in defaults
            )
            perms[p] = v
        return perms

    @property
    def all_permissions(self):
        """Permissions computed with the default preference (see get_permissions)."""
        return self.get_permissions()

    @transaction.atomic
    def set_settings(self, **settings):
        """Merge the given keys into the stored ``settings`` value and save it.

        The row is re-fetched with ``select_for_update()`` inside the atomic
        block, only the ``settings`` column is written, and the merged value
        is then mirrored on this instance.

        The ``**settings`` parameter shadows the module-level Django settings
        inside this method only.
        """
        u = self.__class__.objects.select_for_update().get(pk=self.pk)
        if not u.settings:
            u.settings = {}
        u.settings.update(settings)
        u.save(update_fields=["settings"])
        self.settings = u.settings

    def has_permissions(self, *perms, **kwargs):
        """Tell whether the user holds the given permissions.

        :param perms: permission names to check.
        :param operator: keyword-only, ``"and"`` (default) to require all of
            ``perms`` or ``"or"`` to require at least one.
        :raises ValueError: if ``operator`` is neither ``"and"`` nor ``"or"``.
        :raises KeyError: if a name in ``perms`` is not a known permission.
        """
        operator = kwargs.pop("operator", "and")
        if operator not in ["and", "or"]:
            raise ValueError("Invalid operator {}".format(operator))
        permissions = self.get_permissions()
        checker = all if operator == "and" else any
        return checker([permissions[p] for p in perms])

    def get_absolute_url(self):
        return reverse("users:detail", kwargs={"username": self.username})

    def update_secret_key(self):
        """Assign and return a fresh ``secret_key``; the instance is not saved."""
        self.secret_key = uuid.uuid4()
        return self.secret_key

    def update_subsonic_api_token(self):
        """Assign and return a fresh Subsonic token; the instance is not saved."""
        self.subsonic_api_token = get_token()
        return self.subsonic_api_token

    def set_password(self, raw_password):
        """Set the password and rotate the secrets derived from the account.

        The secret key is always renewed; the Subsonic token is renewed only
        if the user already had one.
        """
        super().set_password(raw_password)
        self.update_secret_key()
        if self.subsonic_api_token:
            self.update_subsonic_api_token()

    def get_activity_url(self):
        return settings.FUNKWHALE_URL + "/@{}".format(self.username)

    def record_activity(self):
        """
        Simply update the last_activity field if the current value is older
        than a threshold. This is useful to keep track of inactive accounts.
        """
        current = self.last_activity
        delay = 60 * 15  # fifteen minutes
        now = timezone.now()
        if current is None or current < now - datetime.timedelta(seconds=delay):
            self.last_activity = now
            self.save(update_fields=["last_activity"])

    def create_actor(self, **kwargs):
        """Create an actor through the module-level ``create_actor`` and attach it.

        Inside this method the bare name ``create_actor`` resolves to the
        module-level function, not to this method.
        """
        self.actor = create_actor(self, **kwargs)
        self.save(update_fields=["actor"])
        return self.actor

    def get_upload_quota(self):
        """Return the user's own quota, or the ``users__upload_quota`` preference."""
        return (
            self.upload_quota
            if self.upload_quota is not None
            else preferences.get("users__upload_quota")
        )

    def get_quota_status(self):
        """Return quota figures derived from the actor's current usage.

        Each usage value is divided by 1000 twice.
        NOTE(review): presumably a bytes-to-megabytes conversion; confirm
        against ``Actor.get_current_usage``.
        """
        data = self.actor.get_current_usage()
        max_ = self.get_upload_quota()

        def scaled(key):
            return data[key] / 1000 / 1000

        current = scaled("total")
        return {
            "max": max_,
            "remaining": max(max_ - current, 0),
            "current": current,
            "draft": scaled("draft"),
            "skipped": scaled("skipped"),
            "pending": scaled("pending"),
            "finished": scaled("finished"),
            "errored": scaled("errored"),
        }

    def get_channels_groups(self):
        """Return the channel group names this user belongs to.

        Per-user groups are named ``user.<pk>.<group>``; one
        ``admin.<permission>`` group is added for each granted permission.
        """
        # NOTE(review): the base group list was missing from this block and
        # has been restored from the upstream project; confirm.
        groups = ["imports", "inbox"]
        groups = ["user.{}.{}".format(self.pk, g) for g in groups]

        for permission, value in self.all_permissions.items():
            if value:
                groups.append("admin.{}".format(permission))

        return groups

    def full_username(self):
        return "{}@{}".format(self.username, settings.FEDERATION_HOSTNAME)

    def get_avatar(self):
        """Return the actor's ``attachment_icon``, or ``None`` without an actor."""
        if not self.actor:
            return None
        return self.actor.attachment_icon
def generate_code(length=10):
    """Return ``length`` uppercase ASCII letters picked with ``SystemRandom``."""
    rng = random.SystemRandom()
    alphabet = string.ascii_uppercase
    letters = [rng.choice(alphabet) for _ in range(length)]
    return "".join(letters)
class InvitationQuerySet(models.QuerySet):
    """QuerySet helpers for invitations."""

    def open(self, include=True):
        """Select invitations that are still open, or everything else.

        An invitation counts as open when no user is attached to it and its
        expiration date is in the future.

        :param include: when true, keep only open invitations; when false,
            exclude them.
        """
        still_open = models.Q(_users=0, expiration_date__gt=timezone.now())
        annotated = self.annotate(_users=models.Count("users"))
        select = annotated.filter if include else annotated.exclude
        return select(still_open)
class Invitation(models.Model):
    """Invitation owned by a user and identified by a unique code."""

    creation_date = models.DateTimeField(default=timezone.now)
    expiration_date = models.DateTimeField()
    owner = models.ForeignKey(
        User,
        related_name="invitations",
        on_delete=models.CASCADE,
    )
    code = models.CharField(max_length=50, unique=True)

    objects = InvitationQuerySet.as_manager()

    def save(self, **kwargs):
        """Fill in a missing code and expiration date, then save.

        The default expiration is the creation date plus
        ``settings.USERS_INVITATION_EXPIRATION_DAYS`` days.
        """
        if not self.code:
            self.code = generate_code()
        if not self.expiration_date:
            validity = datetime.timedelta(
                days=settings.USERS_INVITATION_EXPIRATION_DAYS
            )
            self.expiration_date = self.creation_date + validity
        return super().save(**kwargs)
class Application(oauth2_models.AbstractApplication):
    """OAuth2 application with an extra scope string and an optional token."""

    # Space-separated list of scopes (see normalized_scopes).
    scope = models.TextField(blank=True)
    token = models.CharField(max_length=50, blank=True, null=True, unique=True)

    @property
    def normalized_scopes(self):
        """Return the application's scopes after ``permissions.normalize``.

        An empty ``scope`` yields a call to ``normalize`` with no argument.
        """
        # NOTE(review): imported at call time, presumably to avoid a circular
        # import with the oauth package; confirm.
        from .oauth import permissions

        raw_scopes = set(self.scope.split(" ") if self.scope else [])
        return permissions.normalize(*raw_scopes)
# Out-of-band (oob) redirect schemes are not supported yet in oauth toolkit
# (https://github.com/jazzband/django-oauth-toolkit/issues/235),
# so in the meantime we override its redirect URI validation to accept
# the values listed here.
OOB_SCHEMES = ["urn:ietf:wg:oauth:2.0:oob", "urn:ietf:wg:oauth:2.0:oob:auto"]
class CustomRedirectURIValidator(oauth2_validators.RedirectURIValidator):
    """Redirect URI validator that also accepts the values in ``OOB_SCHEMES``."""

    def __call__(self, value):
        """Return ``value`` as-is for an OOB scheme, else defer to the parent."""
        if value not in OOB_SCHEMES:
            return super().__call__(value)
        return value
# Replace the validator exposed on the oauth2_provider models module with the
# OOB-aware subclass (module-level monkeypatch, applied at import time).
oauth2_models.RedirectURIValidator = CustomRedirectURIValidator
class Grant(oauth2_models.AbstractGrant):
    # Concrete grant model; adds nothing to oauth2_provider's AbstractGrant.
    pass
class AccessToken(oauth2_models.AbstractAccessToken):
    # Concrete access token model; adds nothing to AbstractAccessToken.
    pass
class RefreshToken(oauth2_models.AbstractRefreshToken):
    # Concrete refresh token model; adds nothing to AbstractRefreshToken.
    pass
slugified_username = federation_utils.slugify_username(username)
domain = kwargs.get("domain")
if not domain:
domain = federation_models.Domain.objects.get_or_create(
name=settings.FEDERATION_HOSTNAME
)[0]
Eliot Berriot
committed
"name": kwargs.get("name", username),
"summary": kwargs.get("summary"),
"manually_approves_followers": False,
"fid": federation_utils.full_url(
reverse(
"federation:actors-detail",
kwargs={"preferred_username": slugified_username},
)
"shared_inbox_url": federation_models.get_shared_inbox_url(),
"inbox_url": federation_utils.full_url(
reverse(
"federation:actors-inbox",
kwargs={"preferred_username": slugified_username},
)
),
"outbox_url": federation_utils.full_url(
reverse(
"federation:actors-outbox",
kwargs={"preferred_username": slugified_username},
)
"followers_url": federation_utils.full_url(
reverse(
"federation:actors-followers",
kwargs={"preferred_username": slugified_username},
)
),
"following_url": federation_utils.full_url(
reverse(
"federation:actors-following",
kwargs={"preferred_username": slugified_username},
private, public = keys.get_key_pair()
args["private_key"] = private.decode("utf-8")
args["public_key"] = public.decode("utf-8")
return federation_models.Actor.objects.create(user=user, **args)
@receiver(ldap_populate_user)
def init_ldap_user(sender, user, ldap_user, **kwargs):
    """Give a user populated from LDAP an actor if it has none yet.

    Connected to the ``populate_user`` signal of ``django_auth_ldap``. The
    actor is only assigned on the instance; this handler does not save it.
    """
    if user.actor:
        return
    user.actor = create_actor(user)