Skip to content
Snippets Groups Projects
utils.py 6.26 KiB
Newer Older
  • Learn to ignore specific revisions
  • petitminion's avatar
    petitminion committed
    from datetime import datetime
    
    petitminion's avatar
    petitminion committed
    
    
    # /!\ The next import has known XML vulnerabilities, but this shouldn't have security
    # implications in funkwhale since it is only used to generate xspf files.
    from xml.etree.ElementTree import Element, SubElement
    
    from defusedxml import ElementTree as etree
    from defusedxml import minidom
    
    petitminion's avatar
    petitminion committed
    from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
    
    
    from funkwhale_api.music.models import Album, Artist, Track
    
    # Module-level logger, named after this module.
    logger = logging.getLogger(__name__)
    
    
    def clean_namespace_xspf(xspf_file):
        """
            Read an xspf file and return its content without namespace or encoding info.

            The default xspf namespace declaration is removed so that plain tag names
            (".//track", ".//title", ...) can be used to search the parsed tree, and the
            ``encoding`` pseudo-attribute of the XML declaration is dropped.

            This way the xspf file is compatible with our get_track_id_from_xspf function.

            :param xspf_file: path of the xspf file to read.
            :return: the cleaned file content as a string.
        """
        with open(xspf_file) as f:
            xspf_str = f.read()
        # Literal replacement: the namespace URI contains regex metacharacters ("."),
        # so it must not be interpreted as a pattern.
        xspf_data = xspf_str.replace('xmlns="http://xspf.org/ns/0/"', "")
        # This is needed because of the lxml error : "ValueError: Unicode strings with encoding
        # declaration are not supported. Please use bytes input or XML fragments without declaration."
        # Only the encoding pseudo-attribute of the leading XML declaration is removed,
        # whichever quote style it uses; the rest of the document is left untouched.
        xspf_data = re.sub(
            r"""(<\?xml\b[^>]*?)\s+encoding\s*=\s*(["'])[^"']*\2""",
            r"\1",
            xspf_data,
            count=1,
        )
        return xspf_data
    
    petitminion's avatar
    petitminion committed
    def album_exist(track, artist_id):
        """
            Look up in the database the album named by an xspf <track> element.

            :param track: parsed xspf <track> element, searched for an <album> child.
            :param artist_id: value used for the ``artist_id`` lookup of the album query.
            :return: the matching Album (the first one when several match), or None when
                the track has no <album> tag or the database lookup fails.
        """
        try:
            album = track.find(".//album").text
        except AttributeError as e:
            # find() returned None: the track carries no <album> tag.
            logger.info(
                f"Couldn't find the following attribute while parsing the xml : {e!r}. No album information."
            )
            return None

        try:
            return Album.objects.get(title=album, artist_id=artist_id)
        except MultipleObjectsReturned:
            # This handler has to come before the generic one below, otherwise it is
            # unreachable. Several albums match: settle for the first one.
            return Album.objects.filter(title=album, artist_id=artist_id).first()
        except Exception as e:
            # Best effort: any other failure (including "does not exist") means no album.
            logger.info(f"Error while quering database for album : {e!r}")
            return None
    
    
    
    def _get_or_first(model, **lookup):
        """
            Return the single object matching ``lookup``, or the first one when several match.

            Lets the model's "does not exist" error propagate when nothing matches.
        """
        try:
            return model.objects.get(**lookup)
        except MultipleObjectsReturned:
            return model.objects.filter(**lookup).first()


    def get_track_id_from_xspf(xspf_file):
        """
            Return a list of funkwhale tracks id from a xspf file. Tracks not found in database are ignored.

            Useful to generate playlists from xspf files.

            :param xspf_file: path of the xspf file, handed to clean_namespace_xspf.
            :return: list of the ids of the tracks that could be matched in the database.
        """
        track_list = []
        xspf_data_clean = clean_namespace_xspf(xspf_file)
        tree = etree.fromstring(xspf_data_clean)

        tracks = tree.findall(".//track")

        for track in tracks:
            # Getting metadata of the xspf file
            try:
                artist = track.find(".//creator").text
                title = track.find(".//title").text
            except AttributeError as e:
                # find() returned None: <creator> or <title> is missing for this track.
                logger.info(
                    f"Couldn't find the following attribute while parsing the xml file for artist and title data : {e!r}. "
                    "Switching to next track..."
                )
                continue

            # Finding track id in the db
            try:
                artist_id = _get_or_first(Artist, name=artist)
            except Exception as e:
                # Best effort: an unknown artist (or any lookup failure) skips the track.
                logger.info(f"Error while quering database : {e!r}. Switching to next track.")
                continue

            track_id = None
            album_id = album_exist(track, artist_id)
            if album_id:
                try:
                    track_id = _get_or_first(
                        Track, title=title, artist=artist_id.id, album=album_id.id
                    )
                except ObjectDoesNotExist as e:
                    logger.info(f"Couldn't find track in the database : {e!r}. Trying without album...")

            if track_id is None:
                # Either there is no album information or the album-restricted lookup
                # failed: retry on title and artist only, as the log message announces.
                try:
                    track_id = _get_or_first(Track, title=title, artist=artist_id.id)
                except ObjectDoesNotExist as e:
                    logger.info(f"Couldn't find track in the database : {e!r}")
                    continue

            if track_id:
                track_list.append(track_id.id)

        logger.info(
            str(len(tracks))
            + " tracks where found in xspf file. "
            + str(len(track_list))
            + " are gonna be added to playlist."
        )
        return track_list
    
    petitminion's avatar
    petitminion committed
    def generate_xspf_from_tracks_ids(tracks_ids):
        """
            This returns a string containing playlist data in xspf format. It's used for test purposes.

            :param tracks_ids: iterable of Track ids; ids with no matching Track are
                logged and left out of the playlist.
            :return: the pretty-printed xspf document produced by prettify.
        """
        xspf_title = "An automated generated playlist"
        xspf_date = datetime.now().strftime("%m/%d/%Y")
        xspf_playlist = Element("playlist")
        xspf_tracklist = write_xspf_headers(xspf_playlist, xspf_title, xspf_date)

        for track_id in tracks_ids:
            try:
                track = Track.objects.get(id=track_id)
                write_xspf_track_data(track, xspf_tracklist)
            except ObjectDoesNotExist as e:
                # Skip the id and keep building the playlist with the remaining ones.
                logger.info(f"Error while quering database : {e!r}")
        return prettify(xspf_playlist)
    
    petitminion's avatar
    petitminion committed
    def write_xspf_headers(xspf_playlist, xspf_title, xspf_date):
        """
            This generates the playlist metadata and returns a trackList subelement used to insert
            each track into the playlist.

            :param xspf_playlist: root playlist element; modified in place.
            :param xspf_title: text of the <title> child.
            :param xspf_date: text of the <date> child.
            :return: the newly created <trackList> child of ``xspf_playlist``.
        """
        xspf_playlist.set("version", "1")
        title_xspf = SubElement(xspf_playlist, "title")
        title_xspf.text = xspf_title
        date_xspf = SubElement(xspf_playlist, "date")
        date_xspf.text = xspf_date
        trackList_xspf = SubElement(xspf_playlist, "trackList")
        return trackList_xspf
    
    
    def write_xspf_track_data(track, trackList_xspf):
        """
            Append a <track> entry describing ``track`` to the trackList subelement of a xspf file.

            The entry gets <location>, <title> and <creator> children, plus an <album>
            child unless the album's string form is "[non-album tracks]".
        """
        entry = SubElement(trackList_xspf, "track")

        location_node = SubElement(entry, "location")
        location_node.text = "https://" + track.domain_name + track.listen_url

        title_node = SubElement(entry, "title")
        title_node.text = str(track.title)

        creator_node = SubElement(entry, "creator")
        creator_node.text = str(track.artist)

        album_name = str(track.album)
        if album_name != "[non-album tracks]":
            album_node = SubElement(entry, "album")
            album_node.text = album_name
    
    petitminion's avatar
    petitminion committed
    
    
    
    def prettify(elem):
        """
            Serialize the Element as utf-8, re-parse it and return it as an XML string
            pretty-printed with a two-space indent.
        """
        serialized = etree.tostring(elem, "utf-8")
        return minidom.parseString(serialized).toprettyxml(indent="  ")