Skip to content
Snippets Groups Projects
models.py 12.3 KiB
Newer Older
import os
import io
import arrow
import datetime
import tempfile
import shutil
import markdown

from django.conf import settings
from django.db import models
from django.core.files.base import ContentFile
from django.core.files import File
from django.core.urlresolvers import reverse
from django.utils import timezone
from taggit.managers import TaggableManager
from versatileimagefield.fields import VersatileImageField

from funkwhale_api.taskapp import celery
from funkwhale_api import downloader
from funkwhale_api import musicbrainz
from . import importers
from . import lyrics as lyrics_utils


class APIModelMixin(models.Model):
    """Abstract base for models populated from a MusicBrainz API endpoint.

    The helpers below read ``api``, ``musicbrainz_model`` and
    ``musicbrainz_mapping`` from the concrete class, so subclasses are
    expected to define them.
    """
    mbid = models.UUIDField(unique=True, db_index=True, null=True, blank=True)
    # Extra "includes" passed to ``api.get`` when fetching an entity.
    api_includes = []
    creation_date = models.DateTimeField(default=timezone.now)
    # Callables handed to ``importers.load`` together with the cleaned data.
    import_hooks = []

    class Meta:
        abstract = True
        ordering = ['-creation_date']

    @classmethod
    def get_or_create_from_api(cls, mbid):
        """Return ``(instance, created)`` for the given MusicBrainz id.

        The local database is queried first; the API is only hit when no
        row with this ``mbid`` exists.
        """
        try:
            existing = cls.objects.get(mbid=mbid)
        except cls.DoesNotExist:
            return cls.create_from_api(id=mbid), True
        return existing, False

    def get_api_data(self):
        """Fetch this entity's payload from the API, keyed by its mbid."""
        response = type(self).api.get(id=self.mbid, includes=self.api_includes)
        return response[self.musicbrainz_model]

    @classmethod
    def create_from_api(cls, **kwargs):
        """Load an entity from the API and hand it to ``importers.load``.

        With a truthy ``id`` keyword the entity is fetched directly;
        otherwise the keywords are forwarded to ``api.search`` and the first
        result is used.
        """
        entity_id = kwargs.get('id')
        if entity_id:
            response = cls.api.get(id=kwargs['id'], includes=cls.api_includes)
            raw_data = response[cls.musicbrainz_model]
        else:
            results = cls.api.search(**kwargs)
            list_key = '{0}-list'.format(cls.musicbrainz_model)
            raw_data = results[list_key][0]
        cleaned_data = cls.clean_musicbrainz_data(raw_data)
        return importers.load(cls, cleaned_data, raw_data, cls.import_hooks)

    @classmethod
    def clean_musicbrainz_data(cls, data):
        """Translate a raw payload into a dict using ``musicbrainz_mapping``.

        Entries for which the mapping raises ``KeyError`` are skipped.
        """
        mapping = importers.Mapping(cls.musicbrainz_mapping)
        cleaned_data = {}
        for raw_key, raw_value in data.items():
            try:
                field_name, field_value = mapping.from_musicbrainz(raw_key, raw_value)
            except KeyError:
                # Skipped on purpose: the mapping does not handle this entry.
                continue
            cleaned_data[field_name] = field_value
        return cleaned_data

class Artist(APIModelMixin):
    """An artist, identified by name and (optionally) a MusicBrainz id."""
    name = models.CharField(max_length=255)

    musicbrainz_model = 'artist'
    musicbrainz_mapping = {
        'mbid': {
            'musicbrainz_field_name': 'id'
        },
        'name': {
            'musicbrainz_field_name': 'name'
        }
    }
    api = musicbrainz.api.artists

    def __str__(self):
        return self.name

    @property
    def tags(self):
        """Set of every tag found on any of this artist's albums."""
        return {
            tag
            for album in self.albums.all()
            for tag in album.tags
        }

def import_artist(v):
    """Converter: resolve the first entry of an artist-credit list to an Artist.

    The artist is looked up by MusicBrainz id and created from the API when
    it is not stored locally yet.
    """
    artist_mbid = v[0]['artist']['id']
    artist, _ = Artist.get_or_create_from_api(mbid=artist_mbid)
    return artist

def parse_date(v):
    """Convert a date string into a ``datetime.date``.

    A four-character value is treated as a bare year and mapped to January
    1st of that year; anything else is delegated to ``arrow.get``.
    """
    if len(v) != 4:
        return arrow.get(v).date()
    year = int(v)
    return datetime.date(year, 1, 1)


def import_tracks(instance, cleaned_data, raw_data):
    """Import hook: load every track of the release's first medium.

    Each track is attached to ``instance`` as its album and keeps the
    position reported in the payload. Only ``medium-list[0]`` is read.
    """
    first_medium = raw_data['medium-list'][0]
    for entry in first_medium['track-list']:
        fields = Track.clean_musicbrainz_data(entry['recording'])
        fields['album'] = instance
        fields['position'] = int(entry['position'])
        importers.load(Track, fields, entry, Track.import_hooks)

class Album(APIModelMixin):
    """An album belonging to one Artist, with an optional cover image."""
    title = models.CharField(max_length=255)
    artist = models.ForeignKey(Artist, related_name='albums')
    release_date = models.DateField(null=True)
    cover = VersatileImageField(upload_to='albums/covers/%Y/%m/%d', null=True, blank=True)
    TYPE_CHOICES = (
        ('album', 'Album'),
    )
    type = models.CharField(choices=TYPE_CHOICES, max_length=30, default='album')

    api_includes = ['artist-credits', 'recordings', 'media']
    api = musicbrainz.api.releases
    musicbrainz_model = 'release'
    # Consumed by ``importers.Mapping`` in ``clean_musicbrainz_data``.
    # NOTE(review): 'position' has no matching field on this model - confirm
    # whether that entry is still needed.
    musicbrainz_mapping = {
        'mbid': {
            'musicbrainz_field_name': 'id',
        },
        'position': {
            'musicbrainz_field_name': 'release-list',
            'converter': lambda v: int(v[0]['medium-list'][0]['position']),
        },
        'title': {
            'musicbrainz_field_name': 'title',
        },
        'release_date': {
            'musicbrainz_field_name': 'date',
            'converter': parse_date,
        },
        'type': {
            'musicbrainz_field_name': 'type',
            'converter': lambda v: v.lower(),
        },
        'artist': {
            'musicbrainz_field_name': 'artist-credit',
            'converter': import_artist,
        }
    }

    def get_image(self):
        """Download the front cover for this album and store it in ``cover``.

        Returns the file object of the saved cover.
        """
        mbid = str(self.mbid)
        image_data = musicbrainz.api.images.get_front(mbid)
        cover_name = '{0}.jpg'.format(self.mbid)
        self.cover.save(cover_name, ContentFile(image_data))
        return self.cover.file

    def __str__(self):
        return self.title

    @property
    def tags(self):
        """Set of every tag attached to any track of this album."""
        return {
            tag
            for track in self.tracks.all()
            for tag in track.tags.all()
        }

def import_tags(instance, cleaned_data, raw_data):
    """Import hook: attach sufficiently popular tags to ``instance``.

    Tags from ``raw_data['tag-list']`` are kept when their ``count`` is at
    least ``MINIMUM_COUNT``; entries whose count is not an integer are
    ignored. A payload without a tag list adds nothing.
    """
    MINIMUM_COUNT = 2

    def is_popular(tag_data):
        # A non-numeric count disqualifies the tag instead of aborting.
        try:
            return int(tag_data['count']) >= MINIMUM_COUNT
        except ValueError:
            return False

    tags_to_add = [
        tag_data['name']
        for tag_data in raw_data.get('tag-list', [])
        if is_popular(tag_data)
    ]
    instance.tags.add(*tags_to_add)

def import_album(v):
    """Converter: resolve the first entry of a release list to an Album.

    The album is looked up by MusicBrainz id and created from the API when
    it is not stored locally yet.
    """
    album_mbid = v[0]['id']
    album, _ = Album.get_or_create_from_api(mbid=album_mbid)
    return album


def link_recordings(instance, cleaned_data, raw_data):
    """Import hook: point already-imported tracks at the given work.

    Every recording referenced in ``raw_data['recording-relation-list']``
    that exists locally (matched on ``mbid``) gets ``work`` set to
    ``instance``.

    A payload without a recording relation list is treated as "nothing to
    link" instead of raising ``KeyError``, matching how the other import
    hooks in this module tolerate missing lists.
    """
    relations = raw_data.get('recording-relation-list', [])
    recording_mbids = [relation['target'] for relation in relations]
    if not recording_mbids:
        # Nothing to link: skip the pointless UPDATE query.
        return
    Track.objects.filter(mbid__in=recording_mbids).update(work=instance)


def import_lyrics(instance, cleaned_data, raw_data):
    """Import hook: record the lyrics URL advertised for a work.

    Looks for the first ``url-relation-list`` entry of type ``'lyrics'`` and
    gets or creates the matching ``Lyrics`` row for ``instance``. Returns
    that row, or ``None`` when the payload carries no usable lyrics URL.
    """
    try:
        lyrics_relations = [
            relation
            for relation in raw_data['url-relation-list']
            if relation['type'] == 'lyrics'
        ]
        url = lyrics_relations[0]['target']
    except (IndexError, KeyError):
        return None

    lyrics, _ = Lyrics.objects.get_or_create(work=instance, url=url)
    return lyrics


class Work(APIModelMixin):
    """A work, to which tracks and lyrics can be linked."""
    language = models.CharField(max_length=20)
    nature = models.CharField(max_length=50)
    title = models.CharField(max_length=255)

    api = musicbrainz.api.works
    api_includes = ['url-rels', 'recording-rels']
    musicbrainz_model = 'work'
    musicbrainz_mapping = {
        'mbid': {
            'musicbrainz_field_name': 'id'
        },
        'title': {
            'musicbrainz_field_name': 'title'
        },
        'language': {
            'musicbrainz_field_name': 'language',
        },
        'nature': {
            'musicbrainz_field_name': 'type',
            'converter': lambda v: v.lower(),
        },
    }
    import_hooks = [
        import_lyrics,
        link_recordings
    ]

    def fetch_lyrics(self):
        """Return the lyrics attached to this work, importing them if needed.

        When no ``Lyrics`` row is linked yet, the work's URL relations are
        fetched from the API and passed to ``import_lyrics``; the result may
        be ``None``.
        """
        existing = self.lyrics.first()
        if existing:
            return existing
        work_data = self.api.get(self.mbid, includes=['url-rels'])['work']
        return import_lyrics(self, {}, work_data)


class Lyrics(models.Model):
    """Lyrics located at a URL, optionally linked to a Work.

    ``content`` stays empty until ``fetch_content`` has run.
    """
    work = models.ForeignKey(Work, related_name='lyrics', null=True, blank=True)
    url = models.URLField(unique=True)
    content = models.TextField(null=True, blank=True)

    @celery.app.task(name='Lyrics.fetch_content', filter=celery.task_method)
    def fetch_content(self):
        """Download the page at ``url``, extract the lyrics and save them."""
        html = lyrics_utils._get_html(self.url)
        content = lyrics_utils.extract_content(html)
        cleaned_content = lyrics_utils.clean_content(content)
        self.content = cleaned_content
        self.save()

    @property
    def content_rendered(self):
        """Return ``content`` rendered from Markdown to HTML.

        ``content`` is nullable, so an empty string is returned when nothing
        has been fetched yet instead of passing ``None`` to the renderer.
        """
        if not self.content:
            return ''
        return markdown.markdown(
            self.content,
            safe_mode=True,
            enable_attributes=False,
            extensions=['markdown.extensions.nl2br'])


class Track(APIModelMixin):
    """A recording, tied to an Artist and optionally to an Album and a Work."""
    title = models.CharField(max_length=255)
    artist = models.ForeignKey(Artist, related_name='tracks')
    position = models.PositiveIntegerField(null=True, blank=True)
    album = models.ForeignKey(Album, related_name='tracks', null=True, blank=True)
    work = models.ForeignKey(Work, related_name='tracks', null=True, blank=True)

    musicbrainz_model = 'recording'
    api = musicbrainz.api.recordings
    api_includes = ['artist-credits', 'releases', 'media', 'tags', 'work-rels']
    musicbrainz_mapping = {
        'mbid': {
            'musicbrainz_field_name': 'id'
        },
        'title': {
            'musicbrainz_field_name': 'title'
        },
        'artist': {
            'musicbrainz_field_name': 'artist-credit',
            'converter': lambda v: Artist.get_or_create_from_api(mbid=v[0]['artist']['id'])[0],
        },
        'album': {
            'musicbrainz_field_name': 'release-list',
            'converter': import_album,
        },
    }
    import_hooks = [
        import_tags
    ]
    tags = TaggableManager()

    def __str__(self):
        return self.title

    def save(self, *args, **kwargs):
        """Save the track, defaulting ``artist`` to the album's artist.

        Positional and keyword arguments are forwarded unchanged to
        ``Model.save`` so callers can use its full signature.
        """
        try:
            self.artist
        except Artist.DoesNotExist:
            # No artist set yet: inherit it from the album.
            self.artist = self.album.artist
        super().save(*args, **kwargs)

    def get_work(self):
        """Return the Work for this track, importing it when none is linked.

        Without a linked work, the recording's work relations are fetched
        from the API and the first related work is loaded with
        ``Work.get_or_create_from_api``. Returns ``None`` when the recording
        has no work relation. The imported work is returned but not assigned
        to ``self.work`` here.
        """
        if self.work:
            return self.work
        data = self.api.get(self.mbid, includes=['work-rels'])
        try:
            work_data = data['recording']['work-relation-list'][0]['work']
        except (IndexError, KeyError):
            return None
        work, _ = Work.get_or_create_from_api(mbid=work_data['id'])
        return work

    def get_lyrics_url(self):
        """Return the URL of the lyrics endpoint for this track."""
        return reverse('api:v1:tracks-lyrics', kwargs={'pk': self.pk})

    @property
    def full_name(self):
        """``'artist - album - title'``, or ``'artist - title'`` without an album."""
        try:
            return '{} - {} - {}'.format(
                self.artist.name,
                self.album.title,
                self.title,
            )
        except AttributeError:
            # ``album`` is nullable, so ``self.album.title`` can fail.
            return '{} - {}'.format(
                self.artist.name,
                self.title,
            )


class TrackFile(models.Model):
    """An audio file for a Track, optionally downloaded from ``source``."""
    track = models.ForeignKey(Track, related_name='files')
    audio_file = models.FileField(upload_to='tracks/%Y/%m/%d', max_length=255)
    source = models.URLField(null=True, blank=True)
    duration = models.IntegerField(null=True, blank=True)

    def download_file(self):
        """Download ``source`` and store the result in ``audio_file``.

        The download goes to a temporary directory that is removed
        afterwards, even when the download or the save fails. ``duration``
        is set from the downloader's result when it reports one.

        Returns the populated ``audio_file`` field file.
        """
        tmp_dir = tempfile.mkdtemp()
        try:
            data = downloader.download(
                self.source,
                target_directory=tmp_dir)
            self.duration = data.get('duration', None)
            audio_file_path = data['audio_file_path']
            # Close the handle as soon as the content has been copied.
            with open(audio_file_path, 'rb') as downloaded:
                self.audio_file.save(
                    os.path.basename(audio_file_path),
                    File(downloaded)
                )
        finally:
            shutil.rmtree(tmp_dir)
        return self.audio_file

    @property
    def path(self):
        """URL clients should use to retrieve the audio file.

        When ``settings.PROTECT_AUDIO_FILES`` is enabled this is the
        ``trackfiles-serve`` API endpoint; otherwise it is the stored file's
        own URL, rather than an implicit ``None``.
        """
        if settings.PROTECT_AUDIO_FILES:
            return reverse(
                'api:v1:trackfiles-serve', kwargs={'pk': self.pk})
        return self.audio_file.url
class ImportBatch(models.Model):
    """A group of import jobs submitted together by one user."""
    creation_date = models.DateTimeField(default=timezone.now)
    submitted_by = models.ForeignKey('users.User', related_name='imports')

    class Meta:
        ordering = ['-creation_date']

    def __str__(self):
        return str(self.pk)

    @property
    def status(self):
        """``'pending'`` while any job of the batch is pending, else ``'finished'``."""
        has_pending = any(job.status == 'pending' for job in self.jobs.all())
        return 'pending' if has_pending else 'finished'

class ImportJob(models.Model):
    """A single track import: a source URL paired with a MusicBrainz id."""
    batch = models.ForeignKey(ImportBatch, related_name='jobs')
    source = models.URLField()
    mbid = models.UUIDField(editable=False)
    STATUS_CHOICES = (
        ('pending', 'Pending'),
        ('finished', 'finished'),
    )
    status = models.CharField(choices=STATUS_CHOICES, default='pending', max_length=30)

    @celery.app.task(name='ImportJob.run', filter=celery.task_method)
    def run(self, replace=False):
        """Import the track for ``mbid`` and download its audio file.

        With ``replace`` false, the job returns early (leaving its status
        untouched) when the track already has a file. With ``replace`` true,
        the track's first existing file is downloaded again, or a new one is
        created if there is none.

        Returns the track's primary key on success. On failure the task is
        retried (30s countdown, at most 3 retries) unless ``settings.DEBUG``
        is set, in which case the exception propagates.
        """
        try:
            track, _ = Track.get_or_create_from_api(mbid=self.mbid)
            if replace:
                track_file = track.files.first()
            elif track.files.count() > 0:
                return
            else:
                track_file = None

            if not track_file:
                track_file = TrackFile(track=track, source=self.source)
            track_file.download_file()
            track_file.save()

            self.status = 'finished'
            self.save()
            return track.pk

        except Exception as exc:
            if settings.DEBUG:
                raise
            raise ImportJob.run.retry(args=[self], exc=exc, countdown=30, max_retries=3)