Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • renovate/aiofiles-0.x
2 results

Target

Select target project
  • funkwhale/cli
  • neodarz/cli
  • apetresc/cli
  • EorlBruder/cli
  • prplecake/cli
5 results
Select Git revision
  • master
1 result
Show changes
import click
from . import base
@base.cli.group()
@click.pass_context
def favorites(ctx):
pass
@favorites.group("tracks")
@click.pass_context
def favorites_tracks(ctx):
pass
@favorites_tracks.command("create")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@base.async_command
async def favorites_tracks_create(ctx, id):
click.echo("Adding {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"post", "api/v1/favorites/tracks/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} added to favorites".format(i))
@favorites_tracks.command("rm")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@base.async_command
async def favorites_tracks_rm(ctx, id):
click.echo("Removing {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"delete", "api/v1/favorites/tracks/remove/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} removed from favorites".format(i))
favorites_tracks_ls = base.get_ls_command( # noqa
favorites_tracks,
"api/v1/favorites/tracks/",
output_conf={
"labels": ["Track ID", "Track", "Artist", "Favorite Date"],
"type": "TRACK_FAVORITE",
},
)
import click
import json
from . import base
from .. import output
@base.cli.group()
@click.pass_context
def libraries(ctx):
"""
Manage libraries
"""
libraries_ls = base.get_ls_command(
libraries,
"api/v1/libraries/",
output_conf={
"labels": ["UUID", "Name", "Visibility", "Uploads"],
"type": "LIBRARY",
},
)
libraries_rm = base.get_delete_command(libraries, "api/v1/libraries/{}/")
@libraries.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@base.async_command
async def libraries_create(ctx, raw, name, visibility):
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/libraries/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Library created:")
click.echo(
output.table([payload], ["UUID", "Name", "Visibility"], type="LIBRARY")
)
import click
import json
from . import base
from .. import output
@base.cli.group()
@click.pass_context
def playlists(ctx):
"""
Manage playlists
"""
playlists_ls = base.get_ls_command(
playlists,
"api/v1/playlists/",
doc="List available playlists",
owned_conf={"param": "user", "field": "id"},
output_conf={
"labels": [
"ID",
"Name",
"Visibility",
"Tracks Count",
"User",
"Created",
"Modified",
],
"type": "PLAYLIST",
},
)
playlists_rm = base.get_delete_command(
playlists,
"api/v1/playlists/{}/",
id_metavar="PLAYLIST_ID",
doc="Delete the given playlists",
)
@playlists.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@base.async_command
async def playlists_create(ctx, raw, name, visibility):
"""
Create a new playlist
"""
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/playlists/", data={"name": name, "privacy_level": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Playlist created:")
click.echo(
output.table(
[payload], ["ID", "Name", "Visibility", "Tracks Count"], type="PLAYLIST"
)
)
@playlists.command("tracks-add")
@click.argument("id", metavar="PLAYLIST_ID")
@click.argument("track", nargs=-1, metavar="[TRACK_ID]…")
@click.option(
"--no-duplicates",
is_flag=True,
default=False,
help="Prevent insertion of tracks that already exist in the playlist. An error will be raised in this case.",
)
@click.pass_context
@base.async_command
async def playlists_tracks_add(ctx, id, track, no_duplicates):
"""
Insert one or more tracks in a playlist
"""
if not track:
return click.echo("No track id provided")
async with ctx.obj["remote"]:
async with ctx.obj["remote"].request(
"post",
"api/v1/playlists/{}/add".format(id),
data={"tracks": track, "allow_duplicates": not no_duplicates},
) as response:
response.raise_for_status()
playlists_tracks = base.get_ls_command(
playlists,
"api/v1/playlists/{}/tracks/",
name="tracks",
with_id=True,
pagination=False,
ordering=False,
filter=False,
output_conf={
"labels": ["Position", "ID", "Title", "Artist", "Album", "Created"],
"id_field": "ID",
"type": "PLAYLIST_TRACK",
},
id_metavar="PLAYLIST_ID",
doc="List the tracks included in a playlist",
)
import json
import click
from . import base
from .. import api
@base.cli.group()
@click.pass_context
def server(ctx):
pass
@server.command()
@base.RAW_DECORATOR
@click.pass_context
@base.async_command
async def info(ctx, raw):
async with api.get_session() as session:
nodeinfo = await api.fetch_nodeinfo(
session,
domain=ctx.obj["SERVER_NETLOC"],
protocol=ctx.obj["SERVER_PROTOCOL"],
)
if raw:
click.echo(json.dumps(nodeinfo, sort_keys=True, indent=4))
return
click.echo("\n")
click.echo("General")
click.echo("-------")
click.echo("Url: {}".format(ctx.obj["SERVER_URL"]))
click.echo("Name: {}".format(nodeinfo["metadata"]["nodeName"]))
click.echo(
"Short description: {}".format(nodeinfo["metadata"]["shortDescription"])
)
click.echo("\n")
click.echo("Software")
click.echo("----------")
click.echo("Software name: {}".format(nodeinfo["software"]["name"]))
click.echo("Version: {}".format(nodeinfo["software"]["version"]))
click.echo("\n")
click.echo("Configuration")
click.echo("---------------")
click.echo(
"Registrations: {}".format(
"open" if nodeinfo["openRegistrations"] else "closed"
)
)
import os
import pathlib
import aiohttp
import click
import tqdm
from . import base
from .. import logs
from .. import utils
TEMPLATE_ENV_VAR = "FUNKWHALE_DOWNLOAD_PATH_TEMPLATE"
TEMPLATE_DEFAULT = "{artist} - {album} - {title}.{extension}"
@base.cli.group()
@click.pass_context
def tracks(ctx):
pass
tracks_ls = base.get_ls_command(
tracks,
"api/v1/tracks/",
output_conf={
"labels": ["ID", "Title", "Artist", "Album", "Disc", "Position"],
"type": "TRACK",
"id_field": "ID",
},
)
async def get_track_download_url(id, remote, format=None):
result = await remote.request("get", "api/v1/tracks/{}/".format(id))
result.raise_for_status()
payload = await result.json()
try:
download_url = payload["uploads"][0]["listen_url"]
except IndexError:
if remote.token:
raise click.ClickException("This file is not available for download: " + id)
else:
raise click.ClickException(
"This file is not available for download, try to login first: " + id
)
if download_url.startswith("/"):
download_url = remote.base_url[:-1] + download_url
if format:
download_url = utils.add_url_params(download_url, {"to": format})
else:
format = payload["uploads"][0]["extension"]
return download_url, format, payload
def extract_filename_params(track_data, format):
filename_params = utils.flatten(track_data)
filename_params["album"] = filename_params.get("album_title", None)
filename_params["artist"] = filename_params.get("artist_name", None)
filename_params["extension"] = format
try:
filename_params["year"] = filename_params["album_release_date"][:4]
except (KeyError, TypeError):
filename_params["year"] = None
return {
k: utils.sanitize_recursive(v) for k, v in filename_params.items()
}
@tracks.command("download")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@click.option("-d", "--directory", type=click.Path(exists=True))
@click.option("-o", "--overwrite", is_flag=True, default=False)
@click.option("-s", "--skip-existing", is_flag=True, default=False)
@click.option("-i", "--ignore-errors", multiple=True, type=int)
@click.option(
"-t",
"--template",
default=TEMPLATE_DEFAULT,
envvar=TEMPLATE_ENV_VAR,
)
@click.pass_context
@base.async_command
async def track_download(
ctx, id, format, directory, template, overwrite, ignore_errors, skip_existing
):
async with ctx.obj["remote"]:
progressbar = tqdm.tqdm(id, unit="Files")
for i in progressbar:
try:
download_url, extension, track_data = await get_track_download_url(
i, ctx.obj["remote"], format=format
)
except click.ClickException as e:
logs.logger.error(e.message)
continue
logs.logger.info("Downloading from {}".format(download_url))
filename_params = extract_filename_params(track_data, extension)
if directory:
filename = template.format(**filename_params)
full_path = os.path.join(directory, filename)
existing = os.path.exists(full_path)
if skip_existing and existing:
logs.logger.info(
"'{}' already exists on disk, skipping download".format(
full_path
)
)
continue
elif not overwrite and existing:
raise click.ClickException(
"'{}' already exists on disk. Relaunch this command with --overwrite if you want to replace it".format(
full_path
)
)
async with ctx.obj["remote"].request(
"get", download_url, timeout=0
) as response:
try:
response.raise_for_status()
except aiohttp.ClientResponseError as e:
if response.status in ignore_errors:
logs.logger.warning(
"Remote answered with {} for url {}, skipping".format(
response.status, download_url
)
)
continue
else:
raise click.ClickException(
"Remote answered with {} for url {}, exiting".format(
response.status, download_url
)
)
if directory:
final_directory = os.path.dirname(full_path)
pathlib.Path(final_directory).mkdir(parents=True, exist_ok=True)
logs.logger.info("Downloading to {}".format(full_path))
out = open(full_path, "wb")
else:
out = click.get_binary_stream("stdout")
while True:
chunk = await response.content.read(1024)
if not chunk:
break
out.write(chunk)
logs.logger.info("Download complete")
async def get_track_data(id, remote, format=None):
result = await remote.request("get", "api/v1/tracks/{}/".format(id))
result.raise_for_status()
payload = await result.json()
if not format and payload["uploads"]:
format = payload["uploads"][0]["extension"]
return format, payload
@tracks.command("generate-playlist")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@click.option("-d", "--directory", type=click.Path(exists=True))
@click.option(
"-t",
"--template",
default=TEMPLATE_DEFAULT,
envvar=TEMPLATE_ENV_VAR,
)
@click.option("-b", "--base-path", default="./")
@click.option("-n", "--name", default="playlist")
@click.pass_context
@base.async_command
async def track_generate_playlist(
ctx, id, format, directory, template, base_path, name
):
async with ctx.obj["remote"]:
playlist = []
for i in id:
extension, track_data = await get_track_data(
i, ctx.obj["remote"], format=format
)
filename_params = extract_filename_params(track_data, extension)
filename = base_path + template.format(**filename_params)
playlist.append(filename)
if directory:
filename = name + ".m3u8"
full_path = os.path.join(directory, filename)
logs.logger.info("Writing playlist to {}".format(full_path))
with open(full_path, "w") as out:
for track in playlist:
out.write(track + "\n")
else:
out = click.get_binary_stream("stdout")
for track in playlist:
out.write(bytes(track + "\n", "utf8"))
logs.logger.info("Export of playlist complete")
import datetime
import json
import os
import asyncio
import click
import tqdm
from . import base
from .. import logs
@base.cli.group()
@click.pass_context
def uploads(ctx):
pass
uploads_ls = base.get_ls_command( # noqa
uploads,
"api/v1/uploads/",
output_conf={
"labels": ["UUID", "Track", "Artist", "Import status", "Size", "Mimetype"],
"type": "UPLOAD",
},
)
def track_read(file_obj, name, progress):
read = file_obj.read
def patched_read(size):
content = read(size)
progress.update(len(content))
progress.set_postfix(file=name[-30:], refresh=False)
return content
setattr(file_obj, "read", patched_read)
def get_valid_fields(metadata):
data = {}
for field in ["title", "position", "tags", "description"]:
if field not in metadata:
continue
if field == "description":
data[field] = metadata[field]["text"]
else:
data[field] = metadata[field]
return data
async def upload(
path,
size,
remote,
ref,
container_type,
draft,
album,
license,
library_or_channel_id,
semaphore,
global_progress,
):
async with semaphore:
filename = os.path.basename(path)
data = {
container_type: library_or_channel_id,
"import_reference": ref,
"source": "upload://{}".format(filename),
"audio_file": open(path, "rb"),
}
if container_type == "channel":
# needed to set proper metadata in the file later on before publication
data["import_status"] = "draft"
data["import_metadata"] = json.dumps(
{"title": filename, "album": album, "license": license}
)
track_read(data["audio_file"], filename, global_progress)
response = await remote.request("post", "api/v1/uploads/", data=data, timeout=0)
upload = await base.check_status(response)
upload = await response.json()
if container_type == "channel":
metadata_response = await remote.request(
"get", "api/v1/uploads/{}/audio-file-metadata/".format(upload["uuid"])
)
metadata_response.raise_for_status()
metadata = await metadata_response.json()
new_data = {"import_metadata": upload["import_metadata"]}
new_data["import_metadata"].update(get_valid_fields(metadata))
if not draft:
new_data["import_status"] = "pending"
patch_response = await remote.request(
"patch", "api/v1/uploads/{}/".format(upload["uuid"]), json=new_data
)
patch_response.raise_for_status()
return response
@uploads.command("create")
@click.argument("library_or_channel_id")
@click.argument("paths", nargs=-1)
@click.option("-r", "--ref", default=None)
@click.option(
"-a",
"--album",
default=None,
help="Album to associate with the uploads. Only used when --channel is provided",
)
@click.option(
"-l",
"--license",
default=None,
help="License to associate with the uploads. Only used when --channel is provided",
)
@click.option(
"-c",
"--channel",
is_flag=True,
default=False,
help="Provide this flag if you're uploading to a channel",
)
@click.option(
"-d",
"--draft",
is_flag=True,
default=False,
help="Provide this flag if you want to upload in draft and publish later",
)
@click.option("-p", "--parallel", type=click.INT, default=1)
@click.pass_context
@base.async_command
async def uploads_create(
ctx, library_or_channel_id, paths, ref, parallel, draft, channel, album, license
):
logs.logger.info("Uploading {} files…".format(len(paths)))
paths = sorted(set(paths))
if not paths:
return
ref = ref or "funkwhale-cli-import-{}".format(datetime.datetime.now().isoformat())
sizes = {path: os.path.getsize(path) for path in paths}
async with ctx.obj["remote"]:
if channel:
logs.logger.info(
"Checking channel {} existence…".format(library_or_channel_id)
)
channel_data = await ctx.obj["remote"].request(
"get", "api/v1/channels/{}/".format(library_or_channel_id)
)
channel_data.raise_for_status()
else:
logs.logger.info(
"Checking library {} existence…".format(library_or_channel_id)
)
library_data = await ctx.obj["remote"].request(
"get", "api/v1/libraries/{}/".format(library_or_channel_id)
)
library_data.raise_for_status()
sem = asyncio.Semaphore(parallel)
progressbar = tqdm.tqdm(
total=sum(sizes.values()), unit="B", unit_scale=True, unit_divisor=1024
)
tasks = [
upload(
path=path,
ref=ref,
size=sizes[path],
draft=draft,
album=album,
license=license,
global_progress=progressbar,
remote=ctx.obj["remote"],
library_or_channel_id=library_or_channel_id,
container_type="channel" if channel else "library",
semaphore=sem,
)
for path in paths
]
await asyncio.gather(*tasks)
logs.logger.info("Upload complete")
uploads_rm = base.get_delete_command(uploads, "api/v1/uploads/{}/")
import click
from . import base
@base.cli.group()
@click.pass_context
def users(ctx):
pass
users_me = base.get_show_command(
users,
"api/v1/users/users/{}/",
output_conf={
"labels": [
"ID",
"Username",
"Name",
"Email",
"Federation ID",
"Joined",
"Visibility",
"Staff",
"Admin",
"Permissions",
],
"type": "USER",
},
force_id="me",
name="me",
)
import tabulate
def comma_separated(it):
return ", ".join(it)
FIELDS = {
"ARTIST": {
"ID": {"field": "id", "truncate": 0},
......@@ -16,15 +20,25 @@ FIELDS = {
"ALBUM": {
"ID": {"field": "id", "truncate": 0},
"Title": {"field": "title"},
"Tracks": {"field": "tracks", "handler": lambda v: len(v), "truncate": 0},
"Tracks": {"field": "tracks_count"},
"Artist": {"field": "artist.name"},
},
"CHANNEL": {
"Category": {"field": "artist.content_category", "truncate": 0},
"Username": {"field": "actor.full_username", "truncate": 0},
"Name": {"field": "artist.name"},
"Descsription": {"field": "artist.description.text"},
"RSS URL": {"field": "rss_url"},
"Artist": {"field": "artist.id"},
"Metadata": {"field": "metadata"},
"Tags": {"field": "artist.tags", "handler": ", ".join,},
},
"TRACK": {
"ID": {"field": "id", "truncate": 0},
"Title": {"field": "title"},
"Artist": {"field": ["album.artist.name", "track.artist.name"]},
"Album": {"field": "album.title"},
"Disc": {"field": "disc", "handler": lambda v: v or 1},
"Disc": {"field": "disc_number", "handler": lambda v: v or 1},
"Position": {"field": "position"},
},
"UPLOAD": {
......@@ -37,13 +51,46 @@ FIELDS = {
},
"LIBRARY": {
"Name": {"field": "name"},
"Visibility": {"field": "privacy_level"},
"Description": {"field": "description"},
"Uploads": {"field": "uploads_count"},
},
"TRACK_FAVORITE": {
"Track ID": {"field": "track.id", "truncate": 0},
"Track": {"field": "track.title"},
"Artist": {"field": "track.artist.name"},
"Favorite Date": {"field": "creation_date"},
},
"PLAYLIST": {
"Tracks Count": {"field": "tracks_count"},
"User": {"field": "user.username"},
},
"PLAYLIST_TRACK": {
"ID": {"field": "track.id"},
"Position": {"field": "index"},
"Title": {"field": "track.title"},
"Artist": {"field": "track.artist.name"},
"Album": {"field": "track.album.title"},
},
"USER": {
"Username": {"field": "username"},
"Federation ID": {"field": "full_username"},
"Email": {"field": "email"},
"Joined": {"field": "date_joined"},
"Staff": {"field": "is_staff"},
"Admin": {"field": "is_admin"},
"Permissions": {
"field": "permissions",
"handler": lambda v: ", ".join([k for k, v in v.items() if v]),
},
},
"*": {
"Name": {"field": "name"},
"Visibility": {"field": "privacy_level"},
"Created": {"field": "creation_date"},
"Modified": {"field": "modification_date"},
"UUID": {"field": "uuid", "truncate": 0},
"ID": {"field": "id", "truncate": 0},
"Tags": {"field": "tags", "handler": ", ".join,},
},
}
......@@ -110,3 +157,27 @@ def table(objects, fields, type, headers=True, format="simple"):
]
return tabulate.tabulate(rows, headers=headers, tablefmt=format)
def obj_table(obj, fields, type, headers=True, format="simple"):
"""
same as table(), but output a two-column table for a single object,
with fields on the left and values on the right
"""
configs = {}
for f in fields:
try:
configs[f] = FIELDS[type][f]
except KeyError:
try:
configs[f] = FIELDS["*"][f]
except KeyError:
raise ValueError("{} is not a valid field for type {}".format(f, type))
rows = [
(f, get_value(obj, configs[f], truncate=configs[f].get("truncate", 30)))
for f in fields
]
return tabulate.tabulate(rows, headers=[], tablefmt=format)
USER_AGENT = "funkwhale/cli"
TIMEOUT = 5
TIMEOUT = 10
import json
import sys
import urllib.parse
import pathvalidate
if sys.version_info >= (3, 10):
from collections.abc import MutableMapping
else:
from collections import MutableMapping
def recursive_getattr(obj, key, permissive=False):
"""
Given a dictionary such as {'user': {'name': 'Bob'}} and
a dotted string such as user.name, returns 'Bob'.
If the value is not present, returns None
"""
v = obj
for k in key.split("."):
try:
v = v.get(k)
except (TypeError, AttributeError):
if not permissive:
raise
return
if v is None:
return
return v
def add_url_params(url, params):
""" Add GET params to provided URL being aware of existing.
:param url: string of target URL
:param params: dict containing requested params to be added
:return: string with updated URL
>> url = 'http://stackoverflow.com/test?answers=true'
>> new_params = {'answers': False, 'data': ['some','values']}
>> add_url_params(url, new_params)
'http://stackoverflow.com/test?data=some&data=values&answers=false'
"""
# Unquoting URL first so we don't loose existing args
url = urllib.parse.unquote(url)
# Extracting url info
parsed_url = urllib.parse.urlparse(url)
# Extracting URL arguments from parsed URL
get_args = parsed_url.query
# Converting URL arguments to dict
parsed_get_args = dict(urllib.parse.parse_qsl(get_args))
# Merging URL arguments dict with new params
parsed_get_args.update(params)
# Bool and Dict values should be converted to json-friendly values
# you may throw this part away if you don't like it :)
parsed_get_args.update(
{
k: json.dumps(v)
for k, v in parsed_get_args.items()
if isinstance(v, (bool, dict))
}
)
# Converting URL argument to proper query string
encoded_get_args = urllib.parse.urlencode(parsed_get_args, doseq=True)
# Creating new parsed result object based on provided with new
# URL arguments. Same thing happens inside of urlparse.
new_url = urllib.parse.ParseResult(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
encoded_get_args,
parsed_url.fragment,
).geturl()
return new_url
def sanitize_recursive(value):
if isinstance(value, dict):
return {k: sanitize_recursive(v) for k, v in value.items()}
elif isinstance(value, list):
return [sanitize_recursive(v) for v in value]
else:
if value and str(value).startswith("/"):
value = str(value)[1:]
if isinstance(value, str):
value = value.replace("/", "_")
return pathvalidate.sanitize_filepath(str(value))
def flatten(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def get_url_param(url, name):
parsed = urllib.parse.urlparse(url)
v = urllib.parse.parse_qs(parsed.query).get(name)
if v:
return v[0]
return None
[tool.towncrier]
package = "changes"
package_dir = ""
filename = "CHANGELOG"
directory = "changes/changelog.d/"
start_string = ".. towncrier\n"
template = "changes/template.rst"
issue_format = ""
title_format = "{version} (unreleased)"
underlines = "-"
[[tool.towncrier.section]]
path = ""
[[tool.towncrier.type]]
directory = "feature"
name = "Features"
showcontent = true
[[tool.towncrier.type]]
directory = "enhancement"
name = "Enhancements"
showcontent = true
[[tool.towncrier.type]]
directory = "bugfix"
name = "Bugfixes"
showcontent = true
[[tool.towncrier.type]]
directory = "doc"
name = "Documentation"
showcontent = true
[[tool.towncrier.type]]
directory = "i18n"
name = "i18n"
showcontent = true
[[tool.towncrier.type]]
directory = "misc"
name = "Other"
showcontent = true
[tool.black]
exclude = "(.git|.hg|.mypy_cache|.tox|.venv|_build|buck-out|build|dist|migrations)"
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": ["local>funkwhale/ci//renovate"],
"prConcurrentLimit": 1,
"rangeStrategy": "pin"
}
[metadata]
name = funkwhale-cli
description = "XXX"
version = 0.1.dev0
author = Eliot Berriot
author_email = contact@eliotberriot.com
description = "A command line interface to interact with Funkwhale servers"
version = 0.1.2.dev0
author = The Funkwhale Collective
author_email = contact@funkwhale.audio
url = https://dev.funkwhale.audio/funkwhale/cli
long_description = file: README.md
long_description_content_type = text/markdown
license = AGPL3
keywords = cli
classifiers =
Development Status :: 3 - Alpha
License :: OSI Approved :: AGPL
Intended Audience :: End Users/Desktop
License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)
Natural Language :: English
Operating System :: MacOS :: MacOS X
Operating System :: POSIX
Operating System :: POSIX :: BSD
Operating System :: POSIX :: Linux
Operating System :: Microsoft :: Windows
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
[options]
zip_safe = True
include_package_data = True
packages = find:
python_requires = >=3.6
install_requires =
aiofiles
aiohttp
appdirs
click
click-log
keyring
marshmallow
python-dotenv
semver
tabulate
tqdm
aiofiles==0.7.0
aiohttp==3.7.4.post0
appdirs==1.4.4
click==7.1.2
click-log==0.3.2
keyring==23.0.1
marshmallow==2.19.5
python-dotenv==0.18.0
semver==2.13.0
tabulate==0.8.10
tqdm==4.61.2
pathvalidate==2.4.1
[options.entry_points]
console_scripts =
......@@ -37,14 +52,27 @@ console_scripts =
[options.extras_require]
dev =
aioresponses
asynctest
ipdb
pytest
pytest-mock
pytest-env
ipdb==0.13.13
test =
pytest==6.2.5
pytest-mock==3.6.1
pytest-env==0.6.2
aioresponses==0.7.6
asynctest==0.13.0
build =
pyinstaller==4.10
build-pypi =
setuptools==56.0.0
wheel==0.36.2
publish =
twine==3.4.2
changelog =
towncrier==21.3.0
[options.packages.find]
exclude =
......@@ -55,3 +83,8 @@ universal = 1
[tool:pytest]
testpaths = tests
[flake8]
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,tests/data,tests/music/conftest.py
ignore = F405,W503,E203
import pytest
import aiohttp
from aioresponses import aioresponses
from aioresponses import aioresponses, compat as aioresponses_compat
import asynctest
......@@ -13,6 +13,15 @@ def responses():
yield m
@pytest.fixture
def get_requests(responses):
def get_calls(method, url):
url = aioresponses_compat.normalize_url(url)
return responses.requests[(method.upper(), url)]
return get_calls
@pytest.fixture
async def session(loop):
async with aiohttp.ClientSession() as session:
......
import aiohttp
import marshmallow
import pytest
......@@ -80,6 +79,5 @@ def test_clean_nodeinfo():
def test_clean_nodeinfo_raises_on_validation_failure():
payload = {}
with pytest.raises(marshmallow.ValidationError):
api.clean_nodeinfo({})
import json
import uuid
import pytest
import click
......@@ -31,7 +34,7 @@ def cli_ctx(mocker):
def test_delete_command(group, cli_ctx, session, responses):
command = cli.get_delete_command(group, "api/v1/noop/{}/")
command = cli.base.get_delete_command(group, "api/v1/noop/{}/")
id = "fake_id"
responses.delete("https://test.funkwhale/api/v1/noop/fake_id/")
command.callback(id=[id], raw=False, no_input=True, _async_reraise=True)
......@@ -60,4 +63,659 @@ def test_delete_command(group, cli_ctx, session, responses):
],
)
def test_get_pagination_data(input, output):
assert cli.get_pagination_data(input) == output
assert cli.base.get_pagination_data(input) == output
def test_lazy_credential(mocker):
get_password = mocker.patch("keyring.get_password", return_value="password")
credential = cli.auth.lazy_credential("http://testurl", "_")
get_password.assert_not_called()
str(credential)
get_password.assert_called_once_with("http://testurl", "_")
assert credential == "password"
# result is cached
str(credential)
assert get_password.call_count == 1
def test_users_me(cli_ctx, session, responses, get_requests):
command = cli.users.users_me
url = "https://test.funkwhale/api/v1/users/users/me/"
responses.get(
url,
payload={
"id": 1,
"username": "user",
"full_username": "user@funkwhale.user.com",
"name": "",
"email": "contact@user.com",
"is_staff": True,
"is_superuser": True,
"permissions": {"library": True, "moderation": True, "settings": True},
"date_joined": "2016-04-30T13:36:07.747395Z",
"privacy_level": "instance",
"quota_status": {
"max": 100000,
"remaining": 82102.249366,
"current": 17897.750634,
"skipped": 46.135774,
"pending": 0.0,
"finished": 17851.614859999998,
"errored": 0.0,
},
},
)
command.callback(raw=False, column=None, format=None)
requests = get_requests("get", url)
assert len(requests) == 1
def test_libraries_create(cli_ctx, session, responses, get_requests):
command = cli.libraries.libraries_create
url = "https://test.funkwhale/api/v1/libraries/"
responses.post(url)
command.callback(name="test", visibility="public", raw=False, _async_reraise=True)
requests = get_requests("post", url)
assert len(requests) == 1
assert requests[0].kwargs["data"] == {"name": "test", "visibility": "public"}
def test_libraries_ls(cli_ctx, session, responses, get_requests):
command = cli.libraries.libraries_ls
url = "https://test.funkwhale/api/v1/libraries/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_libraries_rm(cli_ctx, session, responses, get_requests):
command = cli.libraries.libraries_rm
url = "https://test.funkwhale/api/v1/libraries/"
responses.delete(url + "1/")
responses.delete(url + "42/")
command.callback(id=[1, 42], raw=False, no_input=True, _async_reraise=True)
assert len(get_requests("delete", url + "1/")) == 1
assert len(get_requests("delete", url + "42/")) == 1
def test_uploads_create(cli_ctx, session, responses, get_requests, tmpdir):
tmp_file = tmpdir.join("test.mp3")
tmp_file.write_text("content", "ascii")
library_id = str(uuid.uuid4())
command = cli.uploads.uploads_create
responses.post("https://test.funkwhale/api/v1/uploads/")
responses.get("https://test.funkwhale/api/v1/libraries/{}/".format(library_id))
command.callback(
library_or_channel_id=library_id,
paths=[str(tmp_file)],
parallel=1,
channel=False,
ref="test-import",
draft=False,
album=None,
license=None,
_async_reraise=True,
)
expected_data = {
"library": library_id,
"import_reference": "test-import",
"source": "upload://test.mp3",
}
upload_requests = get_requests("post", "https://test.funkwhale/api/v1/uploads/")
assert len(upload_requests) == 1
data = upload_requests[0].kwargs["data"]
audio_file = data.pop("audio_file")
assert data == expected_data
assert audio_file.name == str(tmp_file)
libraries_requests = get_requests(
"get", "https://test.funkwhale/api/v1/libraries/{}/".format(library_id)
)
assert len(libraries_requests) == 1
def test_uploads_create_channel(cli_ctx, session, responses, get_requests, tmpdir):
tmp_file = tmpdir.join("test.mp3")
tmp_file.write_text("content", "ascii")
channel_id = str(uuid.uuid4())
upload_id = str(uuid.uuid4())
command = cli.uploads.uploads_create
upload_data = {
"uuid": upload_id,
"import_metadata": {
"title": "test.mp3",
"album": 12,
"license": "cc-by-sa-4.0",
},
}
responses.post("https://test.funkwhale/api/v1/uploads/", payload=upload_data)
responses.get("https://test.funkwhale/api/v1/channels/{}/".format(channel_id))
responses.get(
"https://test.funkwhale/api/v1/uploads/{}/audio-file-metadata/".format(
upload_id
),
payload={"title": "test title"},
)
responses.patch("https://test.funkwhale/api/v1/uploads/{}/".format(upload_id))
command.callback(
library_or_channel_id=channel_id,
channel=True,
paths=[str(tmp_file)],
parallel=1,
ref="test-import",
draft=False,
album=12,
license="cc-by-sa-4.0",
_async_reraise=True,
)
expected_data = {
"channel": channel_id,
"import_reference": "test-import",
"source": "upload://test.mp3",
"import_status": "draft",
"import_metadata": json.dumps(
{"title": "test.mp3", "album": 12, "license": "cc-by-sa-4.0"}
),
}
upload_requests = get_requests("post", "https://test.funkwhale/api/v1/uploads/")
assert len(upload_requests) == 1
data = upload_requests[0].kwargs["data"]
audio_file = data.pop("audio_file")
assert data == expected_data
assert audio_file.name == str(tmp_file)
channels_requests = get_requests(
"get", "https://test.funkwhale/api/v1/channels/{}/".format(channel_id)
)
assert len(channels_requests) == 1
publish_requests = get_requests(
"patch", "https://test.funkwhale/api/v1/uploads/{}/".format(upload_id)
)
assert len(publish_requests) == 1
data = publish_requests[0].kwargs["json"]
expected_data = {
"import_status": "pending",
"import_metadata": {
"title": "test title",
"album": 12,
"license": "cc-by-sa-4.0",
},
}
assert data == expected_data
def test_uploads_ls(cli_ctx, session, responses, get_requests):
command = cli.uploads.uploads_ls
url = "https://test.funkwhale/api/v1/uploads/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_uploads_rm(cli_ctx, session, responses, get_requests):
uuid1 = str(uuid.uuid4())
uuid2 = str(uuid.uuid4())
command = cli.uploads.uploads_rm
url = "https://test.funkwhale/api/v1/uploads/"
responses.delete(url + uuid1 + "/")
responses.delete(url + uuid2 + "/")
command.callback(id=[uuid1, uuid2], raw=False, no_input=True, _async_reraise=True)
assert len(get_requests("delete", url + uuid1 + "/")) == 1
assert len(get_requests("delete", url + uuid2 + "/")) == 1
def test_favorites_tracks_create(cli_ctx, session, responses, get_requests):
command = cli.favorites.favorites_tracks_create
url = "https://test.funkwhale/api/v1/favorites/tracks/"
responses.post(url, repeat=True)
command.callback(id=[1, 42], _async_reraise=True)
requests = get_requests("post", url)
assert len(requests) == 2
assert requests[0].kwargs["data"] == {"track": 1}
assert requests[1].kwargs["data"] == {"track": 42}
def test_favorites_tracks_ls(cli_ctx, session, responses, get_requests):
command = cli.favorites.favorites_tracks_ls
url = "https://test.funkwhale/api/v1/favorites/tracks/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_favorites_tracks_rm(cli_ctx, session, responses, get_requests):
command = cli.favorites.favorites_tracks_rm
url = "https://test.funkwhale/api/v1/favorites/tracks/remove/"
responses.delete(url, repeat=True)
command.callback(id=[1, 42], _async_reraise=True)
requests = get_requests("delete", url)
assert len(requests) == 2
assert requests[0].kwargs["data"] == {"track": 1}
assert requests[1].kwargs["data"] == {"track": 42}
def test_tracks_ls(cli_ctx, session, responses, get_requests):
command = cli.tracks.tracks_ls
url = "https://test.funkwhale/api/v1/tracks/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_tracks_generate_playlist(cli_ctx, session, responses, get_requests, capfd):
expected = "./Test-Artist - Test-Album - Test-Track.flac\n"
command = cli.tracks.track_generate_playlist
url = "https://test.funkwhale/api/v1/tracks/1/"
responses.get(
url,
payload={"uploads": [{"extension": "flac"}], "album": {"title": "Test-Album", "release_date": "2021-02-01"},
"artist": {"name": "Test-Artist"}, "title": "Test-Track"}
)
command.callback(
id="1",
format=None,
directory=None,
template="{artist} - {album} - {title}.{extension}",
base_path="./",
name="playlist",
)
requests = get_requests("get", url)
assert len(requests) == 1
out, err = capfd.readouterr()
assert out == expected
def test_tracks_generate_playlist_base_path(cli_ctx, session, responses, get_requests, capfd):
expected = "/different_base/Test-Artist - Test-Album - Test-Track.flac\n"
command = cli.tracks.track_generate_playlist
url = "https://test.funkwhale/api/v1/tracks/1/"
responses.get(
url,
payload={"uploads": [{"extension": "flac"}], "album": {"title": "Test-Album", "release_date": "2021-02-01"},
"artist": {"name": "Test-Artist"}, "title": "Test-Track"}
)
command.callback(
id="1",
format=None,
directory=None,
template="{artist} - {album} - {title}.{extension}",
base_path="/different_base/",
name="playlist",
)
requests = get_requests("get", url)
assert len(requests) == 1
out, err = capfd.readouterr()
assert out == expected
def test_tracks_generate_playlist_format(cli_ctx, session, responses, get_requests, capfd):
expected = "./Test-Artist - Test-Album - Test-Track.mp3\n"
command = cli.tracks.track_generate_playlist
url = "https://test.funkwhale/api/v1/tracks/1/"
responses.get(
url,
payload={"uploads": [{"extension": "flac"}], "album": {"title": "Test-Album", "release_date": "2021-02-01"},
"artist": {"name": "Test-Artist"}, "title": "Test-Track"}
)
command.callback(
id="1",
format="mp3",
directory=None,
template="{artist} - {album} - {title}.{extension}",
base_path="./",
name="playlist",
)
requests = get_requests("get", url)
assert len(requests) == 1
out, err = capfd.readouterr()
assert out == expected
def test_artists_ls(cli_ctx, session, responses, get_requests):
command = cli.artists.artists_ls
url = "https://test.funkwhale/api/v1/artists/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_albums_ls(cli_ctx, session, responses, get_requests):
command = cli.albums.albums_ls
url = "https://test.funkwhale/api/v1/albums/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_playlists_create(cli_ctx, session, responses, get_requests):
command = cli.playlists.playlists_create
url = "https://test.funkwhale/api/v1/playlists/"
responses.post(url)
command.callback(name="test", visibility="public", raw=False, _async_reraise=True)
requests = get_requests("post", url)
assert len(requests) == 1
assert requests[0].kwargs["data"] == {"name": "test", "privacy_level": "public"}
def test_playlists_ls(cli_ctx, session, responses, get_requests):
command = cli.playlists.playlists_ls
url = "https://test.funkwhale/api/v1/playlists/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_playlists_ls_mine(
cli_ctx, session, responses, get_requests, mocker, coroutine_mock
):
get_user_info = mocker.patch.object(
cli.base, "get_user_info", coroutine_mock(return_value={"id": 42})
)
command = cli.playlists.playlists_ls
url = "https://test.funkwhale/api/v1/playlists/?ordering=-creation_date&page=1&page_size=5&q=hello&user=42"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
owned=True,
)
requests = get_requests("get", url)
assert len(requests) == 1
get_user_info.assert_called_once_with(cli_ctx)
def test_playlists_rm(cli_ctx, session, responses, get_requests):
command = cli.playlists.playlists_rm
url = "https://test.funkwhale/api/v1/playlists/"
responses.delete(url + "1/")
responses.delete(url + "42/")
command.callback(id=[1, 42], raw=False, no_input=True, _async_reraise=True)
assert len(get_requests("delete", url + "1/")) == 1
assert len(get_requests("delete", url + "42/")) == 1
def test_playlists_tracks_add(cli_ctx, session, responses, get_requests):
command = cli.playlists.playlists_tracks_add
url = "https://test.funkwhale/api/v1/playlists/66/add"
responses.post(url)
command.callback(id=66, track=[1, 42], no_duplicates=True, _async_reraise=True)
requests = get_requests("post", url)
assert len(requests) == 1
assert requests[0].kwargs["data"] == {"tracks": [1, 42], "allow_duplicates": False}
def test_playlists_tracks(cli_ctx, session, responses, get_requests):
command = cli.playlists.playlists_tracks
url = "https://test.funkwhale/api/v1/playlists/66/tracks/"
responses.get(url, payload={"results": [], "count": 0})
command.callback(
id=66, raw=False, column=None, format=None, no_headers=False, ids=False
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_channel_create_music(cli_ctx, session, responses, get_requests):
command = cli.channels.channels_create
url = "https://test.funkwhale/api/v1/channels/"
responses.post(url)
command.callback(
name="test",
cover="uuid",
username="hello",
content_category="music",
description="description text",
tags="punk,rock, ska",
language=None,
itunes_category=None,
raw=False,
_async_reraise=True,
)
requests = get_requests("post", url)
assert len(requests) == 1
assert requests[0].kwargs["json"] == {
"name": "test",
"cover": "uuid",
"username": "hello",
"content_category": "music",
"description": {"text": "description text", "content_type": "text/markdown"},
"tags": ["punk", "rock", "ska"],
}
def test_channel_create_podcast(cli_ctx, session, responses, get_requests):
command = cli.channels.channels_create
url = "https://test.funkwhale/api/v1/channels/"
responses.post(url)
command.callback(
name="test",
cover="uuid",
username="hello",
content_category="podcast",
description="description text",
tags="punk,rock, ska",
language="en",
itunes_category="Leisure",
raw=False,
_async_reraise=True,
)
requests = get_requests("post", url)
assert len(requests) == 1
assert requests[0].kwargs["json"] == {
"name": "test",
"cover": "uuid",
"username": "hello",
"content_category": "podcast",
"description": {"text": "description text", "content_type": "text/markdown"},
"tags": ["punk", "rock", "ska"],
"metadata": {"itunes_category": "Leisure", "language": "en",},
}
def test_channels_ls(cli_ctx, session, responses, get_requests):
command = cli.channels.channels_ls
url = "https://test.funkwhale/api/v1/channels/?ordering=-creation_date&page=1&page_size=5&q=hello&scope=me"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="subscribed=true",
scope="me",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
)
requests = get_requests("get", url)
assert len(requests) == 1
def test_channels_rm(cli_ctx, session, responses, get_requests):
command = cli.channels.channels_rm
url = "https://test.funkwhale/api/v1/channels/"
responses.delete(url + "uuid1/")
responses.delete(url + "uuid2/")
command.callback(
id=["uuid1", "uuid2"], raw=False, no_input=True, _async_reraise=True
)
assert len(get_requests("delete", url + "uuid1/")) == 1
assert len(get_requests("delete", url + "uuid2/")) == 1