Newer
Older
Eliot Berriot
committed
import base64
Eliot Berriot
committed
import datetime
Eliot Berriot
committed
import logging
Eliot Berriot
committed
import pendulum
Eliot Berriot
committed
Eliot Berriot
committed
import mutagen._util
import mutagen.oggtheora
import mutagen.oggvorbis
import mutagen.flac
Eliot Berriot
committed
from rest_framework import serializers
from rest_framework.compat import Mapping
Eliot Berriot
committed
Eliot Berriot
committed
logger = logging.getLogger(__name__)
Eliot Berriot
committed
# Sentinel meaning "the caller supplied no default": it lets Metadata.get()
# tell an omitted default apart from an explicit default of None.
NODEFAULT = object()
class TagNotFound(KeyError):
    """
    Raised when a tag cannot be found in the file being read.
    """
class UnsupportedTag(KeyError):
    """
    Raised when a key is not configured for the format of the file being read.
    """
Eliot Berriot
committed
class ParseError(ValueError):
    """
    ValueError subclass, presumably meant for metadata parsing failures
    (no raise site is visible in this part of the module).
    """
def get_id3_tag(f, k):
    """
    Return the value stored for key ``k`` in the tags of file ``f``.

    The frame ``f.tags[k]`` is tried first: the first element of its ``text``
    attribute is preferred, then its ``url`` attribute as-is. When that frame
    is missing, or exposes neither attribute, the user-defined text frames are
    searched for one whose description equals ``k`` (case-insensitive) and the
    first element of its text is returned.

    Raises TagNotFound when nothing matches.
    """
    # First we try to grab the standard key
    possible_attributes = [("text", True), ("url", False)]
    for attr, select_first in possible_attributes:
        try:
            v = getattr(f.tags[k], attr)
            if select_first:
                v = v[0]
            return v
        except KeyError:
            # no frame is stored under that key, other attributes cannot help
            break
        except AttributeError:
            continue
    # then we fallback on parsing non standard tags
    # NOTE(review): ``all_tags`` used to be read without ever being bound,
    # which made this path fail with NameError. TXXX frames are assumed to be
    # the intended source since the lookup relies on ``desc`` and ``text``.
    all_tags = f.tags.getall("TXXX")
    try:
        matches = [t for t in all_tags if t.desc.lower() == k.lower()]
        return matches[0].text[0]
    except (KeyError, IndexError):
        raise TagNotFound(k)
def clean_id3_pictures(apic):
    """
    Convert the picture frames in ``apic`` into plain dictionaries.

    Every item must expose ``mime``, ``data``, ``desc`` and ``type``; the
    result holds one dict per item with the keys ``mimetype``, ``content``,
    ``description`` and ``type`` (the ``real`` value of the item's type).
    """
    return [
        {
            "mimetype": frame.mime,
            "content": frame.data,
            "description": frame.desc,
            "type": frame.type.real,
        }
        for frame in apic
    ]
return f.pictures
return f.get(k, [])[0]
except (KeyError, IndexError):
raise TagNotFound(k)
def clean_flac_pictures(apic):
    """
    Describe each picture of ``apic`` as a plain dictionary.

    The returned list keeps the input order; each entry carries the picture's
    mime type, raw data, description and the ``real`` value of its type.
    """

    def describe(picture):
        return {
            "mimetype": picture.mime,
            "content": picture.data,
            "description": picture.desc,
            "type": picture.type.real,
        }

    return list(map(describe, apic))
Eliot Berriot
committed
def clean_ogg_pictures(metadata_block_picture):
    """
    Decode a base64-encoded picture block into a list of picture dictionaries.

    Returns a one-element list describing the picture, or an empty list when
    the value is not valid base64 or when mutagen cannot read the decoded
    bytes as a FLAC picture. The description is always an empty string.
    """
    try:
        raw = base64.b64decode(metadata_block_picture)
    except (TypeError, ValueError):
        return []
    try:
        parsed = mutagen.flac.Picture(raw)
    except mutagen.flac.FLACError:
        return []
    return [
        {
            "mimetype": parsed.mime,
            "content": parsed.data,
            "description": "",
            "type": parsed.type.real,
        }
    ]
def get_mp3_recording_id(f, k):
    """
    Return the data of the first UFID frame whose owner mentions musicbrainz.org.

    The frame's ``data`` bytes are decoded as UTF-8. ``k`` is only used to
    build the TagNotFound error raised when no such frame exists.
    """
    try:
        owned = [
            frame
            for frame in f.tags.getall("UFID")
            if "musicbrainz.org" in frame.owner
        ]
        first = owned[0]
        return first.data.decode("utf-8")
    except IndexError:
        raise TagNotFound(k)
Eliot Berriot
committed
Eliot Berriot
committed
# Optional per-key validators: when an entry exists for a key,
# Metadata._get_from_self passes the value through its ``to_python`` method.
VALIDATION = {}
CONF = {
"OggOpus": {
"getter": lambda f, k: f[k][0],
"fields": {
Eliot Berriot
committed
"position": {"field": "TRACKNUMBER"},
"disc_number": {"field": "DISCNUMBER"},
Eliot Berriot
committed
"album_artist": {"field": "albumartist"},
Eliot Berriot
committed
"date": {"field": "date"},
"musicbrainz_albumid": {},
"musicbrainz_artistid": {},
Eliot Berriot
committed
"mbid": {"field": "musicbrainz_trackid"},
"OggVorbis": {
"getter": lambda f, k: f[k][0],
"fields": {
Eliot Berriot
committed
"position": {"field": "TRACKNUMBER"},
"disc_number": {"field": "DISCNUMBER"},
Eliot Berriot
committed
"album_artist": {"field": "albumartist"},
Eliot Berriot
committed
"date": {"field": "date"},
"musicbrainz_albumid": {},
"musicbrainz_artistid": {},
Eliot Berriot
committed
"mbid": {"field": "musicbrainz_trackid"},
Eliot Berriot
committed
"pictures": {
"field": "metadata_block_picture",
"to_application": clean_ogg_pictures,
},
"OggTheora": {
"getter": lambda f, k: f[k][0],
"fields": {
Eliot Berriot
committed
"position": {"field": "TRACKNUMBER"},
"disc_number": {"field": "DISCNUMBER"},
Eliot Berriot
committed
"date": {"field": "date"},
"musicbrainz_albumid": {"field": "MusicBrainz Album Id"},
"musicbrainz_artistid": {"field": "MusicBrainz Artist Id"},
"musicbrainz_albumartistid": {"field": "MusicBrainz Album Artist Id"},
Eliot Berriot
committed
"mbid": {"field": "MusicBrainz Track Id"},
Eliot Berriot
committed
"license": {},
"copyright": {},
},
"MP3": {
"getter": get_id3_tag,
"clean_pictures": clean_id3_pictures,
"fields": {
Eliot Berriot
committed
"position": {"field": "TRCK"},
"disc_number": {"field": "TPOS"},
"title": {"field": "TIT2"},
"artist": {"field": "TPE1"},
Eliot Berriot
committed
"date": {"field": "TDRC"},
"musicbrainz_albumid": {"field": "MusicBrainz Album Id"},
"musicbrainz_artistid": {"field": "MusicBrainz Artist Id"},
"musicbrainz_albumartistid": {"field": "MusicBrainz Album Artist Id"},
Eliot Berriot
committed
"mbid": {"field": "UFID", "getter": get_mp3_recording_id},
"license": {"field": "WCOP"},
"copyright": {"field": "TCOP"},
"FLAC": {
"getter": get_flac_tag,
"clean_pictures": clean_flac_pictures,
"fields": {
Eliot Berriot
committed
"position": {"field": "tracknumber"},
"disc_number": {"field": "discnumber"},
Eliot Berriot
committed
"date": {"field": "date"},
"musicbrainz_albumid": {},
"musicbrainz_artistid": {},
Eliot Berriot
committed
"mbid": {"field": "musicbrainz_trackid"},
}
Eliot Berriot
committed
"position",
"disc_number",
"title",
"artist",
"album_artist",
"album",
"date",
"musicbrainz_albumid",
"musicbrainz_artistid",
"musicbrainz_albumartistid",
Eliot Berriot
committed
"mbid",
Eliot Berriot
committed
class Metadata(Mapping):
Eliot Berriot
committed
def __init__(self, filething, kind=mutagen.File):
    """
    Open ``filething`` with ``kind`` and pick the CONF entry for its type.

    Sets ``_file`` (the parsed file), ``fallback`` (result of
    ``load_fallback``) and ``_conf`` (the CONF entry named after the parsed
    file's class).

    Raises ValueError when ``kind`` returns None or when the file type has no
    CONF entry.
    """
    self._file = kind(filething)
    if self._file is None:
        raise ValueError("Cannot parse metadata from {}".format(filething))
    self.fallback = self.load_fallback(filething, self._file)
    ft = self.get_file_type(self._file)
    try:
        self._conf = CONF[ft]
    except KeyError:
        # NOTE(review): this handler had no body. ValueError mirrors the
        # error raised above for unparseable files; confirm the message.
        raise ValueError("Unsupported format {}".format(ft))
Eliot Berriot
committed
def get_file_type(self, f):
    """
    Return the class name of ``f``, which is used as the key into CONF.
    """
    file_class = type(f)
    return file_class.__name__
Eliot Berriot
committed
def load_fallback(self, filething, parent):
    """
    In some situations, such as Ogg Theora files tagged with MusicBrainz Picard,
    part of the tags are only available in the ogg vorbis comments

    Returns a Metadata built from ``filething`` read as Ogg Vorbis when
    ``parent`` is an OggTheora instance, and None otherwise. Errors raised
    while building that Metadata propagate to the caller.
    """
    try:
        # rewind before the file is parsed a second time
        filething.seek(0)
    except AttributeError:
        # filething has no seek() (presumably a path rather than a file object)
        pass
    if not isinstance(parent, mutagen.oggtheora.OggTheora):
        return None
    # The former handler for (ValueError, MutagenError) only re-raised and was
    # followed by an unreachable ``pass``; letting the exception propagate
    # directly is the same behavior.
    # NOTE(review): if a failing fallback was meant to be ignored, catch those
    # exceptions here and return None instead.
    return Metadata(filething, kind=mutagen.oggvorbis.OggVorbis)
def get(self, key, default=NODEFAULT):
    """
    Return the value of ``key``, consulting the fallback metadata if needed.

    When the tag is missing or unsupported here and a fallback exists, the
    lookup is delegated to the fallback with the same ``default``. Without a
    fallback, a missing tag yields ``default`` when one was given and raises
    TagNotFound otherwise, while an unsupported tag always raises
    UnsupportedTag.
    """
    try:
        return self._get_from_self(key)
    except TagNotFound:
        if self.fallback:
            return self.fallback.get(key, default=default)
        # Identity check: the sentinel must not be compared through the
        # default's own __eq__/__ne__ implementation.
        if default is not NODEFAULT:
            return default
        raise
    except UnsupportedTag:
        if self.fallback:
            return self.fallback.get(key, default=default)
        raise
Eliot Berriot
committed
def all(self):
    """
    Return a dict with all supported metadata fields, if they are available

    The "pictures" field is left out, fields whose value is None are skipped
    and every remaining value is converted with str().
    """
    available = {}
    for name in self._conf["fields"]:
        if name == "pictures":
            continue
        found = self.get(name, None)
        if found is not None:
            available[name] = str(found)
    return available
Eliot Berriot
committed
def _get_from_self(self, key, default=NODEFAULT):
    """
    Read ``key`` from this file only, without consulting the fallback.

    The field configuration provides the real tag name ("field", defaulting
    to ``key``), an optional field-specific "getter" (defaulting to the
    format-wide one) and an optional "to_application" converter. A validator
    registered in VALIDATION for ``key`` is applied last.

    Raises UnsupportedTag when ``key`` has no field configuration, and
    TagNotFound when the getter raises KeyError and no ``default`` was given.
    """
    try:
        # NOTE(review): this lookup and the ``converter`` binding below were
        # missing; both are restored from how the names are used. Confirm.
        field_conf = self._conf["fields"][key]
    except KeyError:
        raise UnsupportedTag("{} is not supported for this file format".format(key))
    real_key = field_conf.get("field", key)
    try:
        getter = field_conf.get("getter", self._conf["getter"])
        v = getter(self._file, real_key)
    except KeyError:
        # identity check on the sentinel rather than ==
        if default is NODEFAULT:
            raise TagNotFound(real_key)
        return default
    converter = field_conf.get("to_application")
    if converter:
        v = converter(v)
    field = VALIDATION.get(key)
    if field:
        v = field.to_python(v)
    return v
Eliot Berriot
committed
def get_picture(self, *picture_types):
    """
    Return the first picture matching one of ``picture_types``, or None.

    ``picture_types`` are names looked up (upper-cased) on
    ``mutagen.id3.PictureType``; they are tried in the order given, so
    earlier names take precedence. None is returned when the "pictures" tag
    is unsupported or missing, when there is no picture at all, or when no
    picture has a requested type.

    Raises ValueError when called without any picture type.
    """
    if not picture_types:
        raise ValueError("You need to request at least one picture type")
    ptypes = [
        getattr(mutagen.id3.PictureType, picture_type.upper())
        for picture_type in picture_types
    ]
    try:
        # NOTE(review): this lookup was missing, leaving ``pictures`` unbound;
        # restored from the exceptions handled below. Confirm.
        pictures = self.get("pictures")
    except (UnsupportedTag, TagNotFound):
        return None
    # formats without a "clean_pictures" entry use the value unchanged
    cleaner = self._conf.get("clean_pictures", lambda v: v)
    pictures = cleaner(pictures)
    if not pictures:
        return None
    for ptype in ptypes:
        for p in pictures:
            if p["type"] == ptype:
                return p
    return None
Eliot Berriot
committed
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def __getitem__(self, key):
    # Subscript access delegates to get() without a default, so whatever
    # get() raises for that key propagates to the caller.
    return self.get(key)
def __len__(self):
    # Always 1, regardless of how many fields __iter__ yields.
    # NOTE(review): presumably this only keeps instances truthy (get() tests
    # ``self.fallback`` for truthiness) - confirm before relying on len().
    return 1
def __iter__(self):
    """
    Iterate over the field names configured for this file's format.
    """
    yield from self._conf["fields"]
class ArtistField(serializers.Field):
    """
    Field turning artist name and MusicBrainz id tags into a list of artists.

    With ``for_album=True`` the album artist tags are read instead of the
    track artist ones.
    """

    def __init__(self, *args, **kwargs):
        self.for_album = kwargs.pop("for_album", False)
        super().__init__(*args, **kwargs)

    def get_value(self, data):
        """
        Return {"names": ..., "mbids": ...} read from ``data`` (None if absent).
        """
        if self.for_album:
            names_key, mbids_key = "album_artist", "musicbrainz_albumartistid"
        else:
            names_key, mbids_key = "artist", "musicbrainz_artistid"
        return {
            "names": data.get(names_key, None),
            "mbids": data.get(mbids_key, None),
        }

    def to_internal_value(self, data):
        """
        Split names and mbids into artists and validate them.

        Each artist is a {"name", "mbid"} dict, validated as a list of at
        least one ArtistSerializer entry.
        """
        # we have multiple values that can be separated by various separators
        # we get a list like that if tagged via musicbrainz
        # ae29aae4-abfb-4609-8f54-417b1f4d64cc; 3237b5a8-ae44-400c-aa6d-cea51f0b9074;
        raw_mbids = data["mbids"]
        mbids = [raw_mbids]
        used_separator = None
        if raw_mbids:
            # the first separator present in the mbids wins
            used_separator = next(
                (candidate for candidate in (";", "/") if candidate in raw_mbids),
                None,
            )
            if used_separator is not None:
                mbids = [part.strip() for part in raw_mbids.split(used_separator)]

        # now, we split on artist names, using the same separator as the one used
        # by mbids, if any
        if used_separator and mbids:
            names = [part.strip() for part in data["names"].split(used_separator)]
        else:
            names = [data["names"]]

        # names without a matching mbid position get a None mbid
        final = [
            {"name": name, "mbid": mbids[index] if index < len(mbids) else None}
            for index, name in enumerate(names)
        ]
        field = serializers.ListField(child=ArtistSerializer(), min_length=1)
        return field.to_internal_value(final)
class AlbumField(serializers.Field):
    """
    Field building validated album data, including its artists, from the
    whole metadata mapping.
    """

    def get_value(self, data):
        # the field needs several tags, so it works on the complete mapping
        return data

    def to_internal_value(self, data):
        """
        Return the validated album data with an extra "artists" entry.

        Raises serializers.ValidationError("Missing album tag") when reading
        the album tag raises TagNotFound.
        """
        try:
            title = data.get("album")
        except TagNotFound:
            raise serializers.ValidationError("Missing album tag")

        album_payload = {
            "title": title,
            "release_date": data.get("date", None),
            "mbid": data.get("musicbrainz_albumid", None),
        }

        # artists are validated before the album itself
        artist_field = ArtistField(for_album=True)
        artists = artist_field.to_internal_value(artist_field.get_value(data))

        serializer = AlbumSerializer(data=album_payload)
        serializer.is_valid(raise_exception=True)
        serializer.validated_data["artists"] = artists
        return serializer.validated_data
class CoverDataField(serializers.Field):
    """
    Field returning the result of ``get_picture`` on the metadata object.
    """

    def get_value(self, data):
        # hand the complete metadata object over to to_internal_value
        return data

    def to_internal_value(self, data):
        # picture types are passed in order of preference
        preferred_types = ("cover_front", "other")
        return data.get_picture(*preferred_types)
class PermissiveDateField(serializers.CharField):
    """
    Char field converting loosely formatted dates to ``datetime.date``.

    Falsy input and values that cannot be parsed both yield None.
    """

    def to_internal_value(self, value):
        if not value:
            return None
        value = super().to_internal_value(str(value))

        # formats tried before handing the value over to pendulum
        extra_formats = (
            "%Y-%d-%m %H:%M",  # deezer date format
            "%Y-%W",  # weird date format based on week number, see #718
        )
        for extra_format in extra_formats:
            try:
                moment = datetime.datetime.strptime(value, extra_format)
            except ValueError:
                continue
            return datetime.date(moment.year, moment.month, moment.day)

        try:
            moment = pendulum.parse(str(value))
            return datetime.date(moment.year, moment.month, moment.day)
        except pendulum.exceptions.ParserError:
            return None
class ArtistSerializer(serializers.Serializer):
    """
    Validates one artist entry: a name and an optional, nullable UUID mbid.
    """

    name = serializers.CharField()
    mbid = serializers.UUIDField(required=False, allow_null=True)
class AlbumSerializer(serializers.Serializer):
    """
    Validates album data: a title, an optional nullable UUID mbid and an
    optional nullable release date parsed by PermissiveDateField.
    """

    title = serializers.CharField()
    mbid = serializers.UUIDField(required=False, allow_null=True)
    release_date = PermissiveDateField(required=False, allow_null=True)
class PositionField(serializers.CharField):
    """
    Char field converting a position such as "3" or "1/4" to an integer.

    Falsy values are returned unchanged; values that cannot be converted
    yield None.
    """

    def to_internal_value(self, v):
        text = super().to_internal_value(v)
        if not text:
            return text

        try:
            return int(text)
        except ValueError:
            # maybe the position is of the form "1/4"
            pass

        try:
            leading_part = text.split("/")[0]
            return int(leading_part)
        except (ValueError, AttributeError, IndexError):
            return None
class TrackMetadataSerializer(serializers.Serializer):
    """
    Validates the metadata of a single track.

    Only ``title``, ``album``, ``artists`` and ``cover_data`` are declared
    without ``required=False``; the other fields are optional and nullable.
    """

    title = serializers.CharField()
    # position and disc_number accept plain numbers or "1/4"-like values
    position = PositionField(allow_null=True, required=False)
    disc_number = PositionField(allow_null=True, required=False)
    copyright = serializers.CharField(allow_null=True, required=False)
    license = serializers.CharField(allow_null=True, required=False)
    mbid = serializers.UUIDField(allow_null=True, required=False)
    # the fields below gather their own input through their get_value()
    album = AlbumField()
    artists = ArtistField()
    cover_data = CoverDataField()