Skip to content
Snippets Groups Projects
Verified Commit 00853243 authored by Eliot Berriot's avatar Eliot Berriot
Browse files

Splitted big CLI module in smaller chunks

parent 9cf45677
No related branches found
No related tags found
No related merge requests found
Pipeline #4589 failed
Showing with 1311 additions and 24 deletions
import aiohttp
from . import exceptions
from . import logs
from . import schemas
from . import settings
......
from . import auth
from . import albums
from . import artists
from . import favorites
from . import libraries
from . import playlists
from . import tracks
from . import uploads
from . import users
from .base import cli
__all__ = [
"auth",
"albums",
"artists",
"favorites",
"libraries",
"playlists",
"tracks",
"uploads",
"users",
"cli",
]
import click
from . import base
@base.cli.group()
@click.pass_context
def albums(ctx):
pass
albums_ls = base.get_ls_command(
albums,
"api/v1/albums/",
output_conf={
"labels": ["ID", "Title", "Artist", "Tracks", "Created"],
"type": "ALBUM",
"id_field": "ID",
},
)
import click
from . import base
@base.cli.group()
@click.pass_context
def artists(ctx):
pass
artists_ls = base.get_ls_command(
artists,
"api/v1/artists/",
output_conf={
"labels": ["ID", "Name", "Albums", "Tracks", "Created"],
"type": "ARTIST",
"id_field": "ID",
},
)
import click
import keyring
# importing the backends explicitely is required for PyInstaller to work
import keyring.backends.kwallet
import keyring.backends.Windows
import keyring.backends.OS_X
import keyring.backends.SecretService
import keyring.backends.chainer
from . import base
from .. import api
class lazy_credential:
"""
A proxy object to request access to the proxy object at the later possible point,
cf #4
"""
def __init__(self, *args):
self.args = args
self._cached_value = None
@property
def value(self):
if self._cached_value:
return self._cached_value
try:
v = keyring.get_password(*self.args)
except ValueError as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}. Your password may be incorrect.".format(
e.args[0]
)
)
except Exception as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}".format(e.args[0])
)
self._cached_value = v
return v
def __str__(self):
return str(self.value)
def __eq__(self, other):
return self.value == other
def __repr__(self):
return str(self.value)
def __bool__(self):
return bool(self.value)
def init_keyring():
# small hack to fix some weird issues with pyinstaller and keyring
# there seems to be a cache issue somewhere
del keyring.backend.get_all_keyring.__wrapped__.always_returns
keyring.core.init_backend()
# /end of hack
@base.cli.command()
@click.option("-u", "--username", envvar="FUNKWHALE_USERNAME", prompt=True)
@click.option(
"-p", "--password", envvar="FUNKWHALE_PASSWORD", prompt=True, hide_input=True
)
@click.pass_context
@base.async_command
async def login(ctx, username, password):
async with api.get_session() as session:
token = await api.get_jwt_token(
session, ctx.obj["SERVER_URL"], username=username, password=password
)
try:
keyring.set_password(ctx.obj["SERVER_URL"], "_", token)
except ValueError as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}. Your password may be incorrect.".format(
e.args[0]
)
)
except Exception as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}".format(e.args[0])
)
click.echo("Login successfull!")
@base.cli.command()
@click.pass_context
@base.async_command
async def logout(ctx):
keyring.delete_password(ctx.obj["SERVER_URL"], "_")
click.echo("Logout successfull!")
......@@ -2,30 +2,14 @@ import asyncio
import aiohttp
import click
import click_log
import collections
import datetime
import dotenv
import functools
import keyring
import ssl
import sys
import urllib.parse
# importing the backends explicitely is required for PyInstaller to work
import keyring.backends.kwallet
import keyring.backends.Windows
import keyring.backends.OS_X
import keyring.backends.SecretService
import keyring.backends.chainer
import logging
import math
import urllib.parse
import json
import os
import pathvalidate
import pathlib
import urllib.parse
import tqdm
from funkwhale_cli import api
from funkwhale_cli import config
......@@ -170,55 +154,15 @@ RAW_DECORATOR = click.option(
)
class lazy_credential:
"""
A proxy object to request access to the proxy object at the later possible point,
cf #4
"""
def __init__(self, *args):
self.args = args
self._cached_value = None
@property
def value(self):
if self._cached_value:
return self._cached_value
try:
v = keyring.get_password(*self.args)
except ValueError as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}. Your password may be incorrect.".format(
e.args[0]
)
)
except Exception as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}".format(e.args[0])
)
self._cached_value = v
return v
def __str__(self):
return str(self.value)
def __eq__(self, other):
return self.value == other
def __repr__(self):
return str(self.value)
def __bool__(self):
return bool(self.value)
def set_server(ctx, url, token, use_auth=True):
from . import auth
ctx.ensure_object(dict)
ctx.obj["SERVER_URL"] = url
parsed = urllib.parse.urlparse(url)
ctx.obj["SERVER_NETLOC"] = parsed.netloc
ctx.obj["SERVER_PROTOCOL"] = parsed.scheme
token = (token or lazy_credential(url, "_")) if use_auth else None
token = (token or auth.lazy_credential(url, "_")) if use_auth else None
ctx.obj["remote"] = api.get_api(
domain=ctx.obj["SERVER_NETLOC"],
protocol=ctx.obj["SERVER_PROTOCOL"],
......@@ -255,128 +199,25 @@ def set_server(ctx, url, token, use_auth=True):
@click_log.simple_verbosity_option(logs.logger, expose_value=True)
@click.pass_context
def cli(ctx, env_file, url, verbosity, token, quiet, no_login):
# small hack to fix some weird issues with pyinstaller and keyring
# there seems to be a cache issue somewhere
del keyring.backend.get_all_keyring.__wrapped__.always_returns
keyring.core.init_backend()
# /end of hack
from . import auth
auth.init_keyring()
ctx.ensure_object(dict)
logs.logger.disabled = quiet
set_server(ctx, url, token, use_auth=not no_login)
@cli.command()
@click.option("-u", "--username", envvar="FUNKWHALE_USERNAME", prompt=True)
@click.option(
"-p", "--password", envvar="FUNKWHALE_PASSWORD", prompt=True, hide_input=True
)
@click.pass_context
@async_command
async def login(ctx, username, password):
async with api.get_session() as session:
token = await api.get_jwt_token(
session, ctx.obj["SERVER_URL"], username=username, password=password
)
try:
keyring.set_password(ctx.obj["SERVER_URL"], "_", token)
except ValueError as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}. Your password may be incorrect.".format(
e.args[0]
)
)
except Exception as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}".format(e.args[0])
)
click.echo("Login successfull!")
@cli.command()
@click.pass_context
@async_command
async def logout(ctx):
keyring.delete_password(ctx.obj["SERVER_URL"], "_")
click.echo("Logout successfull!")
@cli.group()
@click.pass_context
def server(ctx, url):
pass
@cli.group()
@click.pass_context
def server(ctx):
pass
@server.command()
@RAW_DECORATOR
@click.pass_context
@async_command
async def info(ctx, raw):
async with api.get_session() as session:
nodeinfo = await api.fetch_nodeinfo(
session,
domain=ctx.obj["SERVER_NETLOC"],
protocol=ctx.obj["SERVER_PROTOCOL"],
)
if raw:
click.echo(json.dumps(nodeinfo, sort_keys=True, indent=4))
return
click.echo("\n")
click.echo("General")
click.echo("-------")
click.echo("Url: {}".format(ctx.obj["SERVER_URL"]))
click.echo("Name: {}".format(nodeinfo["metadata"]["nodeName"]))
click.echo(
"Short description: {}".format(nodeinfo["metadata"]["shortDescription"])
)
click.echo("\n")
click.echo("Software")
click.echo("----------")
click.echo("Software name: {}".format(nodeinfo["software"]["name"]))
click.echo("Version: {}".format(nodeinfo["software"]["version"]))
click.echo("\n")
click.echo("Configuration")
click.echo("---------------")
click.echo(
"Registrations: {}".format(
"open" if nodeinfo["openRegistrations"] else "closed"
)
)
@cli.group()
@click.pass_context
def libraries(ctx):
"""
Manage libraries
"""
def get_url_param(url, name):
parsed = urllib.parse.urlparse(url)
v = urllib.parse.parse_qs(parsed.query).get(name)
if v:
return v[0]
return None
def get_pagination_data(payload):
data = {"next_page": None, "page_size": None}
if payload.get("next"):
next_page = get_url_param(payload["next"], "page")
next_page = utils.get_url_param(payload["next"], "page")
data["next_page"] = int(next_page)
data["total_pages"] = math.ceil(payload["count"] / len(payload["results"]))
data["current_page"] = int(next_page) - 1
data["page_size"] = len(payload["results"])
if payload.get("previous"):
previous_page = get_url_param(payload["previous"], "page") or 0
previous_page = utils.get_url_param(payload["previous"], "page") or 0
data.setdefault("current_page", int(previous_page) + 1)
data.setdefault("total_pages", data["current_page"])
if (
......@@ -434,7 +275,9 @@ def get_ls_command(
click.option("--filter", "-f", multiple=True) if filter else noop_decorator
)
owned_decorator = (
click.option("--owned", is_flag=True, default=False) if owned_conf else noop_decorator
click.option("--owned", is_flag=True, default=False)
if owned_conf
else noop_decorator
)
@id_decorator
......@@ -501,7 +344,9 @@ def get_ls_command(
params[k] = v[0]
if owned_conf and owned:
user_info = await get_user_info(ctx)
params[owned_conf['param']] = utils.recursive_getattr(user_info, owned_conf['field'])
params[owned_conf["param"]] = utils.recursive_getattr(
user_info, owned_conf["field"]
)
else:
params = {}
......@@ -615,9 +460,9 @@ def get_delete_command(
group,
url_template,
confirm="Do you want to delete {} objects? This action is irreversible.",
doc='Delect the given objects',
doc="Delect the given objects",
name="rm",
id_metavar='ID'
id_metavar="ID",
):
@click.argument("id", nargs=-1, metavar=id_metavar)
@RAW_DECORATOR
......@@ -643,563 +488,11 @@ def get_delete_command(
return group.command(name)(delete)
libraries_ls = get_ls_command(
libraries,
"api/v1/libraries/",
output_conf={
"labels": ["UUID", "Name", "Visibility", "Uploads"],
"type": "LIBRARY",
},
)
libraries_rm = get_delete_command(libraries, "api/v1/libraries/{}/")
@libraries.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@async_command
async def libraries_create(ctx, raw, name, visibility):
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/libraries/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Library created:")
click.echo(
output.table([payload], ["UUID", "Name", "Visibility"], type="LIBRARY")
)
@cli.group()
@click.pass_context
def artists(ctx):
pass
artists_ls = get_ls_command(
artists,
"api/v1/artists/",
output_conf={
"labels": ["ID", "Name", "Albums", "Tracks", "Created"],
"type": "ARTIST",
"id_field": "ID",
},
)
@cli.group()
@click.pass_context
def albums(ctx):
pass
albums_ls = get_ls_command(
albums,
"api/v1/albums/",
output_conf={
"labels": ["ID", "Title", "Artist", "Tracks", "Created"],
"type": "ALBUM",
"id_field": "ID",
},
)
@cli.group()
@click.pass_context
def tracks(ctx):
pass
tracks_ls = get_ls_command(
tracks,
"api/v1/tracks/",
output_conf={
"labels": ["ID", "Title", "Artist", "Album", "Disc", "Position"],
"type": "TRACK",
"id_field": "ID",
},
)
async def get_track_download_url(id, remote, format=None):
result = await remote.request("get", "api/v1/tracks/{}/".format(id))
result.raise_for_status()
payload = await result.json()
try:
download_url = payload["uploads"][0]["listen_url"]
except IndexError:
if remote.token:
raise click.ClickException("This file is not available for download")
else:
raise click.ClickException(
"This file is not available for download, try to login first"
)
if download_url.startswith("/"):
download_url = remote.base_url[:-1] + download_url
if format:
download_url = add_url_params(download_url, {"to": format})
else:
format = payload["uploads"][0]["extension"]
return download_url, format, payload
def add_url_params(url, params):
""" Add GET params to provided URL being aware of existing.
:param url: string of target URL
:param params: dict containing requested params to be added
:return: string with updated URL
>> url = 'http://stackoverflow.com/test?answers=true'
>> new_params = {'answers': False, 'data': ['some','values']}
>> add_url_params(url, new_params)
'http://stackoverflow.com/test?data=some&data=values&answers=false'
"""
# Unquoting URL first so we don't loose existing args
url = urllib.parse.unquote(url)
# Extracting url info
parsed_url = urllib.parse.urlparse(url)
# Extracting URL arguments from parsed URL
get_args = parsed_url.query
# Converting URL arguments to dict
parsed_get_args = dict(urllib.parse.parse_qsl(get_args))
# Merging URL arguments dict with new params
parsed_get_args.update(params)
# Bool and Dict values should be converted to json-friendly values
# you may throw this part away if you don't like it :)
parsed_get_args.update(
{
k: json.dumps(v)
for k, v in parsed_get_args.items()
if isinstance(v, (bool, dict))
}
)
# Converting URL argument to proper query string
encoded_get_args = urllib.parse.urlencode(parsed_get_args, doseq=True)
# Creating new parsed result object based on provided with new
# URL arguments. Same thing happens inside of urlparse.
new_url = urllib.parse.ParseResult(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
encoded_get_args,
parsed_url.fragment,
).geturl()
return new_url
def sanitize_recursive(value):
if isinstance(value, dict):
return {k: sanitize_recursive(v) for k, v in value.items()}
elif isinstance(value, list):
return [sanitize_recursive(v) for v in value]
else:
return pathvalidate.sanitize_filepath(str(value))
def flatten(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, collections.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
@tracks.command("download")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@click.option("-d", "--directory", type=click.Path(exists=True))
@click.option("-o", "--overwrite", is_flag=True, default=False)
@click.option("-s", "--skip-existing", is_flag=True, default=False)
@click.option("-i", "--ignore-errors", multiple=True, type=int)
@click.option(
"-t",
"--template",
default="{artist} - {album} - {title}.{extension}",
envvar="FUNKWHALE_DOWNLOAD_PATH_TEMPLATE",
)
@click.pass_context
@async_command
async def track_download(
ctx, id, format, directory, template, overwrite, ignore_errors, skip_existing
):
async with ctx.obj["remote"]:
progressbar = tqdm.tqdm(id, unit="Files")
for i in progressbar:
download_url, format, track_data = await get_track_download_url(
i, ctx.obj["remote"], format=format
)
logs.logger.info("Downloading from {}".format(download_url))
filename_params = flatten(track_data)
filename_params["album"] = filename_params["album_title"]
filename_params["artist"] = filename_params["artist_name"]
filename_params["extension"] = format
filename_params["year"] = (
filename_params["album_release_date"][:4]
if filename_params["album_release_date"]
else None
)
filename_params = {
k: sanitize_recursive(v) for k, v in filename_params.items()
}
if directory:
filename = template.format(**filename_params)
full_path = os.path.join(directory, filename)
existing = os.path.exists(full_path)
if skip_existing and existing:
logs.logger.info(
"'{}' already exists on disk, skipping download".format(
full_path
)
)
continue
elif not overwrite and existing:
raise click.ClickException(
"'{}' already exists on disk. Relaunch this command with --overwrite if you want to replace it".format(
full_path
)
)
async with ctx.obj["remote"].request(
"get", download_url, timeout=0
) as response:
try:
response.raise_for_status()
except aiohttp.ClientResponseError as e:
if response.status in ignore_errors:
logs.logger.warning(
"Remote answered with {} for url {}, skipping".format(
response.status, download_url
)
)
continue
else:
raise click.ClickException(
"Remote answered with {} for url {}, exiting".format(
response.status, download_url
)
)
if directory:
final_directory = os.path.dirname(full_path)
pathlib.Path(final_directory).mkdir(parents=True, exist_ok=True)
logs.logger.info("Downloading to {}".format(full_path))
out = open(full_path, "wb")
else:
out = click.get_binary_stream("stdout")
while True:
chunk = await response.content.read(1024)
if not chunk:
break
out.write(chunk)
logs.logger.info("Download complete")
@cli.group()
@click.pass_context
def uploads(ctx):
pass
uploads_ls = get_ls_command( # noqa
uploads,
"api/v1/uploads/",
output_conf={
"labels": ["UUID", "Track", "Artist", "Import status", "Size", "Mimetype"],
"type": "UPLOAD",
},
)
def track_read(file_obj, name, progress):
read = file_obj.read
def patched_read(size):
content = read(size)
progress.update(len(content))
progress.set_postfix(file=name[-30:], refresh=False)
return content
setattr(file_obj, "read", patched_read)
async def upload(path, size, remote, ref, library_id, semaphore, global_progress):
async with semaphore:
filename = os.path.basename(path)
data = {
"library": library_id,
"import_reference": ref,
"source": "upload://{}".format(filename),
"audio_file": open(path, "rb"),
}
track_read(data["audio_file"], filename, global_progress)
response = await remote.request("post", "api/v1/uploads/", data=data, timeout=0)
response.raise_for_status()
return response
@uploads.command("create")
@click.argument("library_id")
@click.argument("paths", nargs=-1)
@click.option("-r", "--ref", default=None)
@click.option("-p", "--parallel", type=click.INT, default=1)
@click.pass_context
@async_command
async def uploads_create(ctx, library_id, paths, ref, parallel):
logs.logger.info("Uploading {} files…".format(len(paths)))
paths = sorted(set(paths))
if not paths:
return
ref = ref or "funkwhale-cli-import-{}".format(datetime.datetime.now().isoformat())
sizes = {path: os.path.getsize(path) for path in paths}
async with ctx.obj["remote"]:
logs.logger.info("Checking library {} existence…".format(library_id))
library_data = await ctx.obj["remote"].request(
"get", "api/v1/libraries/{}/".format(library_id)
)
library_data.raise_for_status()
sem = asyncio.Semaphore(parallel)
progressbar = tqdm.tqdm(
total=sum(sizes.values()), unit="B", unit_scale=True, unit_divisor=1024
)
tasks = [
upload(
path=path,
ref=ref,
size=sizes[path],
global_progress=progressbar,
remote=ctx.obj["remote"],
library_id=library_id,
semaphore=sem,
)
for path in paths
]
await asyncio.gather(*tasks)
logs.logger.info("Upload complete")
@cli.group()
@click.pass_context
def favorites(ctx):
pass
@favorites.group("tracks")
@click.pass_context
def favorites_tracks(ctx):
pass
@favorites_tracks.command("create")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@async_command
async def favorites_tracks_create(ctx, id):
click.echo("Adding {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"post", "api/v1/favorites/tracks/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} added to favorites".format(i))
@favorites_tracks.command("rm")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@async_command
async def favorites_tracks_rm(ctx, id):
click.echo("Removing {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"delete", "api/v1/favorites/tracks/remove/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} removed from favorites".format(i))
favorites_tracks_ls = get_ls_command( # noqa
favorites_tracks,
"api/v1/favorites/tracks/",
output_conf={
"labels": ["Track ID", "Track", "Artist", "Favorite Date"],
"type": "TRACK_FAVORITE",
},
)
@cli.group()
@click.pass_context
def playlists(ctx):
"""
Manage playlists
"""
playlists_ls = get_ls_command(
playlists,
"api/v1/playlists/",
doc="List available playlists",
owned_conf={'param': 'user', 'field': 'id'},
output_conf={
"labels": [
"ID",
"Name",
"Visibility",
"Tracks Count",
"User",
"Created",
"Modified",
],
"type": "PLAYLIST",
},
)
playlists_rm = get_delete_command(
playlists, "api/v1/playlists/{}/", id_metavar="PLAYLIST_ID", doc='Delete the given playlists'
)
@playlists.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@async_command
async def playlists_create(ctx, raw, name, visibility):
"""
Create a new playlist
"""
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/playlists/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Playlist created:")
click.echo(
output.table(
[payload], ["ID", "Name", "Visibility", "Tracks Count"], type="PLAYLIST"
)
)
@playlists.command("tracks-add")
@click.argument("id", metavar="PLAYLIST_ID")
@click.argument("track", nargs=-1, metavar="[TRACK_ID]…")
@click.option(
"--no-duplicates",
is_flag=True,
default=False,
help="Prevent insertion of tracks that already exist in the playlist. An error will be raised in this case.",
)
@click.pass_context
@async_command
async def playlists_tracks_add(ctx, id, track, no_duplicates):
"""
Insert one or more tracks in a playlist
"""
if not track:
return click.echo("No track id provided")
async with ctx.obj["remote"]:
async with ctx.obj["remote"].request(
"post",
"api/v1/playlists/{}/".format(id),
data={"tracks": track, "allow_duplicates": not no_duplicates},
) as response:
response.raise_for_status()
playlists_tracks = get_ls_command(
playlists,
"api/v1/playlists/{}/tracks/",
name="tracks",
with_id=True,
pagination=False,
ordering=False,
filter=False,
output_conf={
"labels": ["Position", "ID", "Title", "Artist", "Album", "Created"],
"id_field": "ID",
"type": "PLAYLIST_TRACK",
},
id_metavar="PLAYLIST_ID",
doc="List the tracks included in a playlist"
)
@cli.group()
@click.pass_context
def users(ctx):
pass
users_me = get_show_command(
users,
"api/v1/users/users/{}/",
output_conf={
"labels": [
"ID",
"Username",
"Name",
"Email",
"Federation ID",
"Joined",
"Visibility",
"Staff",
"Admin",
"Permissions",
],
"type": "USER",
},
force_id="me",
name="me",
)
async def get_user_info(ctx):
async with ctx.obj["remote"].request(
"get", "api/v1/users/users/me/",
) as result:
async with ctx.obj["remote"].request("get", "api/v1/users/users/me/") as result:
result.raise_for_status()
return await result.json()
if __name__ == "__main__":
cli()
import click
from . import base
@base.cli.group()
@click.pass_context
def favorites(ctx):
pass
@favorites.group("tracks")
@click.pass_context
def favorites_tracks(ctx):
pass
@favorites_tracks.command("create")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@base.async_command
async def favorites_tracks_create(ctx, id):
click.echo("Adding {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"post", "api/v1/favorites/tracks/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} added to favorites".format(i))
@favorites_tracks.command("rm")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@base.async_command
async def favorites_tracks_rm(ctx, id):
click.echo("Removing {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"delete", "api/v1/favorites/tracks/remove/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} removed from favorites".format(i))
favorites_tracks_ls = base.get_ls_command( # noqa
favorites_tracks,
"api/v1/favorites/tracks/",
output_conf={
"labels": ["Track ID", "Track", "Artist", "Favorite Date"],
"type": "TRACK_FAVORITE",
},
)
import click
import json
from . import base
from .. import output
@base.cli.group()
@click.pass_context
def libraries(ctx):
"""
Manage libraries
"""
libraries_ls = base.get_ls_command(
libraries,
"api/v1/libraries/",
output_conf={
"labels": ["UUID", "Name", "Visibility", "Uploads"],
"type": "LIBRARY",
},
)
libraries_rm = base.get_delete_command(libraries, "api/v1/libraries/{}/")
@libraries.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@base.async_command
async def libraries_create(ctx, raw, name, visibility):
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/libraries/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Library created:")
click.echo(
output.table([payload], ["UUID", "Name", "Visibility"], type="LIBRARY")
)
import click
import json
from . import base
from .. import output
@base.cli.group()
@click.pass_context
def playlists(ctx):
"""
Manage playlists
"""
playlists_ls = base.get_ls_command(
playlists,
"api/v1/playlists/",
doc="List available playlists",
owned_conf={"param": "user", "field": "id"},
output_conf={
"labels": [
"ID",
"Name",
"Visibility",
"Tracks Count",
"User",
"Created",
"Modified",
],
"type": "PLAYLIST",
},
)
playlists_rm = base.get_delete_command(
playlists,
"api/v1/playlists/{}/",
id_metavar="PLAYLIST_ID",
doc="Delete the given playlists",
)
@playlists.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@base.async_command
async def playlists_create(ctx, raw, name, visibility):
"""
Create a new playlist
"""
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/playlists/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Playlist created:")
click.echo(
output.table(
[payload], ["ID", "Name", "Visibility", "Tracks Count"], type="PLAYLIST"
)
)
@playlists.command("tracks-add")
@click.argument("id", metavar="PLAYLIST_ID")
@click.argument("track", nargs=-1, metavar="[TRACK_ID]…")
@click.option(
"--no-duplicates",
is_flag=True,
default=False,
help="Prevent insertion of tracks that already exist in the playlist. An error will be raised in this case.",
)
@click.pass_context
@base.async_command
async def playlists_tracks_add(ctx, id, track, no_duplicates):
"""
Insert one or more tracks in a playlist
"""
if not track:
return click.echo("No track id provided")
async with ctx.obj["remote"]:
async with ctx.obj["remote"].request(
"post",
"api/v1/playlists/{}/".format(id),
data={"tracks": track, "allow_duplicates": not no_duplicates},
) as response:
response.raise_for_status()
playlists_tracks = base.get_ls_command(
playlists,
"api/v1/playlists/{}/tracks/",
name="tracks",
with_id=True,
pagination=False,
ordering=False,
filter=False,
output_conf={
"labels": ["Position", "ID", "Title", "Artist", "Album", "Created"],
"id_field": "ID",
"type": "PLAYLIST_TRACK",
},
id_metavar="PLAYLIST_ID",
doc="List the tracks included in a playlist",
)
import json
import click
from . import base
from .. import api
@base.cli.group()
@click.pass_context
def server(ctx):
pass
@server.command()
@base.RAW_DECORATOR
@click.pass_context
@base.async_command
async def info(ctx, raw):
async with api.get_session() as session:
nodeinfo = await api.fetch_nodeinfo(
session,
domain=ctx.obj["SERVER_NETLOC"],
protocol=ctx.obj["SERVER_PROTOCOL"],
)
if raw:
click.echo(json.dumps(nodeinfo, sort_keys=True, indent=4))
return
click.echo("\n")
click.echo("General")
click.echo("-------")
click.echo("Url: {}".format(ctx.obj["SERVER_URL"]))
click.echo("Name: {}".format(nodeinfo["metadata"]["nodeName"]))
click.echo(
"Short description: {}".format(nodeinfo["metadata"]["shortDescription"])
)
click.echo("\n")
click.echo("Software")
click.echo("----------")
click.echo("Software name: {}".format(nodeinfo["software"]["name"]))
click.echo("Version: {}".format(nodeinfo["software"]["version"]))
click.echo("\n")
click.echo("Configuration")
click.echo("---------------")
click.echo(
"Registrations: {}".format(
"open" if nodeinfo["openRegistrations"] else "closed"
)
)
import os
import pathlib
import aiohttp
import click
import tqdm
from . import base
from .. import logs
from .. import utils
@base.cli.group()
@click.pass_context
def tracks(ctx):
pass
tracks_ls = base.get_ls_command(
tracks,
"api/v1/tracks/",
output_conf={
"labels": ["ID", "Title", "Artist", "Album", "Disc", "Position"],
"type": "TRACK",
"id_field": "ID",
},
)
async def get_track_download_url(id, remote, format=None):
result = await remote.request("get", "api/v1/tracks/{}/".format(id))
result.raise_for_status()
payload = await result.json()
try:
download_url = payload["uploads"][0]["listen_url"]
except IndexError:
if remote.token:
raise click.ClickException("This file is not available for download")
else:
raise click.ClickException(
"This file is not available for download, try to login first"
)
if download_url.startswith("/"):
download_url = remote.base_url[:-1] + download_url
if format:
download_url = utils.add_url_params(download_url, {"to": format})
else:
format = payload["uploads"][0]["extension"]
return download_url, format, payload
@tracks.command("download")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@click.option("-d", "--directory", type=click.Path(exists=True))
@click.option("-o", "--overwrite", is_flag=True, default=False)
@click.option("-s", "--skip-existing", is_flag=True, default=False)
@click.option("-i", "--ignore-errors", multiple=True, type=int)
@click.option(
"-t",
"--template",
default="{artist} - {album} - {title}.{extension}",
envvar="FUNKWHALE_DOWNLOAD_PATH_TEMPLATE",
)
@click.pass_context
@base.async_command
async def track_download(
ctx, id, format, directory, template, overwrite, ignore_errors, skip_existing
):
async with ctx.obj["remote"]:
progressbar = tqdm.tqdm(id, unit="Files")
for i in progressbar:
download_url, format, track_data = await get_track_download_url(
i, ctx.obj["remote"], format=format
)
logs.logger.info("Downloading from {}".format(download_url))
filename_params = utils.flatten(track_data)
filename_params["album"] = filename_params["album_title"]
filename_params["artist"] = filename_params["artist_name"]
filename_params["extension"] = format
filename_params["year"] = (
filename_params["album_release_date"][:4]
if filename_params["album_release_date"]
else None
)
filename_params = {
k: utils.sanitize_recursive(v) for k, v in filename_params.items()
}
if directory:
filename = template.format(**filename_params)
full_path = os.path.join(directory, filename)
existing = os.path.exists(full_path)
if skip_existing and existing:
logs.logger.info(
"'{}' already exists on disk, skipping download".format(
full_path
)
)
continue
elif not overwrite and existing:
raise click.ClickException(
"'{}' already exists on disk. Relaunch this command with --overwrite if you want to replace it".format(
full_path
)
)
async with ctx.obj["remote"].request(
"get", download_url, timeout=0
) as response:
try:
response.raise_for_status()
except aiohttp.ClientResponseError as e:
if response.status in ignore_errors:
logs.logger.warning(
"Remote answered with {} for url {}, skipping".format(
response.status, download_url
)
)
continue
else:
raise click.ClickException(
"Remote answered with {} for url {}, exiting".format(
response.status, download_url
)
)
if directory:
final_directory = os.path.dirname(full_path)
pathlib.Path(final_directory).mkdir(parents=True, exist_ok=True)
logs.logger.info("Downloading to {}".format(full_path))
out = open(full_path, "wb")
else:
out = click.get_binary_stream("stdout")
while True:
chunk = await response.content.read(1024)
if not chunk:
break
out.write(chunk)
logs.logger.info("Download complete")
import datetime
import os
import asyncio
import click
import tqdm
from . import base
from .. import logs
@base.cli.group()
@click.pass_context
def uploads(ctx):
pass
uploads_ls = base.get_ls_command( # noqa
uploads,
"api/v1/uploads/",
output_conf={
"labels": ["UUID", "Track", "Artist", "Import status", "Size", "Mimetype"],
"type": "UPLOAD",
},
)
def track_read(file_obj, name, progress):
read = file_obj.read
def patched_read(size):
content = read(size)
progress.update(len(content))
progress.set_postfix(file=name[-30:], refresh=False)
return content
setattr(file_obj, "read", patched_read)
async def upload(path, size, remote, ref, library_id, semaphore, global_progress):
async with semaphore:
filename = os.path.basename(path)
data = {
"library": library_id,
"import_reference": ref,
"source": "upload://{}".format(filename),
"audio_file": open(path, "rb"),
}
track_read(data["audio_file"], filename, global_progress)
response = await remote.request("post", "api/v1/uploads/", data=data, timeout=0)
response.raise_for_status()
return response
@uploads.command("create")
@click.argument("library_id")
@click.argument("paths", nargs=-1)
@click.option("-r", "--ref", default=None)
@click.option("-p", "--parallel", type=click.INT, default=1)
@click.pass_context
@base.async_command
async def uploads_create(ctx, library_id, paths, ref, parallel):
logs.logger.info("Uploading {} files…".format(len(paths)))
paths = sorted(set(paths))
if not paths:
return
ref = ref or "funkwhale-cli-import-{}".format(datetime.datetime.now().isoformat())
sizes = {path: os.path.getsize(path) for path in paths}
async with ctx.obj["remote"]:
logs.logger.info("Checking library {} existence…".format(library_id))
library_data = await ctx.obj["remote"].request(
"get", "api/v1/libraries/{}/".format(library_id)
)
library_data.raise_for_status()
sem = asyncio.Semaphore(parallel)
progressbar = tqdm.tqdm(
total=sum(sizes.values()), unit="B", unit_scale=True, unit_divisor=1024
)
tasks = [
upload(
path=path,
ref=ref,
size=sizes[path],
global_progress=progressbar,
remote=ctx.obj["remote"],
library_id=library_id,
semaphore=sem,
)
for path in paths
]
await asyncio.gather(*tasks)
logs.logger.info("Upload complete")
import click
from . import base
@base.cli.group()
@click.pass_context
def users(ctx):
pass
users_me = base.get_show_command(
users,
"api/v1/users/users/{}/",
output_conf={
"labels": [
"ID",
"Username",
"Name",
"Email",
"Federation ID",
"Joined",
"Visibility",
"Staff",
"Admin",
"Permissions",
],
"type": "USER",
},
force_id="me",
name="me",
)
import collections
import json
import urllib.parse
import pathvalidate
def recursive_getattr(obj, key, permissive=False):
"""
Given a dictionary such as {'user': {'name': 'Bob'}} and
......@@ -17,3 +24,80 @@ def recursive_getattr(obj, key, permissive=False):
return
return v
def add_url_params(url, params):
""" Add GET params to provided URL being aware of existing.
:param url: string of target URL
:param params: dict containing requested params to be added
:return: string with updated URL
>> url = 'http://stackoverflow.com/test?answers=true'
>> new_params = {'answers': False, 'data': ['some','values']}
>> add_url_params(url, new_params)
'http://stackoverflow.com/test?data=some&data=values&answers=false'
"""
# Unquoting URL first so we don't loose existing args
url = urllib.parse.unquote(url)
# Extracting url info
parsed_url = urllib.parse.urlparse(url)
# Extracting URL arguments from parsed URL
get_args = parsed_url.query
# Converting URL arguments to dict
parsed_get_args = dict(urllib.parse.parse_qsl(get_args))
# Merging URL arguments dict with new params
parsed_get_args.update(params)
# Bool and Dict values should be converted to json-friendly values
# you may throw this part away if you don't like it :)
parsed_get_args.update(
{
k: json.dumps(v)
for k, v in parsed_get_args.items()
if isinstance(v, (bool, dict))
}
)
# Converting URL argument to proper query string
encoded_get_args = urllib.parse.urlencode(parsed_get_args, doseq=True)
# Creating new parsed result object based on provided with new
# URL arguments. Same thing happens inside of urlparse.
new_url = urllib.parse.ParseResult(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
encoded_get_args,
parsed_url.fragment,
).geturl()
return new_url
def sanitize_recursive(value):
if isinstance(value, dict):
return {k: sanitize_recursive(v) for k, v in value.items()}
elif isinstance(value, list):
return [sanitize_recursive(v) for v in value]
else:
return pathvalidate.sanitize_filepath(str(value))
def flatten(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, collections.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def get_url_param(url, name):
parsed = urllib.parse.urlparse(url)
v = urllib.parse.parse_qs(parsed.query).get(name)
if v:
return v[0]
return None
......@@ -56,3 +56,8 @@ universal = 1
[tool:pytest]
testpaths = tests
[flake8]
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,tests/data,tests/music/conftest.py
ignore = F405,W503,E203
import aiohttp
import marshmallow
import pytest
......@@ -80,6 +79,5 @@ def test_clean_nodeinfo():
def test_clean_nodeinfo_raises_on_validation_failure():
payload = {}
with pytest.raises(marshmallow.ValidationError):
api.clean_nodeinfo({})
import pytest
import click
import keyring
from funkwhale_cli import api
from funkwhale_cli import cli
......@@ -32,7 +31,7 @@ def cli_ctx(mocker):
def test_delete_command(group, cli_ctx, session, responses):
command = cli.get_delete_command(group, "api/v1/noop/{}/")
command = cli.base.get_delete_command(group, "api/v1/noop/{}/")
id = "fake_id"
responses.delete("https://test.funkwhale/api/v1/noop/fake_id/")
command.callback(id=[id], raw=False, no_input=True, _async_reraise=True)
......@@ -61,12 +60,12 @@ def test_delete_command(group, cli_ctx, session, responses):
],
)
def test_get_pagination_data(input, output):
assert cli.get_pagination_data(input) == output
assert cli.base.get_pagination_data(input) == output
def test_lazy_credential(mocker):
get_password = mocker.patch("keyring.get_password", return_value="password")
credential = cli.lazy_credential("http://testurl", "_")
credential = cli.auth.lazy_credential("http://testurl", "_")
get_password.assert_not_called()
......@@ -82,7 +81,7 @@ def test_lazy_credential(mocker):
def test_users_me(cli_ctx, session, responses, get_requests):
command = cli.users_me
command = cli.users.users_me
url = "https://test.funkwhale/api/v1/users/users/me/"
responses.get(
url,
......@@ -116,7 +115,7 @@ def test_users_me(cli_ctx, session, responses, get_requests):
def test_libraries_create(cli_ctx, session, responses, get_requests):
command = cli.libraries_create
command = cli.libraries.libraries_create
url = "https://test.funkwhale/api/v1/libraries/"
responses.post(url)
......@@ -128,7 +127,7 @@ def test_libraries_create(cli_ctx, session, responses, get_requests):
def test_libraries_ls(cli_ctx, session, responses, get_requests):
command = cli.libraries_ls
command = cli.libraries.libraries_ls
url = "https://test.funkwhale/api/v1/libraries/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -153,7 +152,7 @@ def test_libraries_ls(cli_ctx, session, responses, get_requests):
def test_libraries_rm(cli_ctx, session, responses, get_requests):
command = cli.libraries_rm
command = cli.libraries.libraries_rm
url = "https://test.funkwhale/api/v1/libraries/"
responses.delete(url + "1/")
responses.delete(url + "42/")
......@@ -165,7 +164,7 @@ def test_libraries_rm(cli_ctx, session, responses, get_requests):
def test_favorites_tracks_create(cli_ctx, session, responses, get_requests):
command = cli.favorites_tracks_create
command = cli.favorites.favorites_tracks_create
url = "https://test.funkwhale/api/v1/favorites/tracks/"
responses.post(url, repeat=True)
......@@ -178,7 +177,7 @@ def test_favorites_tracks_create(cli_ctx, session, responses, get_requests):
def test_favorites_tracks_ls(cli_ctx, session, responses, get_requests):
command = cli.favorites_tracks_ls
command = cli.favorites.favorites_tracks_ls
url = "https://test.funkwhale/api/v1/favorites/tracks/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -203,7 +202,7 @@ def test_favorites_tracks_ls(cli_ctx, session, responses, get_requests):
def test_favorites_tracks_rm(cli_ctx, session, responses, get_requests):
command = cli.favorites_tracks_rm
command = cli.favorites.favorites_tracks_rm
url = "https://test.funkwhale/api/v1/favorites/tracks/remove/"
responses.delete(url, repeat=True)
......@@ -216,7 +215,7 @@ def test_favorites_tracks_rm(cli_ctx, session, responses, get_requests):
def test_tracks_ls(cli_ctx, session, responses, get_requests):
command = cli.tracks_ls
command = cli.tracks.tracks_ls
url = "https://test.funkwhale/api/v1/tracks/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -241,7 +240,7 @@ def test_tracks_ls(cli_ctx, session, responses, get_requests):
def test_artists_ls(cli_ctx, session, responses, get_requests):
command = cli.artists_ls
command = cli.artists.artists_ls
url = "https://test.funkwhale/api/v1/artists/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -266,7 +265,7 @@ def test_artists_ls(cli_ctx, session, responses, get_requests):
def test_albums_ls(cli_ctx, session, responses, get_requests):
command = cli.albums_ls
command = cli.albums.albums_ls
url = "https://test.funkwhale/api/v1/albums/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -291,7 +290,7 @@ def test_albums_ls(cli_ctx, session, responses, get_requests):
def test_playlists_create(cli_ctx, session, responses, get_requests):
command = cli.playlists_create
command = cli.playlists.playlists_create
url = "https://test.funkwhale/api/v1/playlists/"
responses.post(url)
......@@ -303,7 +302,7 @@ def test_playlists_create(cli_ctx, session, responses, get_requests):
def test_playlists_ls(cli_ctx, session, responses, get_requests):
command = cli.playlists_ls
command = cli.playlists.playlists_ls
url = "https://test.funkwhale/api/v1/playlists/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -331,9 +330,9 @@ def test_playlists_ls_mine(
cli_ctx, session, responses, get_requests, mocker, coroutine_mock
):
get_user_info = mocker.patch.object(
cli, "get_user_info", coroutine_mock(return_value={"id": 42})
cli.base, "get_user_info", coroutine_mock(return_value={"id": 42})
)
command = cli.playlists_ls
command = cli.playlists.playlists_ls
url = "https://test.funkwhale/api/v1/playlists/?ordering=-creation_date&page=1&page_size=5&q=hello&user=42"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -360,7 +359,7 @@ def test_playlists_ls_mine(
def test_playlists_rm(cli_ctx, session, responses, get_requests):
command = cli.playlists_rm
command = cli.playlists.playlists_rm
url = "https://test.funkwhale/api/v1/playlists/"
responses.delete(url + "1/")
responses.delete(url + "42/")
......@@ -372,7 +371,7 @@ def test_playlists_rm(cli_ctx, session, responses, get_requests):
def test_playlists_tracks_add(cli_ctx, session, responses, get_requests):
command = cli.playlists_tracks_add
command = cli.playlists.playlists_tracks_add
url = "https://test.funkwhale/api/v1/playlists/66/"
responses.post(url)
......@@ -384,7 +383,7 @@ def test_playlists_tracks_add(cli_ctx, session, responses, get_requests):
def test_playlists_tracks(cli_ctx, session, responses, get_requests):
command = cli.playlists_tracks
command = cli.playlists.playlists_tracks
url = "https://test.funkwhale/api/v1/playlists/66/tracks/"
responses.get(url, payload={"results": [], "count": 0})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment