Commit 654d2060 authored by Eliot Berriot's avatar Eliot Berriot
Browse files

Server CLI: user management

parent 900fabae
import click
import functools
@click.group()
def cli():
pass
def confirm_action(f, id_var, message_template="Do you want to proceed?"):
@functools.wraps(f)
def action(*args, **kwargs):
if id_var:
id_value = kwargs[id_var]
message = message_template.format(len(id_value))
else:
message = message_template
if not kwargs.pop("no_input", False) and not click.confirm(message, abort=True):
return
return f(*args, **kwargs)
return action
def delete_command(
group,
id_var="id",
name="rm",
message_template="Do you want to delete {} objects? This action is irreversible.",
):
"""
Wrap a command to ensure it asks for confirmation before deletion, unless the --no-input
flag is provided
"""
def decorator(f):
decorated = click.option("--no-input", is_flag=True)(f)
decorated = confirm_action(
decorated, id_var=id_var, message_template=message_template
)
return group.command(name)(decorated)
return decorator
def update_command(
group,
id_var="id",
name="set",
message_template="Do you want to update {} objects? This action may have irreversible consequnces.",
):
"""
Wrap a command to ensure it asks for confirmation before deletion, unless the --no-input
flag is provided
"""
def decorator(f):
decorated = click.option("--no-input", is_flag=True)(f)
decorated = confirm_action(
decorated, id_var=id_var, message_template=message_template
)
return group.command(name)(decorated)
return decorator
import click
import sys
from . import base
from . import users # noqa
from rest_framework.exceptions import ValidationError
def invoke():
try:
return base.cli()
except ValidationError as e:
click.secho("Invalid data:", fg="red")
for field, errors in e.detail.items():
click.secho(" {}:".format(field), fg="red")
for error in errors:
click.secho(" - {}".format(error), fg="red")
sys.exit(1)
import click
from django.db import transaction
from funkwhale_api.federation import models as federation_models
from funkwhale_api.users import models
from funkwhale_api.users import serializers
from funkwhale_api.users import tasks
from . import base
from . import utils
class FakeRequest(object):
def __init__(self, session={}):
self.session = session
@transaction.atomic
def handler_create_user(
username,
password,
email,
is_superuser=False,
is_staff=False,
permissions=[],
upload_quota=None,
):
serializer = serializers.RS(
data={
"username": username,
"email": email,
"password1": password,
"password2": password,
}
)
utils.logger.debug("Validating user data…")
serializer.is_valid(raise_exception=True)
# Override email validation, we assume accounts created from CLI have a valid email
request = FakeRequest(session={"account_verified_email": email})
utils.logger.debug("Creating user…")
user = serializer.save(request=request)
utils.logger.debug("Setting permissions and other attributes…")
user.is_staff = is_staff
user.upload_quota = upload_quota
user.is_superuser = is_superuser
for permission in permissions:
if permission in models.PERMISSIONS:
utils.logger.debug("Setting %s permission to True", permission)
setattr(user, "permission_{}".format(permission), True)
else:
utils.logger.warn("Unknown permission %s", permission)
utils.logger.debug("Creating actor…")
user.actor = models.create_actor(user)
user.save()
return user
@transaction.atomic
def handler_delete_user(usernames, soft=True):
for username in usernames:
click.echo("Deleting {}…".format(username))
actor = None
user = None
try:
user = models.User.objects.get(username=username)
except models.User.DoesNotExist:
try:
actor = federation_models.Actor.objects.local().get(
preferred_username=username
)
except federation_models.Actor.DoesNotExist:
click.echo(" Not found, skipping")
continue
actor = actor or user.actor
if user:
tasks.delete_account(user_id=user.pk)
if not soft:
click.echo(" Hard delete, removing actor")
actor.delete()
click.echo(" Done")
@transaction.atomic
def handler_update_user(usernames, kwargs):
users = models.User.objects.filter(username__in=usernames)
total = users.count()
if not total:
click.echo("No matching users")
return
final_kwargs = {}
supported_fields = [
"is_active",
"permission_moderation",
"permission_library",
"permission_settings",
"is_staff",
"is_superuser",
"upload_quota",
"password",
]
for field in supported_fields:
try:
value = kwargs[field]
except KeyError:
continue
final_kwargs[field] = value
click.echo(
"Updating {} on {} matching users…".format(
", ".join(final_kwargs.keys()), total
)
)
if "password" in final_kwargs:
new_password = final_kwargs.pop("password")
for user in users:
user.set_password(new_password)
models.User.objects.bulk_update(users, ["password"])
if final_kwargs:
users.update(**final_kwargs)
click.echo("Done!")
@base.cli.group()
def users():
"""Manage users"""
pass
@users.command()
@click.option("--username", "-u", prompt=True, required=True)
@click.option(
"-p",
"--password",
prompt="Password (leave empty to have a random one generated)",
hide_input=True,
envvar="FUNKWHALE_CLI_USER_PASSWORD",
default="",
help="If empty, a random password will be generated and displayed in console output",
)
@click.option(
"-e",
"--email",
prompt=True,
help="Email address to associate with the account",
required=True,
)
@click.option(
"-q",
"--upload-quota",
help="Upload quota (leave empty to use default pod quota)",
required=False,
default=None,
type=click.INT,
)
@click.option(
"--superuser/--no-superuser", default=False,
)
@click.option(
"--staff/--no-staff", default=False,
)
@click.option(
"--permission", multiple=True,
)
def create(username, password, email, superuser, staff, permission, upload_quota):
"""Create a new user"""
generated_password = None
if password == "":
generated_password = models.User.objects.make_random_password()
user = handler_create_user(
username=username,
password=password or generated_password,
email=email,
is_superuser=superuser,
is_staff=staff,
permissions=permission,
upload_quota=upload_quota,
)
click.echo("User {} created!".format(user.username))
if generated_password:
click.echo(" Generated password: {}".format(generated_password))
@base.delete_command(group=users, id_var="username")
@click.argument("username", nargs=-1)
@click.option(
"--hard/--no-hard",
default=False,
help="Purge all user-related info (allow recreating a user with the same username)",
)
def delete(username, hard):
"""Delete given users"""
handler_delete_user(usernames=username, soft=not hard)
@base.update_command(group=users, id_var="username")
@click.argument("username", nargs=-1)
@click.option(
"--active/--inactive",
help="Mark as active or inactive (inactive users cannot login or use the service)",
default=None,
)
@click.option("--superuser/--no-superuser", default=None)
@click.option("--staff/--no-staff", default=None)
@click.option("--permission-library/--no-permission-library", default=None)
@click.option("--permission-moderation/--no-permission-moderation", default=None)
@click.option("--permission-settings/--no-permission-settings", default=None)
@click.option("--password", default=None, envvar="FUNKWHALE_CLI_USER_UPDATE_PASSWORD")
@click.option(
"-q", "--upload-quota", type=click.INT,
)
def update(username, **kwargs):
"""Update attributes for given users"""
field_mapping = {
"active": "is_active",
"superuser": "is_superuser",
"staff": "is_staff",
}
final_kwargs = {}
for cli_field, value in kwargs.items():
if value is None:
continue
model_field = (
field_mapping[cli_field] if cli_field in field_mapping else cli_field
)
final_kwargs[model_field] = value
if not final_kwargs:
raise click.BadArgumentUsage("You need to update at least one attribute")
handler_update_user(usernames=username, kwargs=final_kwargs)
import logging
logger = logging.getLogger("funkwhale_api.cli")
......@@ -17,4 +17,11 @@ if __name__ == "__main__":
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
if len(sys.argv) > 1 and sys.argv[1] in ["fw", "funkwhale"]:
# trigger our own click-based cli
from funkwhale_api.cli import main
sys.argv = sys.argv[1:]
main.invoke()
else:
execute_from_command_line(sys.argv)
......@@ -73,3 +73,5 @@ django-storages==1.7.1
boto3<3
unicode-slugify
django-cacheops==4.2
click>=7,<8
import pytest
from click.testing import CliRunner
from funkwhale_api.cli import main
from funkwhale_api.cli import users
@pytest.mark.parametrize(
"cmd, args, handlers",
[
(
("users", "create"),
(
"--username",
"testuser",
"--password",
"testpassword",
"--email",
"test@hello.com",
"--upload-quota",
"35",
"--permission",
"library",
"--permission",
"moderation",
"--staff",
"--superuser",
),
[
(
users,
"handler_create_user",
{
"username": "testuser",
"password": "testpassword",
"email": "test@hello.com",
"upload_quota": 35,
"permissions": ("library", "moderation"),
"is_staff": True,
"is_superuser": True,
},
)
],
),
(
("users", "rm"),
("testuser1", "testuser2", "--no-input"),
[
(
users,
"handler_delete_user",
{"usernames": ("testuser1", "testuser2"), "soft": True},
)
],
),
(
("users", "rm"),
("testuser1", "testuser2", "--no-input", "--hard",),
[
(
users,
"handler_delete_user",
{"usernames": ("testuser1", "testuser2"), "soft": False},
)
],
),
(
("users", "set"),
(
"testuser1",
"testuser2",
"--no-input",
"--inactive",
"--upload-quota",
"35",
"--no-staff",
"--superuser",
"--permission-library",
"--no-permission-moderation",
"--no-permission-settings",
"--password",
"newpassword",
),
[
(
users,
"handler_update_user",
{
"usernames": ("testuser1", "testuser2"),
"kwargs": {
"is_active": False,
"upload_quota": 35,
"is_staff": False,
"is_superuser": True,
"permission_library": True,
"permission_moderation": False,
"permission_settings": False,
"password": "newpassword",
},
},
)
],
),
],
)
def test_cli(cmd, args, handlers, mocker):
patched_handlers = {}
for module, path, _ in handlers:
patched_handlers[(module, path)] = mocker.spy(module, path)
runner = CliRunner()
result = runner.invoke(main.base.cli, cmd + args)
assert result.exit_code == 0, result.output
for module, path, expected_call in handlers:
patched_handlers[(module, path)].assert_called_once_with(**expected_call)
import pytest
from funkwhale_api.cli import users
def test_user_create_handler(factories, mocker, now):
kwargs = {
"username": "helloworld",
"password": "securepassword",
"is_superuser": False,
"is_staff": True,
"email": "hello@world.email",
"upload_quota": 35,
"permissions": ["moderation"],
}
set_password = mocker.spy(users.models.User, "set_password")
create_actor = mocker.spy(users.models, "create_actor")
user = users.handler_create_user(**kwargs)
assert user.username == kwargs["username"]
assert user.is_superuser == kwargs["is_superuser"]
assert user.is_staff == kwargs["is_staff"]
assert user.date_joined >= now
assert user.upload_quota == kwargs["upload_quota"]
set_password.assert_called_once_with(user, kwargs["password"])
create_actor.assert_called_once_with(user)
expected_permissions = {
p: p in kwargs["permissions"] for p in users.models.PERMISSIONS
}
assert user.all_permissions == expected_permissions
def test_user_delete_handler_soft(factories, mocker, now):
user1 = factories["federation.Actor"](local=True).user
actor1 = user1.actor
user2 = factories["federation.Actor"](local=True).user
actor2 = user2.actor
user3 = factories["federation.Actor"](local=True).user
delete_account = mocker.spy(users.tasks, "delete_account")
users.handler_delete_user([user1.username, user2.username, "unknown"])
assert delete_account.call_count == 2
delete_account.assert_any_call(user_id=user1.pk)
with pytest.raises(user1.DoesNotExist):
user1.refresh_from_db()
delete_account.assert_any_call(user_id=user2.pk)
with pytest.raises(user2.DoesNotExist):
user2.refresh_from_db()
# soft delete, actor shouldn't be deleted
actor1.refresh_from_db()
actor2.refresh_from_db()
# not deleted
user3.refresh_from_db()
def test_user_delete_handler_hard(factories, mocker, now):
user1 = factories["federation.Actor"](local=True).user
actor1 = user1.actor
user2 = factories["federation.Actor"](local=True).user
actor2 = user2.actor
user3 = factories["federation.Actor"](local=True).user
delete_account = mocker.spy(users.tasks, "delete_account")
users.handler_delete_user([user1.username, user2.username, "unknown"], soft=False)
assert delete_account.call_count == 2
delete_account.assert_any_call(user_id=user1.pk)
with pytest.raises(user1.DoesNotExist):
user1.refresh_from_db()
delete_account.assert_any_call(user_id=user2.pk)
with pytest.raises(user2.DoesNotExist):
user2.refresh_from_db()
# hard delete, actors are deleted as well
with pytest.raises(actor1.DoesNotExist):
actor1.refresh_from_db()
with pytest.raises(actor2.DoesNotExist):
actor2.refresh_from_db()
# not deleted
user3.refresh_from_db()
@pytest.mark.parametrize(
"params, expected",
[
({"is_active": False}, {"is_active": False}),
(
{"is_staff": True, "is_superuser": True},
{"is_staff": True, "is_superuser": True},
),
({"upload_quota": 35}, {"upload_quota": 35}),
(
{
"permission_library": True,
"permission_moderation": True,
"permission_settings": True,
},
{
"all_permissions": {
"library": True,
"moderation": True,
"settings": True,
}
},
),
],
)
def test_user_update_handler(params, expected, factories):
user1 = factories["federation.Actor"](local=True).user
user2 = factories["federation.Actor"](local=True).user
user3 = factories["federation.Actor"](local=True).user
def get_field_values(user):
return {f: getattr(user, f) for f, v in expected.items()}
unchanged = get_field_values(user3)
users.handler_update_user([user1.username, user2.username, "unknown"], params)
user1.refresh_from_db()
user2.refresh_from_db()
user3.refresh_from_db()
assert get_field_values(user1) == expected
assert get_field_values(user2) == expected
assert get_field_values(user3) == unchanged
def test_user_update_handler_password(factories, mocker):
user = factories["federation.Actor"](local=True).user
current_password = user.password
set_password = mocker.spy(users.models.User, "set_password")