Newer
Older
import pytest
from django.urls import reverse
from funkwhale_api.music import models
from funkwhale_api.musicbrainz import api
from funkwhale_api.music import serializers
from funkwhale_api.music import tasks
from . import data as api_data
DATA_DIR = os.path.dirname(os.path.abspath(__file__))
def test_can_submit_youtube_url_for_track_import(mocker, superuser_client):
mocker.patch(
'funkwhale_api.musicbrainz.api.artists.get',
return_value=api_data.artists['get']['adhesive_wombat'])
mocker.patch(
'funkwhale_api.musicbrainz.api.releases.get',
return_value=api_data.albums['get']['marsupial'])
mocker.patch(
'funkwhale_api.musicbrainz.api.recordings.get',
return_value=api_data.tracks['get']['8bitadventures'])
mocker.patch(
'funkwhale_api.music.models.TrackFile.download_file',
return_value=None)
mbid = '9968a9d6-8d92-4051-8f76-674e157b6eed'
video_id = 'tPEE9ZwTmy0'
url = reverse('api:v1:submit-single')
response = superuser_client.post(
url,
{'import_url': 'https://www.youtube.com/watch?v={0}'.format(video_id),
'mbid': mbid})
track = models.Track.objects.get(mbid=mbid)
assert track.artist.name == 'Adhesive Wombat'
assert track.album.title == 'Marsupial Madness'
def test_import_creates_an_import_with_correct_data(mocker, superuser_client):
mocker.patch('funkwhale_api.music.tasks.import_job_run')
mbid = '9968a9d6-8d92-4051-8f76-674e157b6eed'
video_id = 'tPEE9ZwTmy0'
url = reverse('api:v1:submit-single')
response = superuser_client.post(
url,
{'import_url': 'https://www.youtube.com/watch?v={0}'.format(video_id),
'mbid': mbid})
batch = models.ImportBatch.objects.latest('id')
assert batch.jobs.count() == 1
assert batch.submitted_by == superuser_client.user
assert batch.status == 'pending'
job = batch.jobs.first()
assert str(job.mbid) == mbid
assert job.status == 'pending'
assert job.source == 'https://www.youtube.com/watch?v={0}'.format(video_id)
def test_can_import_whole_album(mocker, superuser_client):
mocker.patch('funkwhale_api.music.tasks.import_job_run')
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
mocker.patch(
'funkwhale_api.musicbrainz.api.artists.get',
return_value=api_data.artists['get']['soad'])
mocker.patch(
'funkwhale_api.musicbrainz.api.images.get_front',
return_value=b'')
mocker.patch(
'funkwhale_api.musicbrainz.api.releases.get',
return_value=api_data.albums['get_with_includes']['hypnotize'])
payload = {
'releaseId': '47ae093f-1607-49a3-be11-a15d335ccc94',
'tracks': [
{
'mbid': '1968a9d6-8d92-4051-8f76-674e157b6eed',
'source': 'https://www.youtube.com/watch?v=1111111111',
},
{
'mbid': '2968a9d6-8d92-4051-8f76-674e157b6eed',
'source': 'https://www.youtube.com/watch?v=2222222222',
},
{
'mbid': '3968a9d6-8d92-4051-8f76-674e157b6eed',
'source': 'https://www.youtube.com/watch?v=3333333333',
},
]
}
url = reverse('api:v1:submit-album')
response = superuser_client.post(
url, json.dumps(payload), content_type="application/json")
batch = models.ImportBatch.objects.latest('id')
assert batch.jobs.count() == 3
assert batch.submitted_by == superuser_client.user
assert batch.status == 'pending'
album = models.Album.objects.latest('id')
assert str(album.mbid) == '47ae093f-1607-49a3-be11-a15d335ccc94'
medium_data = api_data.albums['get_with_includes']['hypnotize']['release']['medium-list'][0]
assert int(medium_data['track-count']) == album.tracks.all().count()
for track in medium_data['track-list']:
instance = models.Track.objects.get(mbid=track['recording']['id'])
assert instance.title == track['recording']['title']
assert instance.position == int(track['position'])
assert instance.title == track['recording']['title']
for row in payload['tracks']:
job = models.ImportJob.objects.get(mbid=row['mbid'])
assert str(job.mbid) == row['mbid']
assert job.status == 'pending'
assert job.source == row['source']
def test_can_import_whole_artist(mocker, superuser_client):
mocker.patch('funkwhale_api.music.tasks.import_job_run')
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
mocker.patch(
'funkwhale_api.musicbrainz.api.artists.get',
return_value=api_data.artists['get']['soad'])
mocker.patch(
'funkwhale_api.musicbrainz.api.images.get_front',
return_value=b'')
mocker.patch(
'funkwhale_api.musicbrainz.api.releases.get',
return_value=api_data.albums['get_with_includes']['hypnotize'])
payload = {
'artistId': 'mbid',
'albums': [
{
'releaseId': '47ae093f-1607-49a3-be11-a15d335ccc94',
'tracks': [
{
'mbid': '1968a9d6-8d92-4051-8f76-674e157b6eed',
'source': 'https://www.youtube.com/watch?v=1111111111',
},
{
'mbid': '2968a9d6-8d92-4051-8f76-674e157b6eed',
'source': 'https://www.youtube.com/watch?v=2222222222',
},
{
'mbid': '3968a9d6-8d92-4051-8f76-674e157b6eed',
'source': 'https://www.youtube.com/watch?v=3333333333',
},
]
}
]
}
url = reverse('api:v1:submit-artist')
response = superuser_client.post(
url, json.dumps(payload), content_type="application/json")
batch = models.ImportBatch.objects.latest('id')
assert batch.jobs.count() == 3
assert batch.submitted_by == superuser_client.user
assert batch.status == 'pending'
album = models.Album.objects.latest('id')
assert str(album.mbid) == '47ae093f-1607-49a3-be11-a15d335ccc94'
medium_data = api_data.albums['get_with_includes']['hypnotize']['release']['medium-list'][0]
assert int(medium_data['track-count']) == album.tracks.all().count()
for track in medium_data['track-list']:
instance = models.Track.objects.get(mbid=track['recording']['id'])
assert instance.title == track['recording']['title']
assert instance.position == int(track['position'])
assert instance.title == track['recording']['title']
for row in payload['albums'][0]['tracks']:
job = models.ImportJob.objects.get(mbid=row['mbid'])
assert str(job.mbid) == row['mbid']
assert job.status == 'pending'
assert job.source == row['source']
def test_user_can_query_api_for_his_own_batches(client, factories):
user1 = factories['users.SuperUser']()
user2 = factories['users.SuperUser']()
job = factories['music.ImportJob'](batch__submitted_by=user1)
url = reverse('api:v1:import-batches-list')
client.login(username=user2.username, password='test')
response2 = client.get(url)
results = json.loads(response2.content.decode('utf-8'))
assert results['count'] == 0
client.logout()
client.login(username=user1.username, password='test')
response1 = client.get(url)
results = json.loads(response1.content.decode('utf-8'))
assert results['count'] == 1
assert results['results'][0]['jobs'][0]['mbid'] == job.mbid
def test_user_can_create_an_empty_batch(client, factories):
user = factories['users.SuperUser']()
url = reverse('api:v1:import-batches-list')
client.login(username=user.username, password='test')
response = client.post(url)
assert response.status_code == 201
batch = user.imports.latest('id')
assert batch.submitted_by == user
assert batch.source == 'api'
def test_user_can_create_import_job_with_file(client, factories, mocker):
path = os.path.join(DATA_DIR, 'test.ogg')
m = mocker.patch('funkwhale_api.common.utils.on_commit')
user = factories['users.SuperUser']()
batch = factories['music.ImportBatch'](submitted_by=user)
url = reverse('api:v1:import-jobs-list')
client.login(username=user.username, password='test')
with open(path, 'rb') as f:
content = f.read()
f.seek(0)
response = client.post(url, {
'batch': batch.pk,
'audio_file': f,
'source': 'file://'
}, format='multipart')
assert response.status_code == 201
job = batch.jobs.latest('id')
assert job.status == 'pending'
assert job.source.startswith('file://')
assert 'test.ogg' in job.source
assert job.audio_file.read() == content
m.assert_called_once_with(
tasks.import_job_run.delay,
import_job_id=job.pk)
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def test_can_search_artist(factories, client):
artist1 = factories['music.Artist']()
artist2 = factories['music.Artist']()
expected = [serializers.ArtistSerializerNested(artist1).data]
url = reverse('api:v1:artists-search')
response = client.get(url, {'query': artist1.name})
assert json.loads(response.content.decode('utf-8')) == expected
def test_can_search_artist_by_name_start(factories, client):
artist1 = factories['music.Artist'](name='alpha')
artist2 = factories['music.Artist'](name='beta')
expected = {
'next': None,
'previous': None,
'count': 1,
'results': [serializers.ArtistSerializerNested(artist1).data]
}
url = reverse('api:v1:artists-list')
response = client.get(url, {'name__startswith': 'a'})
assert expected == json.loads(response.content.decode('utf-8'))
def test_can_search_tracks(factories, client):
track1 = factories['music.Track'](title="test track 1")
track2 = factories['music.Track']()
query = 'test track 1'
expected = [serializers.TrackSerializerNested(track1).data]
url = reverse('api:v1:tracks-search')
response = client.get(url, {'query': query})
assert expected == json.loads(response.content.decode('utf-8'))
@pytest.mark.parametrize('route,method', [
('api:v1:tags-list', 'get'),
('api:v1:tracks-list', 'get'),
('api:v1:artists-list', 'get'),
('api:v1:albums-list', 'get'),
])
def test_can_restrict_api_views_to_authenticated_users(db, route, method, settings, client):
url = reverse(route)
settings.API_AUTHENTICATION_REQUIRED = True
response = getattr(client, method)(url)
assert response.status_code == 401
def test_track_file_url_is_restricted_to_authenticated_users(client, factories, settings):
settings.API_AUTHENTICATION_REQUIRED = True
f = factories['music.TrackFile']()
assert f.audio_file is not None
url = f.path
response = client.get(url)
assert response.status_code == 401
user = factories['users.SuperUser']()
client.login(username=user.username, password='test')
response = client.get(url)
assert response.status_code == 200
assert response['X-Accel-Redirect'] == '/_protected{}'.format(f.audio_file.url)