import datetime
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count, Q
from django.utils import timezone
Eliot Berriot
committed
from rest_framework import exceptions
from rest_framework import permissions as rest_permissions
from rest_framework import renderers, response, viewsets
from rest_framework.decorators import action
Eliot Berriot
committed
from rest_framework.serializers import ValidationError
import funkwhale_api
from funkwhale_api.activity import record
from funkwhale_api.common import fields, preferences, utils as common_utils
from funkwhale_api.favorites.models import TrackFavorite
from funkwhale_api.moderation import filters as moderation_filters
Eliot Berriot
committed
from funkwhale_api.music import models as music_models
from funkwhale_api.music import utils
Eliot Berriot
committed
from funkwhale_api.music import views as music_views
from funkwhale_api.playlists import models as playlists_models
from funkwhale_api.tags import models as tags_models
from funkwhale_api.users import models as users_models
Eliot Berriot
committed
from . import authentication, filters, negotiation, serializers
Eliot Berriot
committed
def find_object(
queryset, model_field="pk", field="id", cast=int, filter_playable=False
):
Eliot Berriot
committed
def decorator(func):
Eliot Berriot
committed
def inner(self, request, *args, **kwargs):
data = request.GET or request.POST
try:
raw_value = data[field]
except KeyError:
return response.Response(
{
"error": {
"code": 10,
"message": "required parameter '{}' not present".format(
field
),
}
Eliot Berriot
committed
try:
value = cast(raw_value)
except (ValueError, TypeError, ValidationError):
return response.Response(
{
"error": {
"code": 0,
"message": 'For input string "{}"'.format(raw_value),
}
if filter_playable:
actor = utils.get_actor_from_request(request)
qs = qs.playable_by(actor)
Eliot Berriot
committed
try:
obj = qs.get(**{model_field: value})
except qs.model.DoesNotExist:
return response.Response(
{
"error": {
"code": 70,
"message": "{} not found".format(qs.model.__name__),
Eliot Berriot
committed
return func(self, request, *args, **kwargs)
Eliot Berriot
committed
return inner
Eliot Berriot
committed
return decorator
def get_playlist_qs(request):
    """Build the playlist queryset exposed to the requesting user.

    Playlists are filtered with the privacy-level query computed for
    ``request.user``, annotated with their track count, stripped of empty
    playlists, joined with their owner and ordered newest first.
    """
    visible = playlists_models.Playlist.objects.filter(
        fields.privacy_level_query(request.user)
    )
    # Playlists without any track are never listed.
    non_empty = visible.with_tracks_count().exclude(_tracks_count=0)
    return non_empty.select_related("user").order_by("-creation_date")
Eliot Berriot
committed
class SubsonicViewSet(viewsets.GenericViewSet):
content_negotiation_class = negotiation.SubsonicContentNegociation
authentication_classes = [authentication.SubsonicAuthentication]
permission_classes = [rest_permissions.IsAuthenticated]
Eliot Berriot
committed
Eliot Berriot
committed
def dispatch(self, request, *args, **kwargs):
Eliot Berriot
committed
r = response.Response({}, status=405)
r.accepted_renderer = renderers.JSONRenderer()
Eliot Berriot
committed
r.renderer_context = {}
return r
return super().dispatch(request, *args, **kwargs)
Eliot Berriot
committed
def handle_exception(self, exc):
# subsonic API sends 200 status code with custom error
# codes in the payload
mapping = {
exceptions.AuthenticationFailed: (40, "Wrong username or password."),
exceptions.NotAuthenticated: (10, "Required parameter is missing."),
}
if exc.__class__ in mapping:
Eliot Berriot
committed
code, message = mapping[exc.__class__]
else:
return super().handle_exception(exc)
Eliot Berriot
committed
return response.Response(payload, status=200)
@action(detail=False, methods=["get", "post"], permission_classes=[])
Eliot Berriot
committed
def ping(self, request, *args, **kwargs):
Eliot Berriot
committed
return response.Response(data, status=200)
permission_classes=[],
def get_license(self, request, *args, **kwargs):
    """Return a license payload that is always flagged as valid.

    The expiry date is recomputed on every call as the current time plus
    365 days.
    """
    expires_on = timezone.now() + datetime.timedelta(days=365)
    license_info = {
        "valid": "true",
        "email": "valid@valid.license",
        "licenseExpires": expires_on,
    }
    payload = {
        "type": "funkwhale",
        "funkwhaleVersion": funkwhale_api.__version__,
        "license": license_info,
    }
    return response.Response(payload, status=200)
@action(
detail=False,
methods=["get", "post"],
url_name="get_artists",
url_path="getArtists",
)
Eliot Berriot
committed
def get_artists(self, request, *args, **kwargs):
artists = (
music_models.Artist.objects.all()
.exclude(
moderation_filters.get_filtered_content_query(
moderation_filters.USER_FILTER_CONFIG["ARTIST"], request.user
)
)
.playable_by(utils.get_actor_from_request(request))
Eliot Berriot
committed
data = serializers.GetArtistsSerializer(artists).data
Eliot Berriot
committed
return response.Response(payload, status=200)
@action(
detail=False,
methods=["get", "post"],
url_name="get_indexes",
url_path="getIndexes",
)
def get_indexes(self, request, *args, **kwargs):
artists = (
music_models.Artist.objects.all()
.exclude(
moderation_filters.get_filtered_content_query(
moderation_filters.USER_FILTER_CONFIG["ARTIST"], request.user
)
)
.playable_by(utils.get_actor_from_request(request))
data = serializers.GetArtistsSerializer(artists).data
return response.Response(payload, status=200)
@action(
detail=False,
methods=["get", "post"],
url_name="get_artist",
url_path="getArtist",
)
@find_object(music_models.Artist.objects.all(), filter_playable=True)
Eliot Berriot
committed
def get_artist(self, request, *args, **kwargs):
Eliot Berriot
committed
data = serializers.GetArtistSerializer(artist).data
Eliot Berriot
committed
return response.Response(payload, status=200)
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_song",
    url_path="getSong",
)
@find_object(music_models.Track.objects.all(), filter_playable=True)
def get_song(self, request, *args, **kwargs):
    """Return the serialized track under the ``song`` key.

    The track is read from ``kwargs["obj"]`` (presumably placed there by
    the ``find_object`` decorator -- confirm against its implementation).
    """
    song = serializers.GetSongSerializer(kwargs.pop("obj")).data
    return response.Response({"song": song}, status=200)
@action(
detail=False,
methods=["get", "post"],
url_name="get_artist_info2",
url_path="getArtistInfo2",
@find_object(music_models.Artist.objects.all(), filter_playable=True)
def get_artist_info2(self, request, *args, **kwargs):
return response.Response(payload, status=200)
@action(
detail=False, methods=["get", "post"], url_name="get_album", url_path="getAlbum"
)
@find_object(
music_models.Album.objects.select_related("artist"), filter_playable=True
)
Eliot Berriot
committed
def get_album(self, request, *args, **kwargs):
Eliot Berriot
committed
data = serializers.GetAlbumSerializer(album).data
Eliot Berriot
committed
return response.Response(payload, status=200)
@action(detail=False, methods=["get", "post"], url_name="stream", url_path="stream")
@find_object(music_models.Track.objects.all(), filter_playable=True)
Eliot Berriot
committed
def stream(self, request, *args, **kwargs):
data = request.GET or request.POST
queryset = track.uploads.select_related("track__album__artist", "track__artist")
upload = queryset.first()
if not upload:
return response.Response(status=404)
max_bitrate = data.get("maxBitRate")
try:
max_bitrate = min(max(int(max_bitrate), 0), 320) or None
except (TypeError, ValueError):
max_bitrate = None
if max_bitrate:
max_bitrate = max_bitrate * 1000
format = data.get("format") or None
if max_bitrate and not format:
# specific bitrate requested, but no format specified
# so we use a default one, cf #867. This helps with clients
# that don't send the format parameter, such as DSub.
format = settings.SUBSONIC_DEFAULT_TRANSCODING_FORMAT
elif format == "raw":
format = None
return music_views.handle_serve(
upload=upload,
user=request.user,
format=format,
max_bitrate=max_bitrate,
# Subsonic clients don't expect 302 redirection unfortunately,
# So we have to proxy media files
proxy_media=True,
@action(detail=False, methods=["get", "post"], url_name="star", url_path="star")
def star(self, request, *args, **kwargs):
TrackFavorite.add(user=request.user, track=track)
@action(detail=False, methods=["get", "post"], url_name="unstar", url_path="unstar")
def unstar(self, request, *args, **kwargs):
request.user.track_favorites.filter(track=track).delete()
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_starred2",
    url_path="getStarred2",
)
def get_starred2(self, request, *args, **kwargs):
    """Return the requesting user's favorite tracks under ``starred2``.

    Only songs are reported; the payload has the shape
    ``{"starred2": {"song": [...]}}``.
    """
    favorites = request.user.track_favorites.all()
    data = {"starred2": {"song": serializers.get_starred_tracks_data(favorites)}}
    return response.Response(data)
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_random_songs",
    url_path="getRandomSongs",
)
def get_random_songs(self, request, *args, **kwargs):
    """Return a random selection of tracks playable by the requester.

    Parameters are read from the query string, falling back to the POST
    body when the query string is empty.

    ``size`` is the number of tracks to return. It defaults to 50 when
    missing or not an integer and is clamped to the 0..500 range.
    """
    params = request.GET or request.POST
    actor = utils.get_actor_from_request(request)

    try:
        size = int(params["size"])
    except (TypeError, KeyError, ValueError):
        size = 50
    # The value comes straight from the client: a negative bound is not a
    # valid queryset slice, and an unbounded one allows arbitrarily large
    # responses. 500 is the same cap get_album_list2 applies.
    size = min(max(size, 0), 500)

    hidden_content = moderation_filters.get_filtered_content_query(
        moderation_filters.USER_FILTER_CONFIG["TRACK"], request.user
    )
    queryset = (
        music_models.Track.objects.all()
        .exclude(hidden_content)
        .playable_by(actor)
        .prefetch_related("uploads")
        .order_by("?")[:size]
    )
    data = {
        "randomSongs": {
            "song": serializers.GetSongSerializer(queryset, many=True).data
        }
    }
    return response.Response(data)
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_songs_by_genre",
    url_path="getSongsByGenre",
)
def get_songs_by_genre(self, request, *args, **kwargs):
    """Return the most recently created playable tracks tagged with a genre.

    Parameters are read from the query string, falling back to the POST
    body when the query string is empty.

    ``genre`` is the tag name to match. A track matches when the tag is
    attached to the track itself, its artist, its album, or its album's
    artist.

    ``count`` is the number of tracks to return. It defaults to 50 when
    missing or not an integer and is clamped to the 0..500 range.
    """
    params = request.GET or request.POST
    actor = utils.get_actor_from_request(request)

    try:
        # yep. Some endpoints have size, other have count…
        size = int(params["count"])
    except (TypeError, KeyError, ValueError):
        size = 50
    # The value comes straight from the client: a negative bound is not a
    # valid queryset slice, and an unbounded one allows arbitrarily large
    # responses. 500 is the same cap get_album_list2 applies.
    size = min(max(size, 0), 500)

    genre = params.get("genre")
    hidden_content = moderation_filters.get_filtered_content_query(
        moderation_filters.USER_FILTER_CONFIG["TRACK"], request.user
    )
    queryset = (
        music_models.Track.objects.all()
        .exclude(hidden_content)
        .playable_by(actor)
        .filter(
            Q(tagged_items__tag__name=genre)
            | Q(artist__tagged_items__tag__name=genre)
            | Q(album__artist__tagged_items__tag__name=genre)
            | Q(album__tagged_items__tag__name=genre)
        )
        .prefetch_related("uploads")
        # The tag joins can yield the same track several times.
        .distinct()
        .order_by("-creation_date")[:size]
    )
    data = {
        "songsByGenre": {
            "song": serializers.GetSongSerializer(queryset, many=True).data
        }
    }
    return response.Response(data)
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_starred",
    url_path="getStarred",
)
def get_starred(self, request, *args, **kwargs):
    """Return the requesting user's favorite tracks under ``starred``.

    The payload has the shape ``{"starred": {"song": [...]}}``.
    """
    songs = serializers.get_starred_tracks_data(request.user.track_favorites.all())
    return response.Response({"starred": {"song": songs}})
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_album_list2",
    url_path="getAlbumList2",
)
def get_album_list2(self, request, *args, **kwargs):
    """Return a page of playable albums, ordered according to ``type``.

    Parameters are read from the query string, falling back to the POST
    body when the query string is empty.

    ``type`` selects the ordering or filter. It defaults to
    ``alphabeticalByArtist`` when absent; unrecognised values keep the
    base ordering by artist name.

    ``genre`` is only used when ``type`` is ``byGenre``.

    ``offset`` and ``size`` select the page. They default to 0 and 50 when
    missing or not integers; ``size`` is capped at 500 and both are
    floored at 0.
    """
    hidden_content = moderation_filters.get_filtered_content_query(
        moderation_filters.USER_FILTER_CONFIG["ALBUM"], request.user
    )
    queryset = (
        music_models.Album.objects.exclude(hidden_content)
        .with_tracks_count()
        .order_by("artist__name")
    )
    data = request.GET or request.POST
    filterset = filters.AlbumList2FilterSet(data, queryset=queryset)
    queryset = filterset.qs
    actor = utils.get_actor_from_request(request)
    queryset = queryset.playable_by(actor)

    list_type = data.get("type", "alphabeticalByArtist")
    if list_type == "alphabeticalByArtist":
        queryset = queryset.order_by("artist__name")
    elif list_type == "random":
        queryset = queryset.order_by("?")
    elif list_type == "alphabeticalByName" or not list_type:
        # Albums are named through ``title``; the artist relation only
        # exposes ``name``, so ordering must target the album field.
        queryset = queryset.order_by("title")
    elif list_type == "recent":
        queryset = queryset.exclude(release_date__in=["", None]).order_by(
            "-release_date"
        )
    elif list_type == "newest":
        queryset = queryset.order_by("-creation_date")
    elif list_type == "byGenre" and data.get("genre"):
        genre = data.get("genre")
        queryset = queryset.filter(
            Q(tagged_items__tag__name=genre)
            | Q(artist__tagged_items__tag__name=genre)
        )

    # NOTE(review): the "offset" and "size" parameter names are assumed from
    # the names of the variables being assigned -- confirm against the
    # Subsonic getAlbumList2 specification.
    try:
        offset = int(data["offset"])
    except (TypeError, KeyError, ValueError):
        offset = 0
    try:
        size = int(data["size"])
    except (TypeError, KeyError, ValueError):
        size = 50

    # Negative bounds are not valid queryset slices.
    offset = max(offset, 0)
    size = min(max(size, 0), 500)
    queryset = queryset[offset : offset + size]
    data = {"albumList2": {"album": serializers.get_album_list2_data(queryset)}}
    return response.Response(data)
@action(
detail=False, methods=["get", "post"], url_name="search3", url_path="search3"
)
def search3(self, request, *args, **kwargs):
data = request.GET or request.POST
actor = utils.get_actor_from_request(request)
conf = [
{
"subsonic": "artist",
"search_fields": ["name"],
"queryset": (
music_models.Artist.objects.with_albums_count().values(
"id", "_albums_count", "name"
)
"serializer": lambda qs: [serializers.get_artist_data(a) for a in qs],
"subsonic": "album",
"search_fields": ["title"],
"queryset": (
music_models.Album.objects.with_tracks_count().select_related(
"artist"
)
"subsonic": "song",
"search_fields": ["title"],
"queryset": (
music_models.Track.objects.prefetch_related(
"uploads"
).select_related("album__artist")
for c in conf:
offsetKey = "{}Offset".format(c["subsonic"])
countKey = "{}Count".format(c["subsonic"])
try:
offset = int(data[offsetKey])
except (TypeError, KeyError, ValueError):
offset = 0
try:
size = int(data[countKey])
except (TypeError, KeyError, ValueError):
size = 20
size = min(size, 100)
queryset = c["queryset"].filter(
utils.get_query(query, c["search_fields"])
queryset = queryset.playable_by(actor)
queryset = common_utils.order_for_search(queryset, c["search_fields"][0])
queryset = queryset[offset : offset + size]
payload["searchResult3"][c["subsonic"]] = c["serializer"](queryset)
return response.Response(payload)
@action(
detail=False,
methods=["get", "post"],
url_name="get_playlists",
url_path="getPlaylists",
def get_playlists(self, request, *args, **kwargs):
qs = get_playlist_qs(request)
"playlists": {"playlist": [serializers.get_playlist_data(p) for p in qs]}
}
return response.Response(data)
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_playlist",
    url_path="getPlaylist",
)
@find_object(lambda request: get_playlist_qs(request))
def get_playlist(self, request, *args, **kwargs):
    """Return the detailed representation of a single playlist.

    The playlist is read from ``kwargs["obj"]`` (presumably resolved by
    the ``find_object`` decorator from the queryset built by
    ``get_playlist_qs`` -- confirm against its implementation).
    """
    playlist = kwargs.pop("obj")
    data = {"playlist": serializers.get_playlist_detail_data(playlist)}
    return response.Response(data)
@action(
detail=False,
methods=["get", "post"],
url_name="update_playlist",
url_path="updatePlaylist",
)
@find_object(lambda request: request.user.playlists.all(), field="playlistId")
def update_playlist(self, request, *args, **kwargs):
data = request.GET or request.POST
if new_name:
playlist.name = new_name
playlist.save(update_fields=["name", "modification_date"])
plt = playlist.playlist_tracks.get(index=to_remove)
except (TypeError, ValueError, KeyError):
pass
except playlists_models.PlaylistTrack.DoesNotExist:
pass
else:
plt.delete(update_indexes=True)
ids = []
try:
ids.append(int(i))
except (TypeError, ValueError):
pass
if ids:
tracks = music_models.Track.objects.filter(pk__in=ids)
by_id = {t.id: t for t in tracks}
sorted_tracks = []
for i in ids:
try:
sorted_tracks.append(by_id[i])
except KeyError:
pass
if sorted_tracks:
playlist.insert_many(sorted_tracks)
return response.Response(data)
@action(
detail=False,
methods=["get", "post"],
url_name="delete_playlist",
url_path="deletePlaylist",
)
@find_object(lambda request: request.user.playlists.all())
def delete_playlist(self, request, *args, **kwargs):
return response.Response(data)
@action(
detail=False,
methods=["get", "post"],
url_name="create_playlist",
url_path="createPlaylist",
def create_playlist(self, request, *args, **kwargs):
data = request.GET or request.POST
return response.Response(
{
"error": {
"code": 10,
"message": "Playlist ID or name must be specified.",
}
ids = []
try:
ids.append(int(i))
except (TypeError, ValueError):
pass
if ids:
tracks = music_models.Track.objects.filter(pk__in=ids)
by_id = {t.id: t for t in tracks}
sorted_tracks = []
for i in ids:
try:
sorted_tracks.append(by_id[i])
except KeyError:
pass
if sorted_tracks:
playlist.insert_many(sorted_tracks)
playlist = request.user.playlists.with_tracks_count().get(pk=playlist.pk)
data = {"playlist": serializers.get_playlist_detail_data(playlist)}
return response.Response(data)
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_avatar",
    url_path="getAvatar",
)
@find_object(
    queryset=users_models.User.objects.exclude(avatar=None).exclude(avatar=""),
    model_field="username__iexact",
    field="username",
    cast=str,
)
def get_avatar(self, request, *args, **kwargs):
    """Serve a user's avatar by delegating the file transfer to the proxy.

    The response body is empty; the avatar's file path is placed in the
    header that matches ``settings.REVERSE_PROXY_TYPE``. A proxy type other
    than ``nginx`` or ``apache2`` raises ``KeyError``.
    """
    avatar_owner = kwargs.pop("obj")
    avatar_path = music_views.get_file_path(avatar_owner.avatar)
    header_by_proxy = {"nginx": "X-Accel-Redirect", "apache2": "X-Sendfile"}
    header_name = header_by_proxy[settings.REVERSE_PROXY_TYPE]
    # An empty content type lets the proxy choose it.
    proxied = response.Response({}, content_type="")
    proxied[header_name] = avatar_path
    return proxied
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_user",
    url_path="getUser",
)
@find_object(
    queryset=lambda request: users_models.User.objects.filter(pk=request.user.pk),
    model_field="username__iexact",
    field="username",
    cast=str,
)
def get_user(self, request, *args, **kwargs):
    """Return the details of the requesting user under the ``user`` key.

    The lookup queryset is restricted to the requester's own account, and
    the payload is always built from ``request.user``.
    """
    detail = serializers.get_user_detail_data(request.user)
    return response.Response({"user": detail})
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_music_folders",
    url_path="getMusicFolders",
)
def get_music_folders(self, request, *args, **kwargs):
    """Return the single, hard-coded music folder (id 1, named "Music")."""
    data = {"musicFolders": {"musicFolder": [{"id": 1, "name": "Music"}]}}
    return response.Response(data)
@action(
detail=False,
methods=["get", "post"],
url_name="get_cover_art",
url_path="getCoverArt",
def get_cover_art(self, request, *args, **kwargs):
data = request.GET or request.POST
if not id:
return response.Response(
{"error": {"code": 10, "message": "cover art ID must be specified."}}
)
try:
album_id = int(id.replace("al-", ""))
album = (
music_models.Album.objects.exclude(cover__isnull=True)
.exclude(cover="")
.get(pk=album_id)
)
except (TypeError, ValueError, music_models.Album.DoesNotExist):
return response.Response(
{"error": {"code": 70, "message": "cover art not found."}}
)
cover = album.cover
else:
return response.Response(
{"error": {"code": 70, "message": "cover art not found."}}
)
mapping = {"nginx": "X-Accel-Redirect", "apache2": "X-Sendfile"}
path = music_views.get_file_path(cover)
file_header = mapping[settings.REVERSE_PROXY_TYPE]
# let the proxy set the content-type
r[file_header] = path
@action(
detail=False, methods=["get", "post"], url_name="scrobble", url_path="scrobble"
)
def scrobble(self, request, *args, **kwargs):
data = request.GET or request.POST
serializer = serializers.ScrobbleSerializer(
if not serializer.is_valid():
return response.Response(
{"error": {"code": 0, "message": "Invalid payload"}}
)
if serializer.validated_data["submission"]:
listening = serializer.save()
record.send(listening)
return response.Response({})
@action(
    detail=False,
    methods=["get", "post"],
    url_name="get_genres",
    url_path="getGenres",
)
def get_genres(self, request, *args, **kwargs):
    """List tags as genres, with per-tag album and track usage counts.

    Tags whose album count and track count are both zero are left out;
    the remaining tags are ordered by name.
    """
    album_ct = ContentType.objects.get_for_model(music_models.Album)
    track_ct = ContentType.objects.get_for_model(music_models.Track)

    # Count tagged items separately per content type.
    albums_count = Count(
        "tagged_items", filter=Q(tagged_items__content_type=album_ct)
    )
    tracks_count = Count(
        "tagged_items", filter=Q(tagged_items__content_type=track_ct)
    )
    tags = tags_models.Tag.objects.annotate(
        _albums_count=albums_count,
        _tracks_count=tracks_count,
    )
    tags = tags.exclude(_tracks_count=0, _albums_count=0).order_by("name")

    genres = [serializers.get_genre_data(tag) for tag in tags]
    return response.Response({"genres": {"genre": genres}})