Skip to content
Snippets Groups Projects
test_import_audio_file.py 5.66 KiB
Newer Older
  • Learn to ignore specific revisions
  • Eliot Berriot's avatar
    Eliot Berriot committed
    
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    import pytest
    
    from django.core.management import call_command
    from django.core.management.base import CommandError
    
    DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
    
    def test_management_command_requires_a_valid_library_id(factories):
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        with pytest.raises(CommandError, match=r".*Invalid library id.*"):
    
            call_command("import_files", "wrong_id", path, interactive=False)
    
    def test_in_place_import_only_from_music_dir(factories, settings):
    
        library = factories["music.Library"](actor__local=True)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        settings.MUSIC_DIRECTORY_PATH = "/nope"
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        with pytest.raises(
            CommandError, match=r".*Importing in-place only works if importing.*"
        ):
    
            call_command(
    
                "import_files", str(library.uuid), path, in_place=True, interactive=False
    
    def test_import_with_multiple_argument(factories, mocker):
    
        library = factories["music.Library"](actor__local=True)
    
        path1 = os.path.join(DATA_DIR, "dummy_file.ogg")
        path2 = os.path.join(DATA_DIR, "utf8-éà◌.ogg")
        mocked_filter = mocker.patch(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            "funkwhale_api.music.management.commands.import_files.Command.filter_matching",
    
            return_value=({"new": [], "skipped": []}),
        )
    
        call_command("import_files", str(library.uuid), path1, path2, interactive=False)
        mocked_filter.assert_called_once_with([path1, path2], library)
    
    
    @pytest.mark.parametrize(
        "path",
        [os.path.join(DATA_DIR, "dummy_file.ogg"), os.path.join(DATA_DIR, "utf8-éà◌.ogg")],
    )
    def test_import_files_stores_proper_data(factories, mocker, now, path):
        mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
        library = factories["music.Library"](actor__local=True)
        call_command(
            "import_files", str(library.uuid), path, async_=False, interactive=False
        )
        upload = library.uploads.last()
        assert upload.import_reference == "cli-{}".format(now.isoformat())
        assert upload.import_status == "pending"
        assert upload.source == "file://{}".format(path)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        assert upload.import_metadata == {
            "funkwhale": {
                "config": {"replace": False, "dispatch_outbox": False, "broadcast": False}
            }
        }
    
        mocked_process.assert_called_once_with(upload_id=upload.pk)
    
    
    def test_import_with_outbox_flag(factories, mocker):
        library = factories["music.Library"](actor__local=True)
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
        mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
        call_command(
            "import_files", str(library.uuid), path, outbox=True, interactive=False
        )
        upload = library.uploads.last()
    
        assert upload.import_metadata["funkwhale"]["config"]["dispatch_outbox"] is True
    
        mocked_process.assert_called_once_with(upload_id=upload.pk)
    
    
    def test_import_with_broadcast_flag(factories, mocker):
        library = factories["music.Library"](actor__local=True)
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
        mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
        call_command(
            "import_files", str(library.uuid), path, broadcast=True, interactive=False
        )
        upload = library.uploads.last()
    
        assert upload.import_metadata["funkwhale"]["config"]["broadcast"] is True
    
    
        mocked_process.assert_called_once_with(upload_id=upload.pk)
    
    R En's avatar
    R En committed
    def test_import_with_replace_flag(factories, mocker):
    
        library = factories["music.Library"](actor__local=True)
    
    R En's avatar
    R En committed
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
    
        mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
        call_command(
            "import_files", str(library.uuid), path, replace=True, interactive=False
    
        upload = library.uploads.last()
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        assert upload.import_metadata["funkwhale"]["config"]["replace"] is True
    
        mocked_process.assert_called_once_with(upload_id=upload.pk)
    
    def test_import_with_custom_reference(factories, mocker):
        library = factories["music.Library"](actor__local=True)
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
        mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
        call_command(
            "import_files",
            str(library.uuid),
            path,
            reference="test",
            replace=True,
            interactive=False,
        )
        upload = library.uploads.last()
    
        assert upload.import_reference == "test"
    
        mocked_process.assert_called_once_with(upload_id=upload.pk)
    
    def test_import_files_skip_if_path_already_imported(factories, mocker):
    
        library = factories["music.Library"](actor__local=True)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        path = os.path.join(DATA_DIR, "dummy_file.ogg")
    
        # existing one with same source
        factories["music.Upload"](
            library=library, import_status="finished", source="file://{}".format(path)
        )
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            "import_files", str(library.uuid), path, async_=False, interactive=False
    
        )
        assert library.uploads.count() == 1
    
    def test_import_files_in_place(factories, mocker, settings):
        settings.MUSIC_DIRECTORY_PATH = DATA_DIR
    
        mocked_process = mocker.patch("funkwhale_api.music.tasks.process_upload")
        library = factories["music.Library"](actor__local=True)
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        path = os.path.join(DATA_DIR, "utf8-éà◌.ogg")
    
        call_command(
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            "import_files",
    
            str(library.uuid),
    
            in_place=True,
    
    Eliot Berriot's avatar
    Eliot Berriot committed
            interactive=False,
        )
    
        upload = library.uploads.last()
        assert bool(upload.audio_file) is False
        mocked_process.assert_called_once_with(upload_id=upload.pk)
    
    def test_storage_rename_utf_8_files(factories):
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        upload = factories["music.Upload"](audio_file__filename="été.ogg")
        assert upload.audio_file.name.endswith("ete.ogg")