Skip to content
Snippets Groups Projects
models.py 12.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • import os
    import io
    import arrow
    import datetime
    import tempfile
    import shutil
    import markdown
    
    from django.conf import settings
    from django.db import models
    from django.contrib.staticfiles.templatetags.staticfiles import static
    from django.core.files.base import ContentFile
    from django.core.files import File
    from django.core.urlresolvers import reverse
    from django.utils import timezone
    from taggit.managers import TaggableManager
    from versatileimagefield.fields import VersatileImageField
    
    from funkwhale_api.taskapp import celery
    from funkwhale_api import downloader
    from funkwhale_api import musicbrainz
    from . import importers
    from . import lyrics as lyrics_utils
    
    
    class APIModelMixin(models.Model):
        """Abstract base for models that mirror a MusicBrainz entity.

        The methods below read ``api``, ``musicbrainz_model`` and
        ``musicbrainz_mapping`` from the class, so concrete subclasses
        must define them.
        """
        mbid = models.UUIDField(unique=True, db_index=True, null=True, blank=True)
        # Extra "includes" passed to the API when fetching a single entity.
        api_includes = []
        creation_date = models.DateTimeField(default=timezone.now)
        # Callables handed to ``importers.load`` together with the data.
        import_hooks = []

        class Meta:
            abstract = True
            ordering = ['-creation_date']

        @classmethod
        def get_or_create_from_api(cls, mbid):
            """Return ``(instance, created)`` for the given MusicBrainz id.

            The database is queried first; the API is only used when no
            row with that ``mbid`` exists.
            """
            try:
                existing = cls.objects.get(mbid=mbid)
            except cls.DoesNotExist:
                return cls.create_from_api(id=mbid), True
            return existing, False

        def get_api_data(self):
            """Fetch this instance's raw payload from the API."""
            response = self.__class__.api.get(id=self.mbid, includes=self.api_includes)
            return response[self.musicbrainz_model]

        @classmethod
        def create_from_api(cls, **kwargs):
            """Load an instance from API data.

            With a truthy ``id`` keyword the entity is fetched directly;
            otherwise ``kwargs`` are forwarded to the API search and the
            first result is used.
            """
            if kwargs.get('id'):
                response = cls.api.get(id=kwargs['id'], includes=cls.api_includes)
                raw_data = response[cls.musicbrainz_model]
            else:
                results = cls.api.search(**kwargs)
                raw_data = results['{0}-list'.format(cls.musicbrainz_model)][0]
            return importers.load(
                cls,
                cls.clean_musicbrainz_data(raw_data),
                raw_data,
                cls.import_hooks,
            )

        @classmethod
        def clean_musicbrainz_data(cls, data):
            """Translate a raw API dict into a dict of model field values.

            Entries for which the mapping raises ``KeyError`` are skipped.
            """
            mapping = importers.Mapping(cls.musicbrainz_mapping)
            cleaned_data = {}
            for mb_key, mb_value in data.items():
                try:
                    field_name, field_value = mapping.from_musicbrainz(mb_key, mb_value)
                except KeyError:
                    continue
                cleaned_data[field_name] = field_value
            return cleaned_data
    
    class Artist(APIModelMixin):
        """An artist, importable from the MusicBrainz ``artist`` entity."""
        name = models.CharField(max_length=255)

        musicbrainz_model = 'artist'
        musicbrainz_mapping = {
            'mbid': {'musicbrainz_field_name': 'id'},
            'name': {'musicbrainz_field_name': 'name'},
        }
        api = musicbrainz.api.artists

        def __str__(self):
            return self.name

        @property
        def tags(self):
            """Set of all tags found on this artist's albums."""
            return {
                tag
                for album in self.albums.all()
                for tag in album.tags
            }
    
    def import_artist(v):
        """Converter: turn an ``artist-credit`` value into an Artist.

        Only the first entry of ``v`` is considered; the matching artist is
        read from the database or created from the API.
        """
        first_credit = v[0]
        artist, _created = Artist.get_or_create_from_api(
            mbid=first_credit['artist']['id'])
        return artist
    
    def parse_date(v):
        """Convert a date string into a ``datetime.date``.

        A four-character value is treated as a bare year and mapped to
        January 1st of that year; anything else is parsed by ``arrow``.
        """
        if len(v) != 4:
            return arrow.get(v).date()
        return datetime.date(int(v), 1, 1)
    
    
    def import_tracks(instance, cleaned_data, raw_data):
        """Import hook: load every track of a release's first medium.

        Each track is attached to ``instance`` as its album and receives
        the position reported by the API. Only ``medium-list[0]`` is read.
        """
        first_medium = raw_data['medium-list'][0]
        for entry in first_medium['track-list']:
            fields = Track.clean_musicbrainz_data(entry['recording'])
            fields['album'] = instance
            fields['position'] = int(entry['position'])
            importers.load(Track, fields, entry, Track.import_hooks)
    
    class Album(APIModelMixin):
        """An album, importable from the MusicBrainz ``release`` entity."""
        title = models.CharField(max_length=255)
        artist = models.ForeignKey(Artist, related_name='albums')
        release_date = models.DateField(null=True)
        cover = VersatileImageField(upload_to='albums/covers/%Y/%m/%d', null=True, blank=True)
        TYPE_CHOICES = (
            ('album', 'Album'),
        )
        type = models.CharField(choices=TYPE_CHOICES, max_length=30, default='album')

        api_includes = ['artist-credits', 'recordings', 'media']
        api = musicbrainz.api.releases
        musicbrainz_model = 'release'
        # Model field name -> API key, plus an optional value converter.
        musicbrainz_mapping = {
            'mbid': {
                'musicbrainz_field_name': 'id',
            },
            'position': {
                'musicbrainz_field_name': 'release-list',
                'converter': lambda v: int(v[0]['medium-list'][0]['position']),
            },
            'title': {
                'musicbrainz_field_name': 'title',
            },
            'release_date': {
                'musicbrainz_field_name': 'date',
                'converter': parse_date,
            },
            'type': {
                'musicbrainz_field_name': 'type',
                'converter': lambda v: v.lower(),
            },
            'artist': {
                'musicbrainz_field_name': 'artist-credit',
                'converter': import_artist,
            },
        }

        def get_image(self):
            """Download the front cover, store it in ``cover`` and return the file."""
            front_cover = musicbrainz.api.images.get_front(str(self.mbid))
            filename = '{0}.jpg'.format(self.mbid)
            self.cover.save(filename, ContentFile(front_cover))
            return self.cover.file

        def __str__(self):
            return self.title

        @property
        def tags(self):
            """Set of all tags attached to this album's tracks."""
            return {
                tag
                for track in self.tracks.all()
                for tag in track.tags.all()
            }
    
    def import_tags(instance, cleaned_data, raw_data):
        """Import hook: attach sufficiently-used API tags to ``instance``.

        Tags whose ``count`` is below ``MINIMUM_COUNT``, or whose count
        cannot be converted to an integer, are ignored.
        """
        MINIMUM_COUNT = 2

        def _is_used_enough(tag_data):
            # A count that cannot be parsed disqualifies the tag.
            try:
                return int(tag_data['count']) >= MINIMUM_COUNT
            except ValueError:
                return False

        names = [
            tag_data['name']
            for tag_data in raw_data.get('tag-list', [])
            if _is_used_enough(tag_data)
        ]
        instance.tags.add(*names)
    
    def import_album(v):
        """Converter: turn a ``release-list`` value into an Album.

        Only the first release of ``v`` is considered; the matching album
        is read from the database or created from the API.
        """
        first_release = v[0]
        album, _created = Album.get_or_create_from_api(mbid=first_release['id'])
        return album
    
    
    def link_recordings(instance, cleaned_data, raw_data):
        """Import hook: point already-known tracks at the imported work.

        Every track whose ``mbid`` appears as a target in the work's
        ``recording-relation-list`` gets ``work`` set to ``instance``.
        A payload without that list (or with an empty one) is a no-op
        instead of raising ``KeyError``.
        """
        recording_ids = [
            relation['target']
            for relation in raw_data.get('recording-relation-list', [])
        ]
        if not recording_ids:
            # Nothing to link; skip the pointless UPDATE query.
            return
        Track.objects.filter(mbid__in=recording_ids).update(work=instance)
    
    
    def import_lyrics(instance, cleaned_data, raw_data):
        """Import hook: create the Lyrics entry linked from a work, if any.

        Uses the target of the first ``url-relation-list`` entry whose
        type is ``lyrics``. Returns the Lyrics object, or ``None`` when the
        payload carries no usable lyrics URL.
        """
        try:
            lyrics_relations = [
                relation
                for relation in raw_data['url-relation-list']
                if relation['type'] == 'lyrics'
            ]
            url = lyrics_relations[0]['target']
        except (IndexError, KeyError):
            return None

        lyrics, _created = Lyrics.objects.get_or_create(work=instance, url=url)
        return lyrics
    
    
    class Work(APIModelMixin):
        """A musical work, importable from the MusicBrainz ``work`` entity."""
        language = models.CharField(max_length=20)
        nature = models.CharField(max_length=50)
        title = models.CharField(max_length=255)

        api = musicbrainz.api.works
        api_includes = ['url-rels', 'recording-rels']
        musicbrainz_model = 'work'
        # Model field name -> API key, plus an optional value converter.
        musicbrainz_mapping = {
            'mbid': {'musicbrainz_field_name': 'id'},
            'title': {'musicbrainz_field_name': 'title'},
            'language': {'musicbrainz_field_name': 'language'},
            'nature': {
                'musicbrainz_field_name': 'type',
                'converter': lambda v: v.lower(),
            },
        }
        import_hooks = [import_lyrics, link_recordings]

        def fetch_lyrics(self):
            """Return this work's lyrics, importing them from the API if needed.

            The first related Lyrics object is returned when one exists;
            otherwise the work's URL relations are fetched and handed to
            ``import_lyrics``, whose result (possibly ``None``) is returned.
            """
            existing = self.lyrics.first()
            if existing:
                return existing
            work_data = self.api.get(self.mbid, includes=['url-rels'])['work']
            return import_lyrics(self, {}, work_data)
    
    
    class Lyrics(models.Model):
        """Lyrics for a work: a source URL plus the text fetched from it."""
        work = models.ForeignKey(Work, related_name='lyrics', null=True, blank=True)
        url = models.URLField(unique=True)
        # Stays NULL until ``fetch_content`` has run successfully.
        content = models.TextField(null=True, blank=True)

        @celery.app.task(name='Lyrics.fetch_content', filter=celery.task_method)
        def fetch_content(self):
            """Download the page at ``url``, extract the lyrics and save them."""
            html = lyrics_utils._get_html(self.url)
            content = lyrics_utils.extract_content(html)
            cleaned_content = lyrics_utils.clean_content(content)
            self.content = cleaned_content
            self.save()

        @property
        def content_rendered(self):
            """Return ``content`` rendered from Markdown to HTML.

            Returns an empty string when no content is stored, since
            ``content`` is nullable and ``markdown.markdown`` cannot
            process ``None``.
            """
            if not self.content:
                return ''
            return markdown.markdown(
                self.content,
                safe_mode=True,
                enable_attributes=False,
                extensions=['markdown.extensions.nl2br'])
    
    
    class Track(APIModelMixin):
        """A track, importable from the MusicBrainz ``recording`` entity."""
        title = models.CharField(max_length=255)
        artist = models.ForeignKey(Artist, related_name='tracks')
        position = models.PositiveIntegerField(null=True, blank=True)
        album = models.ForeignKey(Album, related_name='tracks', null=True, blank=True)
        work = models.ForeignKey(Work, related_name='tracks', null=True, blank=True)

        musicbrainz_model = 'recording'
        api = musicbrainz.api.recordings
        api_includes = ['artist-credits', 'releases', 'media', 'tags', 'work-rels']
        # Model field name -> API key, plus an optional value converter.
        musicbrainz_mapping = {
            'mbid': {'musicbrainz_field_name': 'id'},
            'title': {'musicbrainz_field_name': 'title'},
            'artist': {
                'musicbrainz_field_name': 'artist-credit',
                'converter': lambda v: Artist.get_or_create_from_api(mbid=v[0]['artist']['id'])[0],
            },
            'album': {
                'musicbrainz_field_name': 'release-list',
                'converter': import_album,
            },
        }
        import_hooks = [import_tags]
        tags = TaggableManager()

        def __str__(self):
            return self.title

        def save(self, **kwargs):
            """Save the track, defaulting its artist to the album's artist."""
            try:
                # Accessing the relation raises when no artist is attached.
                self.artist
            except Artist.DoesNotExist:
                self.artist = self.album.artist
            super().save(**kwargs)

        def get_work(self):
            """Return the work this track is a recording of, or ``None``.

            An already-linked work is returned as is. Otherwise the first
            work relation reported by the API is fetched or created; this
            method does not assign it to ``self.work`` itself.
            """
            if self.work:
                return self.work
            response = self.api.get(self.mbid, includes=['work-rels'])
            try:
                relations = response['recording']['work-relation-list']
                work_data = relations[0]['work']
            except (IndexError, KeyError):
                return None
            work, _created = Work.get_or_create_from_api(mbid=work_data['id'])
            return work

        def get_lyrics_url(self):
            """Return the URL of the API endpoint serving this track's lyrics."""
            return reverse('api:v1:tracks-lyrics', kwargs={'pk': self.pk})

        @property
        def full_name(self):
            """``artist - album - title``, or ``artist - title`` without an album."""
            artist_name = self.artist.name
            try:
                album_title = self.album.title
            except AttributeError:
                # No album attached to this track.
                return '{} - {}'.format(artist_name, self.title)
            return '{} - {} - {}'.format(artist_name, album_title, self.title)
    
    
    class TrackFile(models.Model):
        """An audio file for a track, optionally downloaded from ``source``."""
        track = models.ForeignKey(Track, related_name='files')
        audio_file = models.FileField(upload_to='tracks/%Y/%m/%d', max_length=255)
        source = models.URLField(null=True, blank=True)
        duration = models.IntegerField(null=True, blank=True)

        def download_file(self):
            """Download ``source`` and store the result in ``audio_file``.

            The download goes to a temporary directory which is removed
            afterwards, even when the download or the save fails. The
            downloaded file handle is closed once its content has been
            handed to the storage backend.

            Returns the ``audio_file`` field file.
            """
            tmp_dir = tempfile.mkdtemp()
            try:
                data = downloader.download(
                    self.source,
                    target_directory=tmp_dir)
                self.duration = data.get('duration', None)
                audio_file_path = data['audio_file_path']
                with open(audio_file_path, 'rb') as downloaded:
                    self.audio_file.save(
                        os.path.basename(audio_file_path),
                        File(downloaded)
                    )
            finally:
                # Never leave the temporary download directory behind.
                shutil.rmtree(tmp_dir)
            return self.audio_file

        @property
        def path(self):
            """URL of the audio file, or of the bundled sample track when
            ``settings.USE_SAMPLE_TRACK`` is enabled."""
            if settings.USE_SAMPLE_TRACK:
                return static('music/sample1.ogg')
            return self.audio_file.url
    
    class ImportBatch(models.Model):
        """A group of import jobs submitted together by one user."""
        creation_date = models.DateTimeField(default=timezone.now)
        submitted_by = models.ForeignKey('users.User', related_name='imports')

        class Meta:
            ordering = ['-creation_date']

        def __str__(self):
            return str(self.pk)

        @property
        def status(self):
            """``'pending'`` while any job is still pending, else ``'finished'``."""
            has_pending_job = any(
                job.status == 'pending' for job in self.jobs.all()
            )
            return 'pending' if has_pending_job else 'finished'
    
    class ImportJob(models.Model):
        """A single track import: one MusicBrainz id plus one source URL."""
        batch = models.ForeignKey(ImportBatch, related_name='jobs')
        source = models.URLField()
        mbid = models.UUIDField(editable=False)
        STATUS_CHOICES = (
            ('pending', 'Pending'),
            ('finished', 'finished'),
        )
        status = models.CharField(choices=STATUS_CHOICES, default='pending', max_length=30)

        @celery.app.task(name='ImportJob.run', filter=celery.task_method)
        def run(self, replace=False):
            """Import the track and download its audio file.

            With ``replace`` the track's first existing file (if any) is
            downloaded again; without it, a track that already has a file
            is left untouched. In every non-failing case the job ends up
            ``finished`` so that its batch does not stay pending forever.

            Returns the track's primary key after a download, ``None``
            when the download was skipped. Outside of ``DEBUG`` any
            failure schedules a retry of the task; in ``DEBUG`` it is
            re-raised as is.
            """
            try:
                track, _created = Track.get_or_create_from_api(mbid=self.mbid)
                track_file = None
                if replace:
                    track_file = track.files.first()
                elif track.files.count() > 0:
                    # Nothing to download, but the job is done: mark it so,
                    # otherwise it (and its batch) would report 'pending'
                    # indefinitely.
                    self.status = 'finished'
                    self.save()
                    return

                track_file = track_file or TrackFile(track=track, source=self.source)
                track_file.download_file()
                track_file.save()
                self.status = 'finished'
                self.save()
                return track.pk

            except Exception as exc:
                if not settings.DEBUG:
                    raise ImportJob.run.retry(args=[self], exc=exc, countdown=30, max_retries=3)
                raise