Newer
Older
import datetime
from django.conf import settings
from django.utils import timezone
Eliot Berriot
committed
from rest_framework import exceptions
from rest_framework import permissions as rest_permissions
Eliot Berriot
committed
from rest_framework import renderers
Eliot Berriot
committed
from rest_framework import response
from rest_framework import viewsets
from rest_framework.decorators import list_route
from rest_framework.serializers import ValidationError
from funkwhale_api.activity import record
Eliot Berriot
committed
from funkwhale_api.common import preferences
from funkwhale_api.favorites.models import TrackFavorite
Eliot Berriot
committed
from funkwhale_api.music import models as music_models
from funkwhale_api.music import utils
Eliot Berriot
committed
from funkwhale_api.music import views as music_views
from funkwhale_api.playlists import models as playlists_models
Eliot Berriot
committed
from . import authentication
from . import filters
Eliot Berriot
committed
from . import negotiation
from . import serializers
def find_object(queryset, model_field="pk", field="id", cast=int):
Eliot Berriot
committed
def decorator(func):
def inner(self, request, *args, **kwargs):
data = request.GET or request.POST
try:
raw_value = data[field]
except KeyError:
return response.Response(
{
"error": {
"code": 10,
"message": "required parameter '{}' not present".format(
field
),
}
Eliot Berriot
committed
try:
value = cast(raw_value)
except (TypeError, ValidationError):
return response.Response(
{
"error": {
"code": 0,
"message": 'For input string "{}"'.format(raw_value),
}
Eliot Berriot
committed
try:
obj = qs.get(**{model_field: value})
except qs.model.DoesNotExist:
return response.Response(
{
"error": {
"code": 70,
"message": "{} not found".format(
qs.model.__class__.__name__
),
}
Eliot Berriot
committed
return func(self, request, *args, **kwargs)
Eliot Berriot
committed
return inner
Eliot Berriot
committed
return decorator
class SubsonicViewSet(viewsets.GenericViewSet):
content_negotiation_class = negotiation.SubsonicContentNegociation
authentication_classes = [authentication.SubsonicAuthentication]
permissions_classes = [rest_permissions.IsAuthenticated]
Eliot Berriot
committed
def dispatch(self, request, *args, **kwargs):
Eliot Berriot
committed
r = response.Response({}, status=405)
r.accepted_renderer = renderers.JSONRenderer()
Eliot Berriot
committed
r.renderer_context = {}
return r
return super().dispatch(request, *args, **kwargs)
Eliot Berriot
committed
def handle_exception(self, exc):
# subsonic API sends 200 status code with custom error
# codes in the payload
mapping = {exceptions.AuthenticationFailed: (40, "Wrong username or password.")}
payload = {"status": "failed"}
if exc.__class__ in mapping:
Eliot Berriot
committed
code, message = mapping[exc.__class__]
else:
return super().handle_exception(exc)
Eliot Berriot
committed
return response.Response(payload, status=200)
@list_route(methods=["get", "post"], permission_classes=[])
Eliot Berriot
committed
def ping(self, request, *args, **kwargs):
Eliot Berriot
committed
return response.Response(data, status=200)
@list_route(
permissions_classes=[],
def get_license(self, request, *args, **kwargs):
now = timezone.now()
data = {
"status": "ok",
"version": "1.16.0",
"license": {
"valid": "true",
"email": "valid@valid.license",
"licenseExpires": now + datetime.timedelta(days=365),
},
}
return response.Response(data, status=200)
@list_route(methods=["get", "post"], url_name="get_artists", url_path="getArtists")
Eliot Berriot
committed
def get_artists(self, request, *args, **kwargs):
artists = music_models.Artist.objects.all()
data = serializers.GetArtistsSerializer(artists).data
Eliot Berriot
committed
return response.Response(payload, status=200)
@list_route(methods=["get", "post"], url_name="get_indexes", url_path="getIndexes")
def get_indexes(self, request, *args, **kwargs):
artists = music_models.Artist.objects.all()
data = serializers.GetArtistsSerializer(artists).data
return response.Response(payload, status=200)
@list_route(methods=["get", "post"], url_name="get_artist", url_path="getArtist")
Eliot Berriot
committed
@find_object(music_models.Artist.objects.all())
def get_artist(self, request, *args, **kwargs):
Eliot Berriot
committed
data = serializers.GetArtistSerializer(artist).data
Eliot Berriot
committed
return response.Response(payload, status=200)
@list_route(
methods=["get", "post"], url_name="get_artist_info2", url_path="getArtistInfo2"
)
@find_object(music_models.Artist.objects.all())
def get_artist_info2(self, request, *args, **kwargs):
return response.Response(payload, status=200)
@list_route(methods=["get", "post"], url_name="get_album", url_path="getAlbum")
@find_object(music_models.Album.objects.select_related("artist"))
Eliot Berriot
committed
def get_album(self, request, *args, **kwargs):
Eliot Berriot
committed
data = serializers.GetAlbumSerializer(album).data
Eliot Berriot
committed
return response.Response(payload, status=200)
@list_route(methods=["get", "post"], url_name="stream", url_path="stream")
@find_object(music_models.Track.objects.all())
Eliot Berriot
committed
def stream(self, request, *args, **kwargs):
Eliot Berriot
committed
queryset = track.files.select_related(
"library_track", "track__album__artist", "track__artist"
Eliot Berriot
committed
)
track_file = queryset.first()
if not track_file:
return response.Response(status=404)
Eliot Berriot
committed
return music_views.handle_serve(track_file)
@list_route(methods=["get", "post"], url_name="star", url_path="star")
@find_object(music_models.Track.objects.all())
def star(self, request, *args, **kwargs):
TrackFavorite.add(user=request.user, track=track)
@list_route(methods=["get", "post"], url_name="unstar", url_path="unstar")
@find_object(music_models.Track.objects.all())
def unstar(self, request, *args, **kwargs):
request.user.track_favorites.filter(track=track).delete()
@list_route(
methods=["get", "post"], url_name="get_starred2", url_path="getStarred2"
)
def get_starred2(self, request, *args, **kwargs):
favorites = request.user.track_favorites.all()
data = {"starred2": {"song": serializers.get_starred_tracks_data(favorites)}}
return response.Response(data)
@list_route(methods=["get", "post"], url_name="get_starred", url_path="getStarred")
def get_starred(self, request, *args, **kwargs):
favorites = request.user.track_favorites.all()
data = {"starred": {"song": serializers.get_starred_tracks_data(favorites)}}
return response.Response(data)
@list_route(
methods=["get", "post"], url_name="get_album_list2", url_path="getAlbumList2"
)
def get_album_list2(self, request, *args, **kwargs):
queryset = music_models.Album.objects.with_tracks_count()
data = request.GET or request.POST
filterset = filters.AlbumList2FilterSet(data, queryset=queryset)
queryset = filterset.qs
try:
except (TypeError, KeyError, ValueError):
offset = 0
try:
except (TypeError, KeyError, ValueError):
size = 50
size = min(size, 500)
queryset = queryset[offset:size]
data = {"albumList2": {"album": serializers.get_album_list2_data(queryset)}}
return response.Response(data)
@list_route(methods=["get", "post"], url_name="search3", url_path="search3")
def search3(self, request, *args, **kwargs):
data = request.GET or request.POST
conf = [
{
"subsonic": "artist",
"search_fields": ["name"],
"queryset": (
music_models.Artist.objects.with_albums_count().values(
"id", "_albums_count", "name"
)
"serializer": lambda qs: [serializers.get_artist_data(a) for a in qs],
"subsonic": "album",
"search_fields": ["title"],
"queryset": (
music_models.Album.objects.with_tracks_count().select_related(
"artist"
)
"subsonic": "song",
"search_fields": ["title"],
"queryset": (
music_models.Track.objects.prefetch_related("files").select_related(
"album__artist"
)
for c in conf:
offsetKey = "{}Offset".format(c["subsonic"])
countKey = "{}Count".format(c["subsonic"])
try:
offset = int(data[offsetKey])
except (TypeError, KeyError, ValueError):
offset = 0
try:
size = int(data[countKey])
except (TypeError, KeyError, ValueError):
size = 20
size = min(size, 100)
queryset = c["queryset"].filter(
utils.get_query(query, c["search_fields"])
)
queryset = queryset[offset:size]
payload["searchResult3"][c["subsonic"]] = c["serializer"](queryset)
return response.Response(payload)
methods=["get", "post"], url_name="get_playlists", url_path="getPlaylists"
)
def get_playlists(self, request, *args, **kwargs):
playlists = request.user.playlists.with_tracks_count().select_related("user")
"playlists": {
"playlist": [serializers.get_playlist_data(p) for p in playlists]
}
}
return response.Response(data)
@list_route(
methods=["get", "post"], url_name="get_playlist", url_path="getPlaylist"
)
@find_object(playlists_models.Playlist.objects.with_tracks_count())
def get_playlist(self, request, *args, **kwargs):
playlist = kwargs.pop("obj")
data = {"playlist": serializers.get_playlist_detail_data(playlist)}
return response.Response(data)
@list_route(
methods=["get", "post"], url_name="update_playlist", url_path="updatePlaylist"
)
@find_object(lambda request: request.user.playlists.all(), field="playlistId")
def update_playlist(self, request, *args, **kwargs):
data = request.GET or request.POST
if new_name:
playlist.name = new_name
playlist.save(update_fields=["name", "modification_date"])
plt = playlist.playlist_tracks.get(index=to_remove)
except (TypeError, ValueError, KeyError):
pass
except playlists_models.PlaylistTrack.DoesNotExist:
pass
else:
plt.delete(update_indexes=True)
ids = []
try:
ids.append(int(i))
except (TypeError, ValueError):
pass
if ids:
tracks = music_models.Track.objects.filter(pk__in=ids)
by_id = {t.id: t for t in tracks}
sorted_tracks = []
for i in ids:
try:
sorted_tracks.append(by_id[i])
except KeyError:
pass
if sorted_tracks:
playlist.insert_many(sorted_tracks)
return response.Response(data)
@list_route(
methods=["get", "post"], url_name="delete_playlist", url_path="deletePlaylist"
)
@find_object(lambda request: request.user.playlists.all())
def delete_playlist(self, request, *args, **kwargs):
return response.Response(data)
@list_route(
methods=["get", "post"], url_name="create_playlist", url_path="createPlaylist"
)
def create_playlist(self, request, *args, **kwargs):
data = request.GET or request.POST
return response.Response(
{
"error": {
"code": 10,
"message": "Playlist ID or name must be specified.",
}
ids = []
try:
ids.append(int(i))
except (TypeError, ValueError):
pass
if ids:
tracks = music_models.Track.objects.filter(pk__in=ids)
by_id = {t.id: t for t in tracks}
sorted_tracks = []
for i in ids:
try:
sorted_tracks.append(by_id[i])
except KeyError:
pass
if sorted_tracks:
playlist.insert_many(sorted_tracks)
playlist = request.user.playlists.with_tracks_count().get(pk=playlist.pk)
data = {"playlist": serializers.get_playlist_detail_data(playlist)}
return response.Response(data)
methods=["get", "post"],
url_name="get_music_folders",
url_path="getMusicFolders",
)
def get_music_folders(self, request, *args, **kwargs):
data = {"musicFolders": {"musicFolder": [{"id": 1, "name": "Music"}]}}
return response.Response(data)
@list_route(
methods=["get", "post"], url_name="get_cover_art", url_path="getCoverArt"
)
def get_cover_art(self, request, *args, **kwargs):
data = request.GET or request.POST
if not id:
return response.Response(
{"error": {"code": 10, "message": "cover art ID must be specified."}}
)
try:
album_id = int(id.replace("al-", ""))
album = (
music_models.Album.objects.exclude(cover__isnull=True)
.exclude(cover="")
.get(pk=album_id)
)
except (TypeError, ValueError, music_models.Album.DoesNotExist):
return response.Response(
{"error": {"code": 70, "message": "cover art not found."}}
)
cover = album.cover
else:
return response.Response(
{"error": {"code": 70, "message": "cover art not found."}}
)
mapping = {"nginx": "X-Accel-Redirect", "apache2": "X-Sendfile"}
path = music_views.get_file_path(cover)
file_header = mapping[settings.REVERSE_PROXY_TYPE]
# let the proxy set the content-type
r[file_header] = path
@list_route(methods=["get", "post"], url_name="scrobble", url_path="scrobble")
def scrobble(self, request, *args, **kwargs):
data = request.GET or request.POST
serializer = serializers.ScrobbleSerializer(
if not serializer.is_valid():
return response.Response(
{"error": {"code": 0, "message": "Invalid payload"}}
)
if serializer.validated_data["submission"]:
l = serializer.save()
record.send(l)
return response.Response({})