Skip to content
Snippets Groups Projects
import_files.py 26.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • import collections
    import datetime
    
    import itertools
    
    import queue
    import threading
    
    import time
    
    import urllib.parse
    
    import watchdog.events
    import watchdog.observers
    
    from django.conf import settings
    
    from django.core.files import File
    
    from django.core.management import call_command
    
    from django.core.management.base import BaseCommand, CommandError
    
    from django.db.models import Q
    
    from django.utils import timezone
    
    from rest_framework import serializers
    
    from funkwhale_api.common import utils as common_utils
    
    from funkwhale_api.music import models, tasks, utils
    
    
    
    def crawl_dir(dir, extensions, recursive=True, ignored=None):
        """
        Yield the paths of the files found under ``dir`` that have one of ``extensions``.

        :param dir: a directory to scan. If it points to a regular file instead,
            that path is yielded as is, without any extension check.
        :param extensions: extensions to match, without the leading dot.
            Matching is case-insensitive.
        :param recursive: when true, sub-directories are scanned as well.
        :param ignored: optional container of paths that must not be yielded.
        """
        if os.path.isfile(dir):
            yield dir
            return

        # ``None`` rather than a mutable ``[]`` default, which would be shared
        # between calls
        ignored = () if ignored is None else ignored
        # ``str.endswith`` accepts a tuple: a file is matched (and yielded) only once,
        # even if several of the requested extensions apply to it
        suffixes = tuple(".{}".format(e.lower()) for e in extensions)

        with os.scandir(dir) as scanner:
            for entry in scanner:
                if entry.is_file():
                    if (
                        entry.name.lower().endswith(suffixes)
                        and entry.path not in ignored
                    ):
                        yield entry.path
                elif recursive and entry.is_dir():
                    yield from crawl_dir(
                        entry.path, extensions, recursive=recursive, ignored=ignored
                    )
    
    
    
    def batch(iterable, n=1):
        """
        Split ``iterable`` into successive lists of at most ``n`` items.

        Any iterable is accepted (not only iterators). Only the last batch may
        be shorter than ``n``, and no empty batch is ever yielded: an empty
        input yields nothing at all.

        :raises ValueError: if ``n`` is lower than 1 (on first iteration), as
            such a size could never consume the input.
        """
        if n < 1:
            raise ValueError("batch size must be at least 1")

        iterator = iter(iterable)
        while True:
            current = list(itertools.islice(iterator, n))
            if not current:
                # input exhausted
                return
            yield current
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        help = "Import audio files mathinc given glob pattern"
    
            parser.add_argument(
                "library_id",
                type=str,
                help=(
                    "A local library identifier where the files should be imported. "
                    "You can use the full uuid such as e29c5be9-6da3-4d92-b40b-4970edd3ee4b "
                    "or only a small portion of it, starting from the beginning, such as "
                    "e29c5be9"
                ),
            )
    
            parser.add_argument("path", nargs="+", type=str)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "--recursive",
                action="store_true",
                dest="recursive",
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                help="Will match the pattern recursively (including subdirectories)",
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "--username",
                dest="username",
                help="The username of the user you want to be bound to the import",
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "--async",
                action="store_true",
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                help="Will launch celery tasks for each file to import instead of doing it synchronously and block the CLI",
    
            parser.add_argument(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "--exit",
                "-x",
                action="store_true",
                dest="exit_on_failure",
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                help="Use this flag to disable error catching",
    
            )
            parser.add_argument(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "--in-place",
                "-i",
                action="store_true",
                dest="in_place",
    
                default=False,
                help=(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    "Import files without duplicating them into the media directory."
                    "For in-place import to work, the music files must be readable"
                    "by the web-server and funkwhale api and celeryworker processes."
                    "You may want to use this if you have a big music library to "
                    "import and not much disk space available."
                ),
    
            parser.add_argument(
    
                "--replace",
    
                action="store_true",
    
                dest="replace",
    
                default=False,
                help=(
                    "Use this flag to replace duplicates (tracks with same "
    
                    "musicbrainz mbid, or same artist, album and title) on import "
                    "with their newest version."
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            parser.add_argument(
                "--outbox",
                action="store_true",
                dest="outbox",
                default=False,
                help=(
                    "Use this flag to notify library followers of newly imported files. "
                    "You'll likely want to keep this disabled for CLI imports, especially if"
                    "you plan to import hundreds or thousands of files, as it will cause a lot "
                    "of overhead on your server and on servers you are federating with."
                ),
            )
    
            parser.add_argument(
                "--watch",
                action="store_true",
                dest="watch",
                default=False,
                help=(
                    "Start the command in watch mode. Instead of running a full import, "
                    "and exit, watch the given path and import new files, remove deleted "
                    "files, and update metadata corresponding to updated files."
                ),
            )
    
            parser.add_argument("-e", "--extension", nargs="+")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    
            parser.add_argument(
                "--broadcast",
                action="store_true",
                dest="broadcast",
                default=False,
                help=(
                    "Use this flag to enable realtime updates about the import in the UI. "
                    "This causes some overhead, so it's disabled by default."
                ),
            )
    
            parser.add_argument(
                "--prune",
                action="store_true",
                dest="prune",
                default=False,
                help=(
                    "Once the import is completed, prune tracks, ablums and artists that aren't linked to any upload."
                ),
            )
    
    
            parser.add_argument(
                "--reference",
                action="store",
                dest="reference",
                default=None,
                help=(
                    "A custom reference for the import. Leave this empty to have a random "
                    "reference being generated for you."
                ),
            )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                "--noinput",
                "--no-input",
                action="store_false",
                dest="interactive",
    
                help="Do NOT prompt the user for input of any kind.",
            )
    
    
            parser.add_argument(
                "--batch-size",
                "-s",
                dest="batch_size",
                default=1000,
                type=int,
                help="Size of each batch, only used when crawling large collections",
            )
    
        def handle(self, *args, **options):
            """
            Run the import (or start watch mode) for the paths given on the command line.

            Steps, in order:

            1. make the given paths absolute and look up the target library from
               the ``library_id`` prefix;
            2. for in-place imports, check that every path is located inside
               ``settings.MUSIC_DIRECTORY_PATH``;
            3. in watch mode, delegate to ``setup_watcher`` and stop there;
            4. for in-place imports, check already imported files for updates
               (after confirmation, unless prompts are disabled);
            5. crawl the paths, import the files found batch by batch, print a
               summary and prune the library if requested.

            :raises CommandError: if the library cannot be found or is not a local
                library, if an in-place import targets a path outside of
                ``MUSIC_DIRECTORY_PATH``, or if watch mode is given several paths.
            """
            # handle relative directories
            options["path"] = [os.path.abspath(path) for path in options["path"]]
            self.is_confirmed = False
            try:
                library = models.Library.objects.select_related("actor__user").get(
                    uuid__startswith=options["library_id"]
                )
            except models.Library.DoesNotExist:
                raise CommandError("Invalid library id")

            if not library.actor.get_user():
                raise CommandError("Library {} is not a local library".format(library.uuid))

            if options["in_place"]:
                self.stdout.write(
                    "Checking imported paths against settings.MUSIC_DIRECTORY_PATH"
                )
                music_directory = settings.MUSIC_DIRECTORY_PATH
                if not music_directory:
                    raise CommandError(
                        "Importing in-place requires setting the "
                        "MUSIC_DIRECTORY_PATH variable"
                    )
                # Compare whole path components: with a raw string prefix check,
                # "/srv/music-other" would wrongly be accepted for "/srv/music"
                root = os.path.abspath(music_directory)
                root_prefix = os.path.join(root, "")
                for import_path in options["path"]:
                    if import_path != root and not import_path.startswith(root_prefix):
                        raise CommandError(
                            "Importing in-place only works if importing "
                            "from {} (MUSIC_DIRECTORY_PATH), as this directory "
                            "needs to be accessible by the webserver. "
                            "Culprit: {}".format(music_directory, import_path)
                        )

            reference = options["reference"] or "cli-{}".format(timezone.now().isoformat())

            import_url = "{}://{}/library/{}/upload?{}"
            import_url = import_url.format(
                settings.FUNKWHALE_PROTOCOL,
                settings.FUNKWHALE_HOSTNAME,
                str(library.uuid),
                urllib.parse.urlencode([("import", reference)]),
            )
            self.stdout.write(
                "For details, please refer to import reference '{}' or URL {}".format(
                    reference, import_url
                )
            )

            extensions = options.get("extension") or utils.SUPPORTED_EXTENSIONS
            if options["watch"]:
                if len(options["path"]) > 1:
                    raise CommandError("Watch only work with a single directory")

                return self.setup_watcher(
                    extensions=extensions,
                    path=options["path"][0],
                    reference=reference,
                    library=library,
                    in_place=options["in_place"],
                    prune=options["prune"],
                    recursive=options["recursive"],
                    replace=options["replace"],
                    dispatch_outbox=options["outbox"],
                    broadcast=options["broadcast"],
                )

            # paths handled by the update check are excluded from the crawl below
            checked_paths = set()
            if options["in_place"]:
                self.stdout.write("Checking existing files for updates…")
                message = (
                    "Are you sure you want to do this?\n\n"
                    "Type 'yes' to continue, or 'no' to skip checking for updates in "
                    "already imported files: "
                )
                # without prompts (--noinput), the check always runs
                if not options["interactive"] or input(message) == "yes":
                    checked_paths = check_updates(
                        stdout=self.stdout,
                        paths=options["path"],
                        extensions=extensions,
                        library=library,
                        batch_size=options["batch_size"],
                    )
                    self.stdout.write("Existing files checked, moving on to next step!")

            crawler = itertools.chain(
                *[
                    crawl_dir(
                        p,
                        extensions=extensions,
                        recursive=options["recursive"],
                        ignored=checked_paths,
                    )
                    for p in options["path"]
                ]
            )
            errors = []
            total = 0
            start_time = time.time()
            batch_start = None
            batch_duration = None
            self.stdout.write("Starting import of new files…")

            for i, entries in enumerate(batch(crawler, options["batch_size"])):
                total += len(entries)
                batch_start = time.time()
                time_stats = ""
                if i > 0:
                    # batch_duration was set at the end of the previous iteration
                    time_stats = " - running for {}s, previous batch took {}s".format(
                        int(time.time() - start_time), int(batch_duration),
                    )
                if entries:
                    self.stdout.write(
                        "Handling batch {} ({} items){}".format(
                            i + 1, len(entries), time_stats,
                        )
                    )
                    batch_errors = self.handle_batch(
                        library=library,
                        paths=entries,
                        batch=i + 1,
                        reference=reference,
                        options=options,
                    )
                    if batch_errors:
                        errors += batch_errors

                batch_duration = time.time() - batch_start

            message = "Successfully imported {} new tracks in {}s"
            if options["async_"]:
                message = "Successfully launched import for {} new tracks in {}s"

            self.stdout.write(
                message.format(total - len(errors), int(time.time() - start_time))
            )
            if errors:
                self.stderr.write("{} tracks could not be imported:".format(len(errors)))

                for path, error in errors:
                    self.stderr.write("- {}: {}".format(path, error))

            self.stdout.write(
                "For details, please refer to import reference '{}' or URL {}".format(
                    reference, import_url
                )
            )

            if options["prune"]:
                self.stdout.write(
                    "Pruning dangling tracks, albums and artists from library…"
                )
                prune()
    
    
        def handle_batch(self, library, paths, batch, reference, options):
    
            for m in paths:
    
                # In some situations, the path is encoded incorrectly on the filesystem
                # so we filter out faulty paths and display a warning to the user.
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                # see https://dev.funkwhale.audio/funkwhale/funkwhale/issues/138
    
                try:
                    m.encode("utf-8")
                    matching.append(m)
                except UnicodeEncodeError:
                    try:
                        previous = matching[-1]
                    except IndexError:
                        previous = None
                    self.stderr.write(
                        self.style.WARNING(
                            "[warning] Ignoring undecodable path. Previous ok file was {}".format(
                                previous
                            )
                        )
                    )
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                raise CommandError("No file matching pattern, aborting")
    
            if options["replace"]:
                filtered = {"initial": matching, "skipped": [], "new": matching}
    
                message = "  - {} files to be replaced"
    
                import_paths = matching
            else:
    
                filtered = self.filter_matching(matching, library)
    
                message = "  - {} files already found in database"
    
                import_paths = filtered["new"]
    
    
            self.stdout.write("  Import summary:")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            self.stdout.write(
    
                "  - {} files found matching this pattern: {}".format(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    len(matching), options["path"]
                )
            )
    
            self.stdout.write(message.format(len(filtered["skipped"])))
    
    
            self.stdout.write("  - {} new files".format(len(filtered["new"])))
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    
    
            if batch == 1:
                self.stdout.write(
                    "  Selected options: {}".format(
                        ", ".join(
                            ["in place" if options["in_place"] else "copy music files"]
                        )
                    )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                )
            if len(filtered["new"]) == 0:
    
                self.stdout.write("  Nothing new to import, exiting")
    
            if options["interactive"] and not self.is_confirmed:
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    "Are you sure you want to do this?\n\n"
    
                    "Type 'yes' to continue, or 'no' to cancel: "
                )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                if input("".join(message)) != "yes":
    
                self.is_confirmed = True
    
            errors = self.do_import(
    
                import_paths,
                library=library,
                reference=reference,
                batch=batch,
                options=options,
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            )
    
            return errors
    
        def filter_matching(self, matching, library):
            """
            Split ``matching`` paths between already imported ones and new ones.

            A path is considered already imported when the library has an upload
            with ``import_status="finished"`` whose source is ``file://<path>``.

            :param matching: list of file paths found on disk
            :param library: the library whose uploads are checked
            :returns: a dict with three keys: ``initial`` (``matching``, untouched),
                ``skipped`` (sorted paths already imported) and ``new`` (sorted
                paths that are not imported yet)
            """
            sources = ["file://{}".format(p) for p in matching]
            # we skip reimport for path that are already found
            # as a Upload.source
            existing = library.uploads.filter(source__in=sources, import_status="finished")
            existing = existing.values_list("source", flat=True)
            existing = {p.replace("file://", "", 1) for p in existing}
            skipped = set(matching) & existing
            return {
                "initial": matching,
                "skipped": sorted(skipped),
                "new": sorted(set(matching) - skipped),
            }
    
        def do_import(self, paths, library, reference, batch, options):
            message = "[batch {batch}] {i}/{total} Importing {path}..."
    
            if options["async_"]:
    
                message = "[batch {batch}] {i}/{total} Launching import for {path}..."
    
            # we create an upload binded to the library
            async_ = options["async_"]
    
            for i, path in list(enumerate(paths)):
    
                if options["verbosity"] > 1:
                    self.stdout.write(
                        message.format(batch=batch, path=path, i=i + 1, total=len(paths))
                    )
    
                    create_upload(
                        path=path,
                        reference=reference,
                        library=library,
                        async_=async_,
                        replace=options["replace"],
                        in_place=options["in_place"],
                        dispatch_outbox=options["outbox"],
                        broadcast=options["broadcast"],
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    if options["exit_on_failure"]:
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    m = "Error while importing {}: {} {}".format(
                        path, e.__class__.__name__, e
                    )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                    errors.append((path, "{} {}".format(e.__class__.__name__, e)))
    
        def setup_watcher(self, path, extensions, recursive, **kwargs):
            """
            Watch ``path`` for file changes and process them until interrupted.

            Filesystem events matching ``extensions`` are pushed to a queue by a
            ``Watcher`` instance, and consumed in a background thread by the
            callable returned by ``process_load_queue``.

            :param path: the directory to watch
            :param extensions: file extensions (without dot) to react to
            :param recursive: whether sub-directories are watched as well
            :param kwargs: forwarded to ``process_load_queue``; ``kwargs["prune"]``
                is also read here to decide whether to prune after deletions

            This method blocks until a ``KeyboardInterrupt`` is received.
            """
            watchdog_queue = queue.Queue()
            # Set up a worker thread to process database load.
            # ``daemon=True`` replaces the deprecated ``Thread.setDaemon(True)``
            worker = threading.Thread(
                target=process_load_queue(self.stdout, **kwargs),
                args=(watchdog_queue,),
                daemon=True,
            )
            worker.start()

            # setup watchdog to monitor directory for trigger files
            patterns = ["*.{}".format(e) for e in extensions]
            event_handler = Watcher(
                stdout=self.stdout, queue=watchdog_queue, patterns=patterns,
            )
            observer = watchdog.observers.Observer()
            observer.schedule(event_handler, path, recursive=recursive)
            observer.start()

            try:
                while True:
                    self.stdout.write(
                        "Watching for changes at {}…".format(path), ending="\r"
                    )
                    time.sleep(10)
                    # the flag is raised by handle_deleted, in the worker thread
                    if kwargs["prune"] and GLOBAL["need_pruning"]:
                        self.stdout.write("Some files were deleted, pruning library…")
                        prune()
                        GLOBAL["need_pruning"] = False
            except KeyboardInterrupt:
                self.stdout.write("Exiting…")
                observer.stop()

            observer.join()
    
    
    # Module-level state for watch mode: handle_deleted sets "need_pruning" to True
    # after removing uploads, and Command.setup_watcher reads it to trigger prune()
    # before resetting it to False.
    GLOBAL = {"need_pruning": False}
    
    
    def prune():
        """
        Invoke the ``prune_library`` management command for artists, albums and
        tracks, with ``dry_run`` disabled.
        """
        prune_options = {
            "dry_run": False,
            "prune_artists": True,
            "prune_albums": True,
            "prune_tracks": True,
        }
        call_command("prune_library", **prune_options)
    
    
    def create_upload(
        path, reference, library, async_, replace, in_place, dispatch_outbox, broadcast,
    ):
        """
        Create an ``Upload`` for the file at ``path`` and hand it to the import task.

        :param path: path of the audio file, stored as a ``file://`` source
        :param reference: import reference stored on the upload
        :param library: library the upload is attached to
        :param async_: when true, ``tasks.process_upload.delay`` is called instead
            of ``tasks.process_upload``
        :param replace: stored in the upload's import configuration
        :param in_place: when false, the file content is attached to the upload
            as ``audio_file``; when true only the source path is recorded
        :param dispatch_outbox: stored in the upload's import configuration
        :param broadcast: stored in the upload's import configuration
        """
        import_handler = tasks.process_upload.delay if async_ else tasks.process_upload
        upload = models.Upload(library=library, import_reference=reference)
        upload.source = "file://" + path
        upload.import_metadata = {
            "funkwhale": {
                "config": {
                    "replace": replace,
                    "dispatch_outbox": dispatch_outbox,
                    "broadcast": broadcast,
                }
            }
        }
        if not in_place:
            name = os.path.basename(path)
            with open(path, "rb") as f:
                # save=False: the model itself is saved once, right below
                upload.audio_file.save(name, File(f), save=False)

        upload.save()

        import_handler(upload_id=upload.pk)
    
    
    def process_load_queue(stdout, **kwargs):
        """
        Build the worker function that consumes watcher events from a queue.

        :param stdout: stream used to log each processed event; also passed to
            ``handle_event``
        :param kwargs: extra keyword arguments forwarded to ``handle_event``
        :returns: a callable taking the queue as its only argument. It loops
            forever and is meant to run in its own thread.
        """

        def inner(q):
            # we batch events, to avoid calling same methods multiple times if a file is modified
            # a lot in a really short time
            flush_delay = 2
            batched_events = collections.OrderedDict()
            while True:
                # drain the queue, keeping only the latest event for each path
                while not q.empty():
                    event = q.get()
                    batched_events[event["path"]] = event

                # iterate over a snapshot, as entries are removed along the way
                for path, event in list(batched_events.items()):
                    if time.time() - event["time"] <= flush_delay:
                        # too recent: other events may still follow for this path
                        continue
                    # timezone-aware replacement for the deprecated utcnow(),
                    # the formatted value is the same
                    now = datetime.datetime.now(datetime.timezone.utc)
                    stdout.write(
                        "{} -- Processing {}:{}...\n".format(
                            now.strftime("%Y/%m/%d %H:%M:%S"), event["type"], event["path"]
                        )
                    )
                    del batched_events[path]
                    handle_event(event, stdout=stdout, **kwargs)
                time.sleep(1)

        return inner
    
    
    class Watcher(watchdog.events.PatternMatchingEventHandler):
        """
        Event handler pushing a dict description of each received event
        (moved, created, deleted, modified) to a queue.
        """

        def __init__(self, stdout, queue, patterns):
            self.stdout = stdout
            self.queue = queue
            super().__init__(patterns=patterns)

        def enqueue(self, event):
            """Describe ``event`` as a dict, stamp it with the current time and queue it."""
            source = event.src_path
            payload = dict(
                is_directory=event.is_directory,
                type=event.event_type,
                # "path" and "src_path" both hold the event's source path
                path=source,
                src_path=source,
                # only some events carry a destination
                dest_path=getattr(event, "dest_path", None),
                time=time.time(),
            )
            self.queue.put(payload)

        def on_moved(self, event):
            """Queue a "moved" event."""
            self.enqueue(event)

        def on_created(self, event):
            """Queue a "created" event."""
            self.enqueue(event)

        def on_deleted(self, event):
            """Queue a "deleted" event."""
            self.enqueue(event)

        def on_modified(self, event):
            """Queue a "modified" event."""
            self.enqueue(event)
    
    
    def handle_event(event, stdout, **kwargs):
        """
        Dispatch ``event`` to the handler matching its ``"type"`` key.

        A ``KeyError`` is raised for a type other than modified, created,
        moved or deleted.
        """
        dispatch = {
            "modified": handle_modified,
            "created": handle_created,
            "moved": handle_moved,
            "deleted": handle_deleted,
        }
        handler = dispatch[event["type"]]
        handler(event=event, stdout=stdout, **kwargs)
    
    
    def handle_modified(event, stdout, library, in_place, **kwargs):
        """
        React to a file being modified (or created) in the watched directory.

        Depending on what is found in ``library``:

        - a finished upload with the same checksum exists: nothing to do;
        - in-place mode, and a finished upload exists for the same path: the track
          metadata is updated from the file, unless the track is attributed to
          another actor;
        - otherwise: the file is imported as a new upload.

        :param event: event dict, its ``"path"`` key is the file to handle
        :param stdout: stream used to report what is being done
        :param library: library the uploads are looked up in / imported to
        :param in_place: whether files are imported in place
        :param kwargs: must hold ``reference``, ``replace``, ``dispatch_outbox``
            and ``broadcast``, which are passed to ``create_upload``
        """
        existing_candidates = library.uploads.filter(import_status="finished")
        try:
            with open(event["path"], "rb") as f:
                checksum = common_utils.get_file_hash(f)
        except FileNotFoundError:
            # Events are processed with a delay (see process_load_queue), so the
            # file may be gone by now. Without this, the exception would
            # terminate the worker thread.
            stdout.write("  File does not exist anymore, skipping")
            return

        if existing_candidates.filter(checksum=checksum).first():
            # found an existing file with same checksum, nothing to do
            stdout.write("  File already imported and metadata is up-to-date")
            return

        if in_place:
            source = "file://{}".format(event["path"])
            to_update = (
                existing_candidates.in_place()
                .filter(source=source)
                .select_related(
                    "track__attributed_to", "track__artist", "track__album__artist",
                )
                .first()
            )
            if to_update:
                if (
                    to_update.track.attributed_to
                    and to_update.track.attributed_to != library.actor
                ):
                    stdout.write(
                        "  Cannot update track metadata, track belongs to someone else"
                    )
                    return

                stdout.write(
                    "  Updating existing file #{} with new metadata…".format(
                        to_update.pk
                    )
                )
                audio_metadata = to_update.get_metadata()
                try:
                    tasks.update_track_metadata(audio_metadata, to_update.track)
                except serializers.ValidationError as e:
                    stdout.write("  Invalid metadata: {}".format(e))
                else:
                    # only record the new checksum once the metadata was applied
                    to_update.checksum = checksum
                    to_update.save(update_fields=["checksum"])
                return

        stdout.write("  Launching import for new file")
        create_upload(
            path=event["path"],
            reference=kwargs["reference"],
            library=library,
            async_=False,
            replace=kwargs["replace"],
            in_place=in_place,
            dispatch_outbox=kwargs["dispatch_outbox"],
            broadcast=kwargs["broadcast"],
        )
    
    
    def handle_created(event, stdout, **kwargs):
        """
        Handle a "created" event exactly like a "modified" one.

        Copying a file into the watched directory fires a created event on the
        initial touch, then many modified events while the file is written, so
        both go through the same code path.
        """
        return handle_modified(event=event, stdout=stdout, **kwargs)
    
    
    def handle_moved(event, stdout, library, in_place, **kwargs):
        """
        Point the upload imported from ``event["src_path"]`` to ``event["dest_path"]``.

        Nothing is done unless ``in_place`` is true and a finished in-place upload
        of ``library`` has the old path as its source.
        """
        if in_place:
            previous_source = "file://{}".format(event["src_path"])
            updated_source = "file://{}".format(event["dest_path"])
            upload = (
                library.uploads.filter(import_status="finished")
                .in_place()
                .filter(source=previous_source)
                .first()
            )
            if upload:
                stdout.write("  Updating path of existing file #{}".format(upload.pk))
                upload.source = updated_source
                # only the source column needs to be written
                upload.save(update_fields=["source"])
    
    
    def handle_deleted(event, stdout, library, in_place, **kwargs):
        """
        Delete the finished in-place uploads of ``library`` whose source is the
        deleted file, and flag the library as needing a prune.

        Nothing is done when ``in_place`` is false or when no upload matches.
        """
        if not in_place:
            return

        removed_source = "file://{}".format(event["path"])
        matches = (
            library.uploads.filter(import_status="finished")
            .in_place()
            .filter(source=removed_source)
        )
        if not matches.count():
            return

        stdout.write("  Removing file from DB")
        matches.delete()
        # read (and reset) by the watch loop
        GLOBAL["need_pruning"] = True
    
    
    def check_updates(stdout, library, extensions, paths, batch_size):
        """
        Run ``check_upload`` on the in-place uploads of ``library`` located under
        ``paths`` and having one of ``extensions``.

        Only finished uploads with a checksum are considered. They are processed
        ordered by source, ``batch_size`` at a time.

        :returns: the set of file paths (``file://`` prefix stripped) that were
            checked
        """
        candidates = (
            library.uploads.in_place()
            .filter(import_status="finished")
            .exclude(checksum=None)
            .select_related("library", "track")
        )
        # one condition per (path, extension) pair
        conditions = [
            Q(source__startswith="file://{}".format(path))
            & Q(source__endswith=".{}".format(ext))
            for path in paths
            for ext in extensions
        ]
        query = conditions[0]
        for condition in conditions[1:]:
            query = condition | query
        candidates = candidates.filter(query)

        stdout.write("Found {} files to check in database!".format(candidates.count()))

        checked_paths = set()
        ordered = candidates.order_by("source")
        for number, rows in enumerate(batch(ordered.iterator(), batch_size), start=1):
            stdout.write("Handling batch {} ({} items)".format(number, len(rows)))
            for upload in rows:
                check_upload(stdout, upload)
                checked_paths.add(upload.source.replace("file://", "", 1))

        return checked_paths
    
    
    def check_upload(stdout, upload):
        """
        Synchronize ``upload`` with the state of its file on disk.

        - the file is missing: the upload is deleted, and the result of
          ``upload.delete()`` is returned;
        - the file hash differs from ``upload.checksum``: the track metadata is
          updated from the file and the new checksum is saved, unless the track
          is not attributed to the library's actor or the metadata is invalid;
        - otherwise nothing is done.

        ``None`` is returned in every case but the first one.
        """
        try:
            audio_file = upload.get_audio_file()
        except FileNotFoundError:
            stdout.write(
                "  Removing file #{} missing from disk at {}".format(
                    upload.pk, upload.source
                )
            )
            return upload.delete()

        checksum = common_utils.get_file_hash(audio_file)
        if upload.checksum == checksum:
            # file untouched since it was imported
            return

        stdout.write(
            "  File #{} at {} was modified, updating metadata…".format(
                upload.pk, upload.source
            )
        )
        if upload.library.actor_id != upload.track.attributed_to_id:
            stdout.write(
                "  Cannot update track metadata, track belongs to someone else"
            )
            return

        track = models.Track.objects.select_related("artist", "album__artist").get(
            pk=upload.track_id
        )
        try:
            tasks.update_track_metadata(upload.get_metadata(), track)
        except serializers.ValidationError as e:
            # keep the old checksum, so that the file is checked again next time
            stdout.write("  Invalid metadata: {}".format(e))
            return

        upload.checksum = checksum
        upload.save(update_fields=["checksum"])