Skip to content
Snippets Groups Projects

Added generate-playlist command to tracks

Merged EorlBruder requested to merge EorlBruder/cli:fix-16 into master
Files
4
+ 78
17
@@ -9,6 +9,10 @@ from . import base
from .. import logs
from .. import utils
TEMPLATE_ENV_VAR = "FUNKWHALE_DOWNLOAD_PATH_TEMPLATE"
TEMPLATE_DEFAULT = "{artist} - {album} - {title}.{extension}"
@base.cli.group()
@click.pass_context
@@ -52,6 +56,21 @@ async def get_track_download_url(id, remote, format=None):
return download_url, format, payload
def extract_filename_params(track_data, format):
filename_params = utils.flatten(track_data)
filename_params["album"] = filename_params["album_title"]
filename_params["artist"] = filename_params["artist_name"]
filename_params["extension"] = format
filename_params["year"] = (
filename_params["album_release_date"][:4]
if filename_params["album_release_date"]
else None
)
return {
k: utils.sanitize_recursive(v) for k, v in filename_params.items()
}
@tracks.command("download")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@@ -62,19 +81,19 @@ async def get_track_download_url(id, remote, format=None):
@click.option(
"-t",
"--template",
default="{artist} - {album} - {title}.{extension}",
envvar="FUNKWHALE_DOWNLOAD_PATH_TEMPLATE",
default=TEMPLATE_DEFAULT,
envvar=TEMPLATE_ENV_VAR,
)
@click.pass_context
@base.async_command
async def track_download(
ctx, id, format, directory, template, overwrite, ignore_errors, skip_existing
ctx, id, format, directory, template, overwrite, ignore_errors, skip_existing
):
async with ctx.obj["remote"]:
progressbar = tqdm.tqdm(id, unit="Files")
for i in progressbar:
try:
download_url, format, track_data = await get_track_download_url(
download_url, extension, track_data = await get_track_download_url(
i, ctx.obj["remote"], format=format
)
except click.ClickException as e:
@@ -82,18 +101,8 @@ async def track_download(
continue
logs.logger.info("Downloading from {}".format(download_url))
filename_params = utils.flatten(track_data)
filename_params["album"] = filename_params["album_title"]
filename_params["artist"] = filename_params["artist_name"]
filename_params["extension"] = format
filename_params["year"] = (
filename_params["album_release_date"][:4]
if filename_params["album_release_date"]
else None
)
filename_params = {
k: utils.sanitize_recursive(v) for k, v in filename_params.items()
}
filename_params = extract_filename_params(track_data, extension)
if directory:
filename = template.format(**filename_params)
full_path = os.path.join(directory, filename)
@@ -113,7 +122,7 @@ async def track_download(
)
async with ctx.obj["remote"].request(
"get", download_url, timeout=0
"get", download_url, timeout=0
) as response:
try:
response.raise_for_status()
@@ -144,3 +153,55 @@ async def track_download(
break
out.write(chunk)
logs.logger.info("Download complete")
async def get_track_data(id, remote, format=None):
result = await remote.request("get", "api/v1/tracks/{}/".format(id))
result.raise_for_status()
payload = await result.json()
if not format and payload["uploads"]:
format = payload["uploads"][0]["extension"]
return format, payload
@tracks.command("generate-playlist")
@click.argument("id", nargs=-1, required=True)
@click.option("--format", "-f")
@click.option("-d", "--directory", type=click.Path(exists=True))
@click.option(
"-t",
"--template",
default=TEMPLATE_DEFAULT,
envvar=TEMPLATE_ENV_VAR,
)
@click.option("-b", "--base-path", default="./")
@click.option("-n", "--name", default="playlist")
@click.pass_context
@base.async_command
async def track_generate_playlist(
ctx, id, format, directory, template, base_path, name
):
async with ctx.obj["remote"]:
playlist = []
for i in id:
extension, track_data = await get_track_data(
i, ctx.obj["remote"], format=format
)
filename_params = extract_filename_params(track_data, extension)
filename = base_path + template.format(**filename_params)
playlist.append(filename)
if directory:
filename = name + ".m3u8"
full_path = os.path.join(directory, filename)
logs.logger.info("Writing playlist to {}".format(full_path))
with open(full_path, "w") as out:
for track in playlist:
out.write(track + "\n")
else:
out = click.get_binary_stream("stdout")
for track in playlist:
out.write(bytes(track + "\n", "utf8"))
logs.logger.info("Export of playlist complete")
Loading