"""Helpers to read xspf playlist files into funkwhale track ids and to generate xspf from tracks."""
import logging
import re
from datetime import datetime

# /!\ The next import have xml vulnerabilities but this shouldn't have security implication in funkwhale
# since there are only used to generate xspf file.
from xml.etree.ElementTree import Element, SubElement

from defusedxml import ElementTree as etree
from defusedxml import minidom
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned

from funkwhale_api.music.models import Album, Artist, Track

# Module-level logger, named after this module; used by every helper below.
logger = logging.getLogger(__name__)


def clean_namespace_xspf(xspf_file):
    """
    Read an xspf file and return its content without namespace and encoding info.

    The default xspf namespace declaration and the ``encoding`` attribute of the
    XML declaration are removed. This way the xspf data is compatible with our
    get_track_id_from_xspf function.

    :param xspf_file: path of the xspf file to read.
    :return: the cleaned xspf document, as a string.
    """
    # Open the file inside the ``with`` so it is closed even if reading fails.
    with open(xspf_file) as f:
        xspf_str = f.read()
    # Plain substring removal: the namespace declaration is a fixed string.
    xspf_data = xspf_str.replace('xmlns="http://xspf.org/ns/0/"', "")
    # This is needed because lxml error : "ValueError: Unicode strings with encoding declaration are
    # not supported. Please use bytes input or XML fragments without declaration."
    # Only the XML declaration is touched, and both quoting styles are handled.
    xspf_data = re.sub(
        r"""(<\?xml\b[^>]*?)\s+encoding\s*=\s*(["']).*?\2""",
        r"\1",
        xspf_data,
        count=1,
    )
    return xspf_data


def album_exist(track, artist_id):
    """
    Return the album referenced by an xspf track if it exists in the database.

    :param track: xml element of the track; the text of its ``album`` sub-element
        is used as the album title.
    :param artist_id: value the albums are filtered on (passed as ``artist_id``).
    :return: the matching album (the first one when several match), or None when
        the track has no album element or the database lookup fails.
    """
    try:
        album = track.find(".//album").text
    except AttributeError as e:
        # ``find`` returned None : the track has no <album> element.
        logger.info(
            f"Couldn't find the following attribute while parsing the xml : {e!r}. No album information."
        )
        return None

    try:
        return Album.objects.get(title=album, artist_id=artist_id)
    except MultipleObjectsReturned:
        # Must be handled before the generic ``Exception`` clause, otherwise it is
        # never reached. ``first`` has to be called to get an album, not a method.
        return Album.objects.filter(title=album, artist_id=artist_id).first()
    except Exception as e:
        # Best effort : any other lookup error (album not found included) means "no album".
        logger.info(f"Error while quering database for album : {e!r}")
        return None


def get_track_id_from_xspf(xspf_file):
    """
    Return a list of funkwhale tracks id from a xspf file. Tracks not found in database are ignored.
    Usefull to generate playlist from xspf files.

    For each track of the file, the artist is looked up by name, then the track is
    looked up by title, artist and album. When the file gives no album, or when no
    track matches with the album, the track is looked up by title and artist only.
    When several rows match a lookup, the first one is used.

    :param xspf_file: path of the xspf file to read.
    :return: list of the ids of the tracks found in the database.
    """
    track_list = []
    xspf_data_clean = clean_namespace_xspf(xspf_file)
    tree = etree.fromstring(xspf_data_clean)
    tracks = tree.findall(".//track")

    for track in tracks:
        # Getting metadata of the xspf file
        try:
            artist = track.find(".//creator").text
            title = track.find(".//title").text
        except AttributeError as e:
            logger.info(
                f"Couldn't find the following attribute while parsing the xml file for artist and title data : {e!r}. \
                Switching to next track..."
            )
            continue

        # Finding track id in the db
        try:
            artist_id = Artist.objects.get(name=artist)
        except MultipleObjectsReturned:
            # Handled before the generic clause below, otherwise it is unreachable.
            artist_id = Artist.objects.filter(name=artist).first()
        except Exception as e:
            logger.info(f"Error while quering database : {e!r}. Switching to next track.")
            continue

        track_id = None
        album_id = album_exist(track, artist_id)
        if album_id:
            try:
                track_id = Track.objects.get(
                    title=title, artist=artist_id.id, album=album_id.id
                )
            except ObjectDoesNotExist as e:
                logger.info(f"Couldn't find track in the database : {e!r}. Trying without album...")
            except MultipleObjectsReturned:
                track_id = Track.objects.filter(
                    title=title, artist=artist_id.id, album=album_id.id
                ).first()

        if not track_id:
            # No album information, or no match with the album : retry on title and
            # artist only, as announced by the "Trying without album..." message.
            try:
                track_id = Track.objects.get(title=title, artist=artist_id.id)
            except ObjectDoesNotExist as e:
                logger.info(f"Couldn't find track in the database : {e!r}")
                continue
            except MultipleObjectsReturned:
                track_id = Track.objects.filter(title=title, artist=artist_id.id).first()

        if track_id:
            track_list.append(track_id.id)

    logger.info(
        f"{len(tracks)} tracks where found in xspf file. "
        f"{len(track_list)} are gonna be added to playlist."
    )
    return track_list


def generate_xspf_from_tracks_ids(tracks_ids):
    """
    This returns a string containing playlist data in xspf format. It's used for test purposes.

    Ids that do not match any track in the database are logged and skipped.

    :param tracks_ids: iterable of track ids to put in the playlist.
    :return: the pretty-printed xspf document, as returned by ``prettify``.
    """
    xspf_title = "An automated generated playlist"
    # NOTE(review): the xspf specification describes <date> as an XML schema dateTime;
    # this month/day/year format is kept as is — confirm whether consumers rely on it.
    xspf_date = datetime.now().strftime("%m/%d/%Y")
    xspf_playlist = Element("playlist")
    xspf_tracklist = write_xspf_headers(xspf_playlist, xspf_title, xspf_date)

    for track_id in tracks_ids:
        try:
            track = Track.objects.get(id=track_id)
            write_xspf_track_data(track, xspf_tracklist)
        except ObjectDoesNotExist as e:
            logger.info(f"Error while quering database : {e!r}")
    return prettify(xspf_playlist)


def write_xspf_headers(xspf_playlist, xspf_title, xspf_date):
    """
    This generate the playlist metadata and return a trackList subelement used to insert each track
    into the playlist

    :param xspf_playlist: root ``playlist`` element; it is modified in place.
    :param xspf_title: text of the ``title`` element.
    :param xspf_date: text of the ``date`` element.
    :return: the newly created ``trackList`` subelement.
    """
    xspf_playlist.set("version", "1")
    title_xspf = SubElement(xspf_playlist, "title")
    title_xspf.text = xspf_title
    date_xspf = SubElement(xspf_playlist, "date")
    date_xspf.text = xspf_date
    trackList_xspf = SubElement(xspf_playlist, "trackList")
    return trackList_xspf


def write_xspf_track_data(track, trackList_xspf):
    """
    Append a ``track`` element describing the given track to the trackList subelement.

    The element gets a ``location`` (https url built from the track domain name and
    listen url), a ``title`` and a ``creator``. An ``album`` element is added too,
    unless the album is the "[non-album tracks]" placeholder.
    """
    entry = SubElement(trackList_xspf, "track")

    location = SubElement(entry, "location")
    location.text = "https://" + track.domain_name + track.listen_url

    title = SubElement(entry, "title")
    title.text = str(track.title)

    creator = SubElement(entry, "creator")
    creator.text = str(track.artist)

    # Tracks without a real album carry no album element.
    if str(track.album) != "[non-album tracks]":
        album = SubElement(entry, "album")
        album.text = str(track.album)


def prettify(elem):
    """
        Serialize the Element as utf-8, re-parse it with minidom and return the
        document pretty-printed with a two spaces indentation.
    """
    serialized = etree.tostring(elem, encoding="utf-8")
    document = minidom.parseString(serialized)
    return document.toprettyxml(indent="  ")