...
 
Commits (3)
import aiohttp
from . import exceptions
from . import logs
from . import schemas
from . import settings
......
from . import auth
from . import albums
from . import artists
from . import favorites
from . import libraries
from . import playlists
from . import tracks
from . import uploads
from . import users
from .base import cli
__all__ = [
"auth",
"albums",
"artists",
"favorites",
"libraries",
"playlists",
"tracks",
"uploads",
"users",
"cli",
]
import click
from . import base
@base.cli.group()
@click.pass_context
def albums(ctx):
pass
albums_ls = base.get_ls_command(
albums,
"api/v1/albums/",
output_conf={
"labels": ["ID", "Title", "Artist", "Tracks", "Created"],
"type": "ALBUM",
"id_field": "ID",
},
)
import click
from . import base
@base.cli.group()
@click.pass_context
def artists(ctx):
pass
artists_ls = base.get_ls_command(
artists,
"api/v1/artists/",
output_conf={
"labels": ["ID", "Name", "Albums", "Tracks", "Created"],
"type": "ARTIST",
"id_field": "ID",
},
)
import click
import keyring
# importing the backends explicitely is required for PyInstaller to work
import keyring.backends.kwallet
import keyring.backends.Windows
import keyring.backends.OS_X
import keyring.backends.SecretService
import keyring.backends.chainer
from . import base
from .. import api
class lazy_credential:
"""
A proxy object to request access to the proxy object at the later possible point,
cf #4
"""
def __init__(self, *args):
self.args = args
self._cached_value = None
@property
def value(self):
if self._cached_value:
return self._cached_value
try:
v = keyring.get_password(*self.args)
except ValueError as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}. Your password may be incorrect.".format(
e.args[0]
)
)
except Exception as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}".format(e.args[0])
)
self._cached_value = v
return v
def __str__(self):
return str(self.value)
def __eq__(self, other):
return self.value == other
def __repr__(self):
return str(self.value)
def __bool__(self):
return bool(self.value)
def init_keyring():
# small hack to fix some weird issues with pyinstaller and keyring
# there seems to be a cache issue somewhere
del keyring.backend.get_all_keyring.__wrapped__.always_returns
keyring.core.init_backend()
# /end of hack
@base.cli.command()
@click.option("-u", "--username", envvar="FUNKWHALE_USERNAME", prompt=True)
@click.option(
"-p", "--password", envvar="FUNKWHALE_PASSWORD", prompt=True, hide_input=True
)
@click.pass_context
@base.async_command
async def login(ctx, username, password):
async with api.get_session() as session:
token = await api.get_jwt_token(
session, ctx.obj["SERVER_URL"], username=username, password=password
)
try:
keyring.set_password(ctx.obj["SERVER_URL"], "_", token)
except ValueError as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}. Your password may be incorrect.".format(
e.args[0]
)
)
except Exception as e:
raise click.ClickException(
"Error while retrieving password from keyring: {}".format(e.args[0])
)
click.echo("Login successfull!")
@base.cli.command()
@click.pass_context
@base.async_command
async def logout(ctx):
keyring.delete_password(ctx.obj["SERVER_URL"], "_")
click.echo("Logout successfull!")
import click
from . import base
@base.cli.group()
@click.pass_context
def favorites(ctx):
pass
@favorites.group("tracks")
@click.pass_context
def favorites_tracks(ctx):
pass
@favorites_tracks.command("create")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@base.async_command
async def favorites_tracks_create(ctx, id):
click.echo("Adding {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"post", "api/v1/favorites/tracks/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} added to favorites".format(i))
@favorites_tracks.command("rm")
@click.argument("id", nargs=-1, required=True)
@click.pass_context
@base.async_command
async def favorites_tracks_rm(ctx, id):
click.echo("Removing {} tracks to favorites…".format(len(id)))
async with ctx.obj["remote"]:
for i in id:
data = {"track": i}
async with ctx.obj["remote"].request(
"delete", "api/v1/favorites/tracks/remove/", data=data
) as response:
response.raise_for_status()
click.echo("Track {} removed from favorites".format(i))
favorites_tracks_ls = base.get_ls_command( # noqa
favorites_tracks,
"api/v1/favorites/tracks/",
output_conf={
"labels": ["Track ID", "Track", "Artist", "Favorite Date"],
"type": "TRACK_FAVORITE",
},
)
import click
import json
from . import base
from .. import output
@base.cli.group()
@click.pass_context
def libraries(ctx):
"""
Manage libraries
"""
libraries_ls = base.get_ls_command(
libraries,
"api/v1/libraries/",
output_conf={
"labels": ["UUID", "Name", "Visibility", "Uploads"],
"type": "LIBRARY",
},
)
libraries_rm = base.get_delete_command(libraries, "api/v1/libraries/{}/")
@libraries.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@base.async_command
async def libraries_create(ctx, raw, name, visibility):
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/libraries/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Library created:")
click.echo(
output.table([payload], ["UUID", "Name", "Visibility"], type="LIBRARY")
)
import click
import json
from . import base
from .. import output
@base.cli.group()
@click.pass_context
def playlists(ctx):
"""
Manage playlists
"""
playlists_ls = base.get_ls_command(
playlists,
"api/v1/playlists/",
doc="List available playlists",
owned_conf={"param": "user", "field": "id"},
output_conf={
"labels": [
"ID",
"Name",
"Visibility",
"Tracks Count",
"User",
"Created",
"Modified",
],
"type": "PLAYLIST",
},
)
playlists_rm = base.get_delete_command(
playlists,
"api/v1/playlists/{}/",
id_metavar="PLAYLIST_ID",
doc="Delete the given playlists",
)
@playlists.command("create")
@click.option("--name", prompt=True)
@click.option(
"--visibility",
type=click.Choice(["me", "instance", "everyone"]),
default="me",
prompt=True,
)
@click.option("--raw", is_flag=True)
@click.pass_context
@base.async_command
async def playlists_create(ctx, raw, name, visibility):
"""
Create a new playlist
"""
async with ctx.obj["remote"]:
result = await ctx.obj["remote"].request(
"post", "api/v1/playlists/", data={"name": name, "visibility": visibility}
)
result.raise_for_status()
payload = await result.json()
if raw:
click.echo(json.dumps(payload, sort_keys=True, indent=4))
else:
click.echo("Playlist created:")
click.echo(
output.table(
[payload], ["ID", "Name", "Visibility", "Tracks Count"], type="PLAYLIST"
)
)
@playlists.command("tracks-add")
@click.argument("id", metavar="PLAYLIST_ID")
@click.argument("track", nargs=-1, metavar="[TRACK_ID]…")
@click.option(
"--no-duplicates",
is_flag=True,
default=False,
help="Prevent insertion of tracks that already exist in the playlist. An error will be raised in this case.",
)
@click.pass_context
@base.async_command
async def playlists_tracks_add(ctx, id, track, no_duplicates):
"""
Insert one or more tracks in a playlist
"""
if not track:
return click.echo("No track id provided")
async with ctx.obj["remote"]:
async with ctx.obj["remote"].request(
"post",
"api/v1/playlists/{}/".format(id),
data={"tracks": track, "allow_duplicates": not no_duplicates},
) as response:
response.raise_for_status()
playlists_tracks = base.get_ls_command(
playlists,
"api/v1/playlists/{}/tracks/",
name="tracks",
with_id=True,
pagination=False,
ordering=False,
filter=False,
output_conf={
"labels": ["Position", "ID", "Title", "Artist", "Album", "Created"],
"id_field": "ID",
"type": "PLAYLIST_TRACK",
},
id_metavar="PLAYLIST_ID",
doc="List the tracks included in a playlist",
)
import json
import click
from . import base
from .. import api
@base.cli.group()
@click.pass_context
def server(ctx):
pass
@server.command()
@base.RAW_DECORATOR
@click.pass_context
@base.async_command
async def info(ctx, raw):
async with api.get_session() as session:
nodeinfo = await api.fetch_nodeinfo(
session,
domain=ctx.obj["SERVER_NETLOC"],
protocol=ctx.obj["SERVER_PROTOCOL"],
)
if raw:
click.echo(json.dumps(nodeinfo, sort_keys=True, indent=4))
return
click.echo("\n")
click.echo("General")
click.echo("-------")
click.echo("Url: {}".format(ctx.obj["SERVER_URL"]))
click.echo("Name: {}".format(nodeinfo["metadata"]["nodeName"]))
click.echo(
"Short description: {}".format(nodeinfo["metadata"]["shortDescription"])
)
click.echo("\n")
click.echo("Software")
click.echo("----------")
click.echo("Software name: {}".format(nodeinfo["software"]["name"]))
click.echo("Version: {}".format(nodeinfo["software"]["version"]))
click.echo("\n")
click.echo("Configuration")
click.echo("---------------")
click.echo(
"Registrations: {}".format(
"open" if nodeinfo["openRegistrations"] else "closed"
)
)
import os
import pathlib
import aiohttp
import click
import tqdm
from . import base
from .. import logs
from .. import utils
@base.cli.group()
@click.pass_context
def tracks(ctx):
pass
tracks_ls = base.get_ls_command(
tracks,
"api/v1/tracks/",
output_conf={
"labels": ["ID", "Title", "Artist", "Album", "Disc", "Position"],
"type": "TRACK",
"id_field": "ID",
},
)
async def get_track_download_url(id, remote, format=None):
result = await remote.request("get", "api/v1/tracks/{}/".format(id))
result.raise_for_status()
payload = await result.json()
try:
download_url = payload["uploads"][0]["listen_url"]
except IndexError:
if remote.token:
raise click.ClickException("This file is not available for download")
else:
raise click.ClickException(
"This file is not available for download, try to login first"
)
if download_url.startswith("/"):
download_url = remote.base_url[:-1] + download_url
if format:
download_url = utils.add_url_params(download_url, {"to": format})
else:
format = payload["uploads"][0]["extension"]
return download_url, format, payload
@tracks.command("download")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@click.option("-d", "--directory", type=click.Path(exists=True))
@click.option("-o", "--overwrite", is_flag=True, default=False)
@click.option("-s", "--skip-existing", is_flag=True, default=False)
@click.option("-i", "--ignore-errors", multiple=True, type=int)
@click.option(
"-t",
"--template",
default="{artist} - {album} - {title}.{extension}",
envvar="FUNKWHALE_DOWNLOAD_PATH_TEMPLATE",
)
@click.pass_context
@base.async_command
async def track_download(
ctx, id, format, directory, template, overwrite, ignore_errors, skip_existing
):
async with ctx.obj["remote"]:
progressbar = tqdm.tqdm(id, unit="Files")
for i in progressbar:
download_url, format, track_data = await get_track_download_url(
i, ctx.obj["remote"], format=format
)
logs.logger.info("Downloading from {}".format(download_url))
filename_params = utils.flatten(track_data)
filename_params["album"] = filename_params["album_title"]
filename_params["artist"] = filename_params["artist_name"]
filename_params["extension"] = format
filename_params["year"] = (
filename_params["album_release_date"][:4]
if filename_params["album_release_date"]
else None
)
filename_params = {
k: utils.sanitize_recursive(v) for k, v in filename_params.items()
}
if directory:
filename = template.format(**filename_params)
full_path = os.path.join(directory, filename)
existing = os.path.exists(full_path)
if skip_existing and existing:
logs.logger.info(
"'{}' already exists on disk, skipping download".format(
full_path
)
)
continue
elif not overwrite and existing:
raise click.ClickException(
"'{}' already exists on disk. Relaunch this command with --overwrite if you want to replace it".format(
full_path
)
)
async with ctx.obj["remote"].request(
"get", download_url, timeout=0
) as response:
try:
response.raise_for_status()
except aiohttp.ClientResponseError as e:
if response.status in ignore_errors:
logs.logger.warning(
"Remote answered with {} for url {}, skipping".format(
response.status, download_url
)
)
continue
else:
raise click.ClickException(
"Remote answered with {} for url {}, exiting".format(
response.status, download_url
)
)
if directory:
final_directory = os.path.dirname(full_path)
pathlib.Path(final_directory).mkdir(parents=True, exist_ok=True)
logs.logger.info("Downloading to {}".format(full_path))
out = open(full_path, "wb")
else:
out = click.get_binary_stream("stdout")
while True:
chunk = await response.content.read(1024)
if not chunk:
break
out.write(chunk)
logs.logger.info("Download complete")
import datetime
import os
import asyncio
import click
import tqdm
from . import base
from .. import logs
@base.cli.group()
@click.pass_context
def uploads(ctx):
pass
uploads_ls = base.get_ls_command( # noqa
uploads,
"api/v1/uploads/",
output_conf={
"labels": ["UUID", "Track", "Artist", "Import status", "Size", "Mimetype"],
"type": "UPLOAD",
},
)
def track_read(file_obj, name, progress):
read = file_obj.read
def patched_read(size):
content = read(size)
progress.update(len(content))
progress.set_postfix(file=name[-30:], refresh=False)
return content
setattr(file_obj, "read", patched_read)
async def upload(path, size, remote, ref, library_id, semaphore, global_progress):
async with semaphore:
filename = os.path.basename(path)
data = {
"library": library_id,
"import_reference": ref,
"source": "upload://{}".format(filename),
"audio_file": open(path, "rb"),
}
track_read(data["audio_file"], filename, global_progress)
response = await remote.request("post", "api/v1/uploads/", data=data, timeout=0)
response.raise_for_status()
return response
@uploads.command("create")
@click.argument("library_id")
@click.argument("paths", nargs=-1)
@click.option("-r", "--ref", default=None)
@click.option("-p", "--parallel", type=click.INT, default=1)
@click.pass_context
@base.async_command
async def uploads_create(ctx, library_id, paths, ref, parallel):
logs.logger.info("Uploading {} files…".format(len(paths)))
paths = sorted(set(paths))
if not paths:
return
ref = ref or "funkwhale-cli-import-{}".format(datetime.datetime.now().isoformat())
sizes = {path: os.path.getsize(path) for path in paths}
async with ctx.obj["remote"]:
logs.logger.info("Checking library {} existence…".format(library_id))
library_data = await ctx.obj["remote"].request(
"get", "api/v1/libraries/{}/".format(library_id)
)
library_data.raise_for_status()
sem = asyncio.Semaphore(parallel)
progressbar = tqdm.tqdm(
total=sum(sizes.values()), unit="B", unit_scale=True, unit_divisor=1024
)
tasks = [
upload(
path=path,
ref=ref,
size=sizes[path],
global_progress=progressbar,
remote=ctx.obj["remote"],
library_id=library_id,
semaphore=sem,
)
for path in paths
]
await asyncio.gather(*tasks)
logs.logger.info("Upload complete")
import click
from . import base
@base.cli.group()
@click.pass_context
def users(ctx):
pass
users_me = base.get_show_command(
users,
"api/v1/users/users/{}/",
output_conf={
"labels": [
"ID",
"Username",
"Name",
"Email",
"Federation ID",
"Joined",
"Visibility",
"Staff",
"Admin",
"Permissions",
],
"type": "USER",
},
force_id="me",
name="me",
)
import collections
import json
import urllib.parse
import pathvalidate
def recursive_getattr(obj, key, permissive=False):
"""
Given a dictionary such as {'user': {'name': 'Bob'}} and
a dotted string such as user.name, returns 'Bob'.
If the value is not present, returns None
"""
v = obj
for k in key.split("."):
try:
v = v.get(k)
except (TypeError, AttributeError):
if not permissive:
raise
return
if v is None:
return
return v
def add_url_params(url, params):
""" Add GET params to provided URL being aware of existing.
:param url: string of target URL
:param params: dict containing requested params to be added
:return: string with updated URL
>> url = 'http://stackoverflow.com/test?answers=true'
>> new_params = {'answers': False, 'data': ['some','values']}
>> add_url_params(url, new_params)
'http://stackoverflow.com/test?data=some&data=values&answers=false'
"""
# Unquoting URL first so we don't loose existing args
url = urllib.parse.unquote(url)
# Extracting url info
parsed_url = urllib.parse.urlparse(url)
# Extracting URL arguments from parsed URL
get_args = parsed_url.query
# Converting URL arguments to dict
parsed_get_args = dict(urllib.parse.parse_qsl(get_args))
# Merging URL arguments dict with new params
parsed_get_args.update(params)
# Bool and Dict values should be converted to json-friendly values
# you may throw this part away if you don't like it :)
parsed_get_args.update(
{
k: json.dumps(v)
for k, v in parsed_get_args.items()
if isinstance(v, (bool, dict))
}
)
# Converting URL argument to proper query string
encoded_get_args = urllib.parse.urlencode(parsed_get_args, doseq=True)
# Creating new parsed result object based on provided with new
# URL arguments. Same thing happens inside of urlparse.
new_url = urllib.parse.ParseResult(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
encoded_get_args,
parsed_url.fragment,
).geturl()
return new_url
def sanitize_recursive(value):
if isinstance(value, dict):
return {k: sanitize_recursive(v) for k, v in value.items()}
elif isinstance(value, list):
return [sanitize_recursive(v) for v in value]
else:
return pathvalidate.sanitize_filepath(str(value))
def flatten(d, parent_key="", sep="_"):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, collections.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def get_url_param(url, name):
parsed = urllib.parse.urlparse(url)
v = urllib.parse.parse_qs(parsed.query).get(name)
if v:
return v[0]
return None
......@@ -56,3 +56,8 @@ universal = 1
[tool:pytest]
testpaths = tests
[flake8]
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,tests/data,tests/music/conftest.py
ignore = F405,W503,E203
import aiohttp
import marshmallow
import pytest
......@@ -80,6 +79,5 @@ def test_clean_nodeinfo():
def test_clean_nodeinfo_raises_on_validation_failure():
payload = {}
with pytest.raises(marshmallow.ValidationError):
api.clean_nodeinfo({})
import pytest
import click
import keyring
from funkwhale_cli import api
from funkwhale_cli import cli
......@@ -32,7 +31,7 @@ def cli_ctx(mocker):
def test_delete_command(group, cli_ctx, session, responses):
command = cli.get_delete_command(group, "api/v1/noop/{}/")
command = cli.base.get_delete_command(group, "api/v1/noop/{}/")
id = "fake_id"
responses.delete("https://test.funkwhale/api/v1/noop/fake_id/")
command.callback(id=[id], raw=False, no_input=True, _async_reraise=True)
......@@ -61,12 +60,12 @@ def test_delete_command(group, cli_ctx, session, responses):
],
)
def test_get_pagination_data(input, output):
assert cli.get_pagination_data(input) == output
assert cli.base.get_pagination_data(input) == output
def test_lazy_credential(mocker):
get_password = mocker.patch("keyring.get_password", return_value="password")
credential = cli.lazy_credential("http://testurl", "_")
credential = cli.auth.lazy_credential("http://testurl", "_")
get_password.assert_not_called()
......@@ -82,7 +81,7 @@ def test_lazy_credential(mocker):
def test_users_me(cli_ctx, session, responses, get_requests):
command = cli.users_me
command = cli.users.users_me
url = "https://test.funkwhale/api/v1/users/users/me/"
responses.get(
url,
......@@ -116,7 +115,7 @@ def test_users_me(cli_ctx, session, responses, get_requests):
def test_libraries_create(cli_ctx, session, responses, get_requests):
command = cli.libraries_create
command = cli.libraries.libraries_create
url = "https://test.funkwhale/api/v1/libraries/"
responses.post(url)
......@@ -128,7 +127,7 @@ def test_libraries_create(cli_ctx, session, responses, get_requests):
def test_libraries_ls(cli_ctx, session, responses, get_requests):
command = cli.libraries_ls
command = cli.libraries.libraries_ls
url = "https://test.funkwhale/api/v1/libraries/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -153,7 +152,7 @@ def test_libraries_ls(cli_ctx, session, responses, get_requests):
def test_libraries_rm(cli_ctx, session, responses, get_requests):
command = cli.libraries_rm
command = cli.libraries.libraries_rm
url = "https://test.funkwhale/api/v1/libraries/"
responses.delete(url + "1/")
responses.delete(url + "42/")
......@@ -165,7 +164,7 @@ def test_libraries_rm(cli_ctx, session, responses, get_requests):
def test_favorites_tracks_create(cli_ctx, session, responses, get_requests):
command = cli.favorites_tracks_create
command = cli.favorites.favorites_tracks_create
url = "https://test.funkwhale/api/v1/favorites/tracks/"
responses.post(url, repeat=True)
......@@ -178,7 +177,7 @@ def test_favorites_tracks_create(cli_ctx, session, responses, get_requests):
def test_favorites_tracks_ls(cli_ctx, session, responses, get_requests):
command = cli.favorites_tracks_ls
command = cli.favorites.favorites_tracks_ls
url = "https://test.funkwhale/api/v1/favorites/tracks/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -203,7 +202,7 @@ def test_favorites_tracks_ls(cli_ctx, session, responses, get_requests):
def test_favorites_tracks_rm(cli_ctx, session, responses, get_requests):
command = cli.favorites_tracks_rm
command = cli.favorites.favorites_tracks_rm
url = "https://test.funkwhale/api/v1/favorites/tracks/remove/"
responses.delete(url, repeat=True)
......@@ -216,7 +215,7 @@ def test_favorites_tracks_rm(cli_ctx, session, responses, get_requests):
def test_tracks_ls(cli_ctx, session, responses, get_requests):
command = cli.tracks_ls
command = cli.tracks.tracks_ls
url = "https://test.funkwhale/api/v1/tracks/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -241,7 +240,7 @@ def test_tracks_ls(cli_ctx, session, responses, get_requests):
def test_artists_ls(cli_ctx, session, responses, get_requests):
command = cli.artists_ls
command = cli.artists.artists_ls
url = "https://test.funkwhale/api/v1/artists/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -266,7 +265,7 @@ def test_artists_ls(cli_ctx, session, responses, get_requests):
def test_albums_ls(cli_ctx, session, responses, get_requests):
command = cli.albums_ls
command = cli.albums.albums_ls
url = "https://test.funkwhale/api/v1/albums/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -291,7 +290,7 @@ def test_albums_ls(cli_ctx, session, responses, get_requests):
def test_playlists_create(cli_ctx, session, responses, get_requests):
command = cli.playlists_create
command = cli.playlists.playlists_create
url = "https://test.funkwhale/api/v1/playlists/"
responses.post(url)
......@@ -303,7 +302,7 @@ def test_playlists_create(cli_ctx, session, responses, get_requests):
def test_playlists_ls(cli_ctx, session, responses, get_requests):
command = cli.playlists_ls
command = cli.playlists.playlists_ls
url = "https://test.funkwhale/api/v1/playlists/?ordering=-creation_date&page=1&page_size=5&q=hello"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
......@@ -327,8 +326,40 @@ def test_playlists_ls(cli_ctx, session, responses, get_requests):
assert len(requests) == 1
def test_playlists_ls_mine(
cli_ctx, session, responses, get_requests, mocker, coroutine_mock
):
get_user_info = mocker.patch.object(
cli.base, "get_user_info", coroutine_mock(return_value={"id": 42})
)
command = cli.playlists.playlists_ls
url = "https://test.funkwhale/api/v1/playlists/?ordering=-creation_date&page=1&page_size=5&q=hello&user=42"
responses.get(
url, payload={"results": [], "next": None, "previous": None, "count": 0}
)
command.callback(
raw=False,
page=1,
page_size=5,
ordering="-creation_date",
filter="favorites=true",
query=["hello"],
column=None,
format=None,
no_headers=False,
ids=False,
limit=1,
owned=True,
)
requests = get_requests("get", url)
assert len(requests) == 1
get_user_info.assert_called_once_with(cli_ctx)
def test_playlists_rm(cli_ctx, session, responses, get_requests):
command = cli.playlists_rm
command = cli.playlists.playlists_rm
url = "https://test.funkwhale/api/v1/playlists/"
responses.delete(url + "1/")
responses.delete(url + "42/")
......@@ -340,7 +371,7 @@ def test_playlists_rm(cli_ctx, session, responses, get_requests):
def test_playlists_tracks_add(cli_ctx, session, responses, get_requests):
command = cli.playlists_tracks_add
command = cli.playlists.playlists_tracks_add
url = "https://test.funkwhale/api/v1/playlists/66/"
responses.post(url)
......@@ -352,7 +383,7 @@ def test_playlists_tracks_add(cli_ctx, session, responses, get_requests):
def test_playlists_tracks(cli_ctx, session, responses, get_requests):
command = cli.playlists_tracks
command = cli.playlists.playlists_tracks
url = "https://test.funkwhale/api/v1/playlists/66/tracks/"
responses.get(url, payload={"results": [], "count": 0})
......