Commit ef18e5ea authored by petitminion's avatar petitminion
Browse files

Merge branch 'playlist' of https://dev.funkwhale.audio/petitminion/funkwhale into develop

parents 5fccbb90 cab28a47
Pipeline #16342 failed with stages
in 22 seconds
from xml.etree.ElementTree import Element
from django.db.models.fields import CharField, IntegerField
from rest_framework import serializers from rest_framework import serializers
from funkwhale_api.federation import serializers as federation_serializers from funkwhale_api.federation import serializers as federation_serializers
from funkwhale_api.music.models import Track from funkwhale_api.music.models import Track
from funkwhale_api.music.serializers import TrackSerializer from funkwhale_api.music.serializers import TrackSerializer
from funkwhale_api.playlists import utils
from funkwhale_api.playlists.models import Playlist
from funkwhale_api.users.serializers import UserBasicSerializer from funkwhale_api.users.serializers import UserBasicSerializer
from . import models from . import models
...@@ -115,3 +121,33 @@ class PlaylistAddManySerializer(serializers.Serializer): ...@@ -115,3 +121,33 @@ class PlaylistAddManySerializer(serializers.Serializer):
class Meta: class Meta:
fields = "allow_duplicates" fields = "allow_duplicates"
class XspfSerializer(serializers.Serializer):
title = CharField()
playlist_id = IntegerField()
class Meta:
fields = (
"title",
"playlist_id",
)
def get_title():
return "test"
def generate_xspf_from_playlist():
"""
This returns a string containing playlist data in xspf format
"""
fw_playlist = Playlist.objects.get(id=playlist_id)
plt_tracks = fw_playlist.playlist_tracks.prefetch_related("track")
xspf_playlist = Element("playlist")
xspf_tracklist = utils.write_xspf_headers(
xspf_playlist, fw_playlist.name, str(fw_playlist.creation_date)
)
for plt_track in plt_tracks:
track = plt_track.track
utils.write_xspf_track_data(track, xspf_tracklist)
return utils.prettify(xspf_playlist)
import logging
import re
from datetime import datetime
# /!\ The next import have xml vulnerabilities but this shouldn't have security implication in funkwhale
# since there are only used to generate xspf file.
from xml.etree.ElementTree import Element, SubElement
from defusedxml import ElementTree as etree
from defusedxml import minidom
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from funkwhale_api.music.models import Album, Artist, Track
logger = logging.getLogger(__name__)
def clean_namespace_xspf(xspf_file):
"""
This will delete any namaespace found in the xspf file. It will also delete any encoding info.
This way xspf file will be compatible with our get_track_id_from_xspf function.
"""
file = open(xspf_file)
with file as f:
xspf_str = f.read()
xspf_data = re.sub('xmlns="http://xspf.org/ns/0/"', "", xspf_str)
# This is needed because lxml error : "ValueError: Unicode strings with encoding declaration are
# not supported. Please use bytes input or XML fragments without declaration."
xspf_data = re.sub("'encoding='.'", "", xspf_data)
return xspf_data
def album_exist(track, artist_id):
try:
album = track.find(".//album").text
except AttributeError as e:
logger.info(
f"Couldn't find the following attribute while parsing the xml : {e!r}. No album information."
)
return
try:
album_id = Album.objects.get(title=album, artist_id=artist_id)
except Exception as e:
logger.info(f"Error while quering database for album : {e!r}")
return
except MultipleObjectsReturned as e:
album_id = Album.objects.filter(title=album, artist_id=artist_id).first
return album_id
return album_id
def get_track_id_from_xspf(xspf_file):
"""
Return a list of funkwhale tracks id from a xspf file. Tracks not found in database are ignored.
Usefull to generate playlist from xspf files.
"""
track_list = []
xspf_data_clean = clean_namespace_xspf(xspf_file)
tree = etree.fromstring(xspf_data_clean)
tracks = tree.findall(".//track")
added_track_count = 0
for track in tracks:
track_id = ""
# Getting metadata of the xspf file
try:
artist = track.find(".//creator").text
title = track.find(".//title").text
except AttributeError as e:
logger.info(
f"Couldn't find the following attribute while parsing the xml file for artist and title data : {e!r}. \
Switching to next track..."
)
continue
# Finding track id in the db
try:
artist_id = Artist.objects.get(name=artist)
except Exception as e:
logger.info(f"Error while quering database : {e!r}. Switching to next track.")
continue
except MultipleObjectsReturned as e:
artist_id = Artist.objects.filter(name=artist).first()
album_id = album_exist(track, artist_id)
if album_id:
try:
track_id = Track.objects.get(
title=title, artist=artist_id.id, album=album_id.id
)
except ObjectDoesNotExist as e :
logger.info(f"Couldn't find track in the database : {e!r}. Trying without album...")
except MultipleObjectsReturned as e:
track_id = Track.objects.filter(
title=title, artist=artist_id.id, album=album_id.id
).first()
else:
try:
track_id = Track.objects.get(title=title, artist=artist_id.id)
except ObjectDoesNotExist as e:
logger.info(f"Couldn't find track in the database : {e!r}")
continue
except MultipleObjectsReturned as e:
track_id = Track.objects.filter(title=title, artist=artist_id.id).first()
if track_id:
track_list.append(track_id.id)
added_track_count = added_track_count + 1
logger.info(
str(len(tracks))
+ " tracks where found in xspf file. "
+ str(added_track_count)
+ " are gonna be added to playlist."
)
return track_list
def generate_xspf_from_tracks_ids(tracks_ids):
"""
This returns a string containing playlist data in xspf format. It's used for test purposes.
"""
xspf_title = "An automated generated playlist"
now = datetime.now()
xspf_date = now.strftime("%m/%d/%Y")
xspf_playlist = Element("playlist")
xspf_tracklist = write_xspf_headers(xspf_playlist, xspf_title, xspf_date)
for track_id in tracks_ids:
try:
track = Track.objects.get(id=track_id)
write_xspf_track_data(track, xspf_tracklist)
except ObjectDoesNotExist as e:
logger.info(f"Error while quering database : {e!r}")
return prettify(xspf_playlist)
def write_xspf_headers(xspf_playlist, xspf_title, xspf_date):
"""
This generate the playlist metadata and return a trackList subelement used to insert each track
into the playlist
"""
xspf_playlist.set("version", "1")
title_xspf = SubElement(xspf_playlist, "title")
title_xspf.text = xspf_title
date_xspf = SubElement(xspf_playlist, "date")
date_xspf.text = xspf_date
trackList_xspf = SubElement(xspf_playlist, "trackList")
return trackList_xspf
def write_xspf_track_data(track, trackList_xspf):
"""
Insert a track into the trackList subelement of a xspf file
"""
track_xspf = SubElement(trackList_xspf, "track")
location_xspf = SubElement(track_xspf, "location")
location_xspf.text = "https://" + track.domain_name + track.listen_url
title_xspf = SubElement(track_xspf, "title")
title_xspf.text = str(track.title)
creator_xspf = SubElement(track_xspf, "creator")
creator_xspf.text = str(track.artist)
if str(track.album) == "[non-album tracks]":
return
else:
album_xspf = SubElement(track_xspf, "album")
album_xspf.text = str(track.album)
def prettify(elem):
"""
Return a pretty-printed XML string for the Element.
"""
rough_string = etree.tostring(elem, "utf-8")
reparsed = minidom.parseString(rough_string)
return reparsed.toprettyxml(indent=" ")
from defusedxml import ElementTree as etree
from funkwhale_api.federation import serializers as federation_serializers from funkwhale_api.federation import serializers as federation_serializers
from funkwhale_api.playlists import serializers from funkwhale_api.playlists import models, serializers
from funkwhale_api.users import serializers as users_serializers from funkwhale_api.users import serializers as users_serializers
...@@ -60,3 +62,14 @@ def test_playlist_serializer(factories, to_api_date): ...@@ -60,3 +62,14 @@ def test_playlist_serializer(factories, to_api_date):
serializer = serializers.PlaylistSerializer(playlist) serializer = serializers.PlaylistSerializer(playlist)
assert serializer.data == expected assert serializer.data == expected
def test_generate_xspf_from_playlist(factories):
playlist = factories["playlists.PlaylistTrack"]()
playlist_factory = models.Playlist.objects.get()
xspf_test = serializers.PlaylistSerializer.generate_xspf_from_playlist(playlist.id)
tree = etree.fromstring(xspf_test)
track1 = playlist_factory.playlist_tracks.get(id=1)
track1_name = track1.track
assert playlist_factory.name == tree.findtext("./title")
assert track1_name.title == tree.findtext("./trackList/track/title")
import os
from funkwhale_api.playlists import utils
def test_get_track_id_from_xspf(factories, tmp_path):
track1 = factories["music.Track"]()
track2 = factories["music.Track"]()
tracks_ids = [track1.id, track2.id]
xspf_content = utils.generate_xspf_from_tracks_ids(tracks_ids)
f = open("test.xspf", "w")
f.write(xspf_content)
f.close()
xspf_file = "test.xspf"
expected = [track1.id, track2.id]
assert utils.get_track_id_from_xspf(xspf_file) == expected
os.remove("test.xspf")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment