import collections
import datetime
import itertools
import os
import queue
import threading
import time
import urllib.parse

import watchdog.events
import watchdog.observers

from django.conf import settings
from django.core.files import File
from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Q
from django.utils import timezone
from rest_framework import serializers
from funkwhale_api.common import utils as common_utils
from funkwhale_api.music import models, tasks, utils
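# Yield audio files under "dir" (or "dir" itself when it is a file) whose extension
# matches one of "extensions", skipping any path listed in "ignored".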
def crawl_dir(dir, extensions, recursive=True, ignored=[]):
if os.path.isfile(dir):
yield dir
return
with os.scandir(dir) as scanner:
for entry in scanner:
if entry.is_file():
for e in extensions:
if entry.name.lower().endswith(".{}".format(e.lower())):
if entry.path not in ignored:
yield entry.path
elif recursive and entry.is_dir():
yield from crawl_dir(
entry, extensions, recursive=recursive, ignored=ignored
)
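# Group an iterator into lists of at most n items; the final list may be shorter
# (or empty) once the iterator is exhausted, so callers should check for emptiness.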
def batch(iterable, n=1):
has_entries = True
while has_entries:
current = []
for i in range(0, n):
try:
current.append(next(iterable))
except StopIteration:
has_entries = False
yield current
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"library_id",
type=str,
help=(
"A local library identifier where the files should be imported. "
"You can use the full uuid such as e29c5be9-6da3-4d92-b40b-4970edd3ee4b "
"or only a small portion of it, starting from the beginning, such as "
"e29c5be9"
),
)
parser.add_argument("path", nargs="+", type=str)
parser.add_argument(
"--recursive",
action="store_true",
dest="recursive",
default=False,
help="Will match the pattern recursively (including subdirectories)",
)
        parser.add_argument(
            "--username",
            dest="username",
            help="The username of the user you want to be bound to the import",
        )
        parser.add_argument(
            "--async",
            action="store_true",
            dest="async_",
            default=False,
            help="Will launch celery tasks for each file to import instead of doing it synchronously and blocking the CLI",
        )
        parser.add_argument(
            "--exit",
            "-x",
            action="store_true",
            dest="exit_on_failure",
            default=False,
        )
        parser.add_argument(
            "--in-place",
            "-i",
            action="store_true",
            dest="in_place",
            default=False,
            help=(
                "Import files without duplicating them into the media directory. "
                "For in-place import to work, the music files must be readable "
                "by the web-server and funkwhale api and celeryworker processes. "
                "You may want to use this if you have a big music library to "
                "import and not much disk space available."
            ),
        )
        parser.add_argument(
            "--replace",
            action="store_true",
            dest="replace",
            default=False,
            help=(
                "Use this flag to replace duplicates (tracks with same "
                "musicbrainz mbid, or same artist, album and title) on import "
                "with their newest version."
            ),
        )
parser.add_argument(
"--outbox",
action="store_true",
dest="outbox",
default=False,
help=(
"Use this flag to notify library followers of newly imported files. "
"You'll likely want to keep this disabled for CLI imports, especially if"
"you plan to import hundreds or thousands of files, as it will cause a lot "
"of overhead on your server and on servers you are federating with."
),
)
parser.add_argument(
"--watch",
action="store_true",
dest="watch",
default=False,
help=(
"Start the command in watch mode. Instead of running a full import, "
"and exit, watch the given path and import new files, remove deleted "
"files, and update metadata corresponding to updated files."
),
)
parser.add_argument("-e", "--extension", nargs="+")
parser.add_argument(
"--broadcast",
action="store_true",
dest="broadcast",
default=False,
help=(
"Use this flag to enable realtime updates about the import in the UI. "
"This causes some overhead, so it's disabled by default."
),
)
parser.add_argument(
"--prune",
action="store_true",
dest="prune",
default=False,
help=(
"Once the import is completed, prune tracks, ablums and artists that aren't linked to any upload."
),
)
parser.add_argument(
"--reference",
action="store",
dest="reference",
default=None,
help=(
"A custom reference for the import. Leave this empty to have a random "
"reference being generated for you."
),
)
parser.add_argument(
"--noinput",
"--no-input",
action="store_false",
dest="interactive",
help="Do NOT prompt the user for input of any kind.",
)
parser.add_argument(
"--batch-size",
"-s",
dest="batch_size",
default=1000,
type=int,
help="Size of each batch, only used when crawling large collections",
)
    def handle(self, *args, **options):
        self.is_confirmed = False
        # handle relative directories
options["path"] = [os.path.abspath(path) for path in options["path"]]
try:
library = models.Library.objects.select_related("actor__user").get(
uuid__startswith=options["library_id"]
)
except models.Library.DoesNotExist:
raise CommandError("Invalid library id")
raise CommandError("Library {} is not a local library".format(library.uuid))
if options["in_place"]:
self.stdout.write(
"Checking imported paths against settings.MUSIC_DIRECTORY_PATH"
)
for import_path in options["path"]:
p = settings.MUSIC_DIRECTORY_PATH
if not p:
raise CommandError(
"Importing in-place requires setting the "
"MUSIC_DIRECTORY_PATH variable"
)
if p and not import_path.startswith(p):
raise CommandError(
"Importing in-place only works if importing "
"from {} (MUSIC_DIRECTORY_PATH), as this directory"
"needs to be accessible by the webserver."
"Culprit: {}".format(p, import_path)
)
reference = options["reference"] or "cli-{}".format(timezone.now().isoformat())
import_url = "{}://{}/library/{}/upload?{}"
import_url = import_url.format(
settings.FUNKWHALE_PROTOCOL,
settings.FUNKWHALE_HOSTNAME,
str(library.uuid),
urllib.parse.urlencode([("import", reference)]),
)
self.stdout.write(
"For details, please refer to import reference '{}' or URL {}".format(
reference, import_url
)
)
extensions = options.get("extension") or utils.SUPPORTED_EXTENSIONS
if options["watch"]:
if len(options["path"]) > 1:
raise CommandError("Watch only work with a single directory")
return self.setup_watcher(
extensions=extensions,
path=options["path"][0],
reference=reference,
library=library,
in_place=options["in_place"],
prune=options["prune"],
recursive=options["recursive"],
replace=options["replace"],
dispatch_outbox=options["outbox"],
broadcast=options["broadcast"],
)
update = True
checked_paths = set()
if options["in_place"] and update:
self.stdout.write("Checking existing files for updates…")
message = (
"Are you sure you want to do this?\n\n"
"Type 'yes' to continue, or 'no' to skip checking for updates in "
"already imported files: "
)
if options["interactive"] and input("".join(message)) != "yes":
pass
else:
checked_paths = check_updates(
stdout=self.stdout,
paths=options["path"],
extensions=extensions,
library=library,
batch_size=options["batch_size"],
)
self.stdout.write("Existing files checked, moving on to next step!")
crawler = itertools.chain(
*[
crawl_dir(
p,
extensions=extensions,
recursive=options["recursive"],
ignored=checked_paths,
)
for p in options["path"]
]
)
errors = []
total = 0
start_time = time.time()
batch_start = None
batch_duration = None
self.stdout.write("Starting import of new files…")
for i, entries in enumerate(batch(crawler, options["batch_size"])):
total += len(entries)
batch_start = time.time()
time_stats = ""
if i > 0:
time_stats = " - running for {}s, previous batch took {}s".format(
int(time.time() - start_time), int(batch_duration),
)
if entries:
self.stdout.write(
"Handling batch {} ({} items){}".format(
i + 1, len(entries), time_stats,
)
)
batch_errors = self.handle_batch(
library=library,
paths=entries,
batch=i + 1,
reference=reference,
options=options,
)
if batch_errors:
errors += batch_errors
batch_duration = time.time() - batch_start
message = "Successfully imported {} new tracks in {}s"
message = "Successfully launched import for {} new tracks in {}s"
self.stdout.write(
message.format(total - len(errors), int(time.time() - start_time))
)
if len(errors) > 0:
self.stderr.write("{} tracks could not be imported:".format(len(errors)))
for path, error in errors:
self.stderr.write("- {}: {}".format(path, error))
self.stdout.write(
"For details, please refer to import reference '{}' or URL {}".format(
reference, import_url
)
)
if options["prune"]:
self.stdout.write(
"Pruning dangling tracks, albums and artists from library…"
)
prune()
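    # Filter a batch of candidate paths, print an import summary, ask for confirmation
    # when running interactively, then delegate the actual work to do_import().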
def handle_batch(self, library, paths, batch, reference, options):
matching = []
# In some situations, the path is encoded incorrectly on the filesystem
# so we filter out faulty paths and display a warning to the user.
# see https://dev.funkwhale.audio/funkwhale/funkwhale/issues/138
        for m in paths:
            try:
m.encode("utf-8")
matching.append(m)
except UnicodeEncodeError:
try:
previous = matching[-1]
except IndexError:
previous = None
self.stderr.write(
self.style.WARNING(
"[warning] Ignoring undecodable path. Previous ok file was {}".format(
previous
)
)
)
if not matching:
raise CommandError("No file matching pattern, aborting")
if options["replace"]:
filtered = {"initial": matching, "skipped": [], "new": matching}
filtered = self.filter_matching(matching, library)
message = " - {} files already found in database"
" - {} files found matching this pattern: {}".format(
self.stdout.write(message.format(len(filtered["skipped"])))
self.stdout.write(" - {} new files".format(len(filtered["new"])))
        if batch == 1:
            self.stdout.write(
                " Selected options: {}".format(
                    ", ".join(
                        ["in place" if options["in_place"] else "copy music files"]
                    )
                )
            )

        if not filtered["new"]:
            self.stdout.write(" Nothing new to import, exiting")
            return
if options["interactive"] and not self.is_confirmed:
            message = (
                "Are you sure you want to do this?\n\n"
                "Type 'yes' to continue, or 'no' to cancel: "
            )
raise CommandError("Import cancelled.")
        import_paths = filtered["new"]
        errors = self.do_import(
            import_paths,
            library=library,
            reference=reference,
            batch=batch,
            options=options,
        )
        return errors
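    # Split candidate paths into already-imported ("skipped") and "new" entries by
    # comparing them against finished uploads already present in the library.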
    def filter_matching(self, matching, library):
        sources = ["file://{}".format(p) for p in matching]
        # we skip reimport for paths that are already found
        existing = library.uploads.filter(source__in=sources, import_status="finished")
existing = existing.values_list("source", flat=True)
existing = set([p.replace("file://", "", 1) for p in existing])
skipped = set(matching) & existing
result = {
"initial": matching,
"skipped": list(sorted(skipped)),
"new": list(sorted(set(matching) - skipped)),
}
return result
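    # Create one Upload per path (synchronously, or through Celery when async_ is set)
    # and collect (path, error message) tuples for any failure.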
def do_import(self, paths, library, reference, batch, options):
message = "[batch {batch}] {i}/{total} Importing {path}..."
message = "[batch {batch}] {i}/{total} Launching import for {path}..."
        # we create an upload bound to the library
async_ = options["async_"]
errors = []
for i, path in list(enumerate(paths)):
if options["verbosity"] > 1:
self.stdout.write(
message.format(batch=batch, path=path, i=i + 1, total=len(paths))
)
try:
create_upload(
path=path,
reference=reference,
library=library,
async_=async_,
replace=options["replace"],
in_place=options["in_place"],
dispatch_outbox=options["outbox"],
broadcast=options["broadcast"],
                )
except Exception as e:
m = "Error while importing {}: {} {}".format(
path, e.__class__.__name__, e
)
errors.append((path, "{} {}".format(e.__class__.__name__, e)))
def setup_watcher(self, path, extensions, recursive, **kwargs):
watchdog_queue = queue.Queue()
# Set up a worker thread to process database load
worker = threading.Thread(
target=process_load_queue(self.stdout, **kwargs), args=(watchdog_queue,),
)
        worker.daemon = True
worker.start()
# setup watchdog to monitor directory for trigger files
patterns = ["*.{}".format(e) for e in extensions]
event_handler = Watcher(
stdout=self.stdout, queue=watchdog_queue, patterns=patterns,
)
observer = watchdog.observers.Observer()
observer.schedule(event_handler, path, recursive=recursive)
observer.start()
try:
while True:
self.stdout.write(
"Watching for changes at {}…".format(path), ending="\r"
)
time.sleep(10)
if kwargs["prune"] and GLOBAL["need_pruning"]:
self.stdout.write("Some files were deleted, pruning library…")
prune()
GLOBAL["need_pruning"] = False
except KeyboardInterrupt:
self.stdout.write("Exiting…")
observer.stop()
observer.join()
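# Module-level flag: the deletion handler sets "need_pruning" so the watch loop
# knows when to prune the library.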
GLOBAL = {"need_pruning": False}
def prune():
call_command(
"prune_library",
dry_run=False,
prune_artists=True,
prune_albums=True,
prune_tracks=True,
)
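# Build an Upload bound to the library, attach the audio file (unless importing
# in place) and hand it to the processing task, either inline or via Celery.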
def create_upload(
path, reference, library, async_, replace, in_place, dispatch_outbox, broadcast,
):
import_handler = tasks.process_upload.delay if async_ else tasks.process_upload
upload = models.Upload(library=library, import_reference=reference)
upload.source = "file://" + path
upload.import_metadata = {
"funkwhale": {
"config": {
"replace": replace,
"dispatch_outbox": dispatch_outbox,
"broadcast": broadcast,
            }
        }
    }
if not in_place:
name = os.path.basename(path)
with open(path, "rb") as f:
upload.audio_file.save(name, File(f), save=False)
upload.save()
import_handler(upload_id=upload.pk)
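# Return a worker function that drains the watchdog queue and debounces events
# per path before dispatching them to the appropriate handler.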
def process_load_queue(stdout, **kwargs):
def inner(q):
# we batch events, to avoid calling same methods multiple times if a file is modified
# a lot in a really short time
flush_delay = 2
batched_events = collections.OrderedDict()
while True:
while True:
if not q.empty():
event = q.get()
batched_events[event["path"]] = event
else:
break
for path, event in batched_events.copy().items():
if time.time() - event["time"] <= flush_delay:
continue
now = datetime.datetime.utcnow()
stdout.write(
"{} -- Processing {}:{}...\n".format(
now.strftime("%Y/%m/%d %H:%M:%S"), event["type"], event["path"]
)
)
del batched_events[path]
handle_event(event, stdout=stdout, **kwargs)
time.sleep(1)
return inner
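# watchdog handler that normalizes filesystem events into plain dicts and pushes
# them onto the shared queue consumed by process_load_queue().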
class Watcher(watchdog.events.PatternMatchingEventHandler):
def __init__(self, stdout, queue, patterns):
self.stdout = stdout
self.queue = queue
super().__init__(patterns=patterns)
def enqueue(self, event):
e = {
"is_directory": event.is_directory,
"type": event.event_type,
"path": event.src_path,
"src_path": event.src_path,
"dest_path": getattr(event, "dest_path", None),
"time": time.time(),
}
self.queue.put(e)
def on_moved(self, event):
self.enqueue(event)
def on_created(self, event):
self.enqueue(event)
def on_deleted(self, event):
self.enqueue(event)
def on_modified(self, event):
self.enqueue(event)
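# Dispatch a debounced filesystem event to the handler matching its type.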
def handle_event(event, stdout, **kwargs):
handlers = {
"modified": handle_modified,
"created": handle_created,
"moved": handle_moved,
"deleted": handle_deleted,
}
handlers[event["type"]](event=event, stdout=stdout, **kwargs)
def handle_modified(event, stdout, library, in_place, **kwargs):
existing_candidates = library.uploads.filter(import_status="finished")
with open(event["path"], "rb") as f:
checksum = common_utils.get_file_hash(f)
existing = existing_candidates.filter(checksum=checksum).first()
if existing:
# found an existing file with same checksum, nothing to do
stdout.write(" File already imported and metadata is up-to-date")
return
to_update = None
if in_place:
source = "file://{}".format(event["path"])
to_update = (
existing_candidates.in_place()
.filter(source=source)
.select_related(
"track__attributed_to", "track__artist", "track__album__artist",
)
.first()
)
if to_update:
if (
to_update.track.attributed_to
and to_update.track.attributed_to != library.actor
):
                stdout.write(
                    " Cannot update track metadata, track belongs to someone else"
                )
return
else:
stdout.write(
" Updating existing file #{} with new metadata…".format(
to_update.pk
)
)
audio_metadata = to_update.get_metadata()
try:
tasks.update_track_metadata(audio_metadata, to_update.track)
except serializers.ValidationError as e:
stdout.write(" Invalid metadata: {}".format(e))
else:
to_update.checksum = checksum
to_update.save(update_fields=["checksum"])
return
stdout.write(" Launching import for new file")
create_upload(
path=event["path"],
reference=kwargs["reference"],
library=library,
async_=False,
replace=kwargs["replace"],
in_place=in_place,
dispatch_outbox=kwargs["dispatch_outbox"],
broadcast=kwargs["broadcast"],
)
def handle_created(event, stdout, **kwargs):
"""
    Created is essentially an alias for modified: when a file is copied into the watched directory,
    a created event is fired on the initial touch, followed by many modified events (as the file is written).
"""
return handle_modified(event, stdout, **kwargs)
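# For in-place imports, a move only requires updating the stored source URI.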
def handle_moved(event, stdout, library, in_place, **kwargs):
if not in_place:
return
old_source = "file://{}".format(event["src_path"])
new_source = "file://{}".format(event["dest_path"])
existing_candidates = library.uploads.filter(import_status="finished")
existing_candidates = existing_candidates.in_place().filter(source=old_source)
existing = existing_candidates.first()
if existing:
stdout.write(" Updating path of existing file #{}".format(existing.pk))
existing.source = new_source
existing.save(update_fields=["source"])
def handle_deleted(event, stdout, library, in_place, **kwargs):
if not in_place:
return
source = "file://{}".format(event["path"])
existing_candidates = library.uploads.filter(import_status="finished")
existing_candidates = existing_candidates.in_place().filter(source=source)
if existing_candidates.count():
stdout.write(" Removing file from DB")
existing_candidates.delete()
GLOBAL["need_pruning"] = True
def check_updates(stdout, library, extensions, paths, batch_size):
existing = (
library.uploads.in_place()
.filter(import_status="finished")
.exclude(checksum=None)
.select_related("library", "track")
)
queries = []
checked_paths = set()
for path in paths:
for ext in extensions:
queries.append(
Q(source__startswith="file://{}".format(path))
& Q(source__endswith=".{}".format(ext))
)
query, remainder = queries[0], queries[1:]
for q in remainder:
query = q | query
existing = existing.filter(query)
total = existing.count()
stdout.write("Found {} files to check in database!".format(total))
uploads = existing.order_by("source")
for i, rows in enumerate(batch(uploads.iterator(), batch_size)):
stdout.write("Handling batch {} ({} items)".format(i + 1, len(rows),))
for upload in rows:
check_upload(stdout, upload)
checked_paths.add(upload.source.replace("file://", "", 1))
return checked_paths
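# Compare the file on disk with the stored checksum: delete uploads whose file is
# missing, and re-read metadata when the content changed.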
def check_upload(stdout, upload):
try:
audio_file = upload.get_audio_file()
except FileNotFoundError:
stdout.write(
" Removing file #{} missing from disk at {}".format(
upload.pk, upload.source
)
)
return upload.delete()
checksum = common_utils.get_file_hash(audio_file)
if upload.checksum != checksum:
stdout.write(
" File #{} at {} was modified, updating metadata…".format(
upload.pk, upload.source
)
)
if upload.library.actor_id != upload.track.attributed_to_id:
            stdout.write(
                " Cannot update track metadata, track belongs to someone else"
            )
else:
track = models.Track.objects.select_related("artist", "album__artist").get(
pk=upload.track_id
)
try:
tasks.update_track_metadata(upload.get_metadata(), track)
except serializers.ValidationError as e:
stdout.write(" Invalid metadata: {}".format(e))
return
else:
upload.checksum = checksum
upload.save(update_fields=["checksum"])