Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 1.4.1-upgrade-release
  • 1121-download
  • 1218-smartplaylist_backend
  • 1373-login-form-move-reset-your-password-link
  • 1381-progress-bars
  • 1481
  • 1518-update-django-allauth
  • 1645
  • 1675-widget-improperly-configured-missing-resource-id
  • 1675-widget-improperly-configured-missing-resource-id-2
  • 1704-required-props-are-not-always-passed
  • 1716-add-frontend-tests-again
  • 1749-smtp-uri-configuration
  • 1930-first-upload-in-a-batch-always-fails
  • 1976-update-documentation-links-in-readme-files
  • 2054-player-layout
  • 2063-funkwhale-connection-interrupted-every-so-often-requires-network-reset-page-refresh
  • 2091-iii-6-improve-visuals-layout
  • 2151-refused-to-load-spa-manifest-json-2
  • 2154-add-to-playlist-pop-up-hidden-by-now-playing-screen
  • 2155-can-t-see-the-episode-list-of-a-podcast-as-an-anonymous-user-with-anonymous-access-enabled
  • 2156-add-management-command-to-change-file-ref-for-in-place-imported-files-to-s3
  • 2192-clear-queue-bug-when-random-shuffle-is-enabled
  • 2205-channel-page-pagination-link-dont-working
  • 2215-custom-logger-does-not-work-at-all-with-webkit-and-blink-based-browsers
  • 2228-troi-real-world-review
  • 2274-implement-new-upload-api
  • 2303-allow-users-to-own-tagged-items
  • 2395-far-right-filter
  • 2405-front-buttont-trigger-third-party-hook
  • 2408-troi-create-missing-tracks
  • 2416-revert-library-drop
  • 2429-fix-popover-auto-close
  • 2448-complete-tags
  • 2452-fetch-third-party-metadata
  • 2467-fix-radio-builder
  • 2469-Fix-search-bar-in-ManageUploads
  • 2476-deep-upload-links
  • 2480-add-notification-number-badges
  • 2482-upgrade-about-page-to-use-new-ui
  • 2487-fix-accessibility-according-to-WCAG
  • 2490-experiment-use-rstore
  • 2490-experimental-use-simple-data-store
  • 2490-fix-search-modal
  • 2490-search-modal
  • 2501-fix-compatibility-with-older-browsers
  • 2502-drop-uno-and-jquery
  • 2533-allow-followers-in-user-activiy-privacy-level
  • 2539-drop-ansible-installation-method-in-favor-of-docker
  • 2550-22-user-interfaces-for-federation
  • 2560-default-modal-width
  • 623-test
  • 653-enable-starting-embedded-player-at-a-specific-position-in-track
  • activitypub-overview
  • album-sliders
  • arne/2091-improve-visuals
  • back-option-for-edits
  • chore/2406-compose-modularity-scope
  • develop
  • develop-password-reset
  • env-file-cleanup
  • feat/2091-improve-visuals
  • feature/2481-vui-translations
  • fix-amd64-docker-build-gfortran
  • fix-channel-creation
  • fix-front-node-version
  • fix-gitpod
  • fix-plugins-dev-setup
  • fix-rate-limit-serializer
  • fix-schema-channel-metadata-choices
  • flupsi/2803-improve-visuals
  • flupsi/2804-new-upload-process
  • funkwhale-fix_pwa_manifest
  • funkwhale-petitminion-2136-bug-fix-prune-skipped-upload
  • funkwhale-ui-buttons
  • georg/add-typescript
  • gitpod/test-1866
  • global-button-experiment
  • global-buttons
  • juniorjpdj/pkg-repo
  • manage-py-reference
  • merge-review
  • minimal-python-version
  • petitminion-develop-patch-84496
  • pin-mutagen-to-1.46
  • pipenv
  • plugins
  • plugins-v2
  • plugins-v3
  • pre-release/1.3.0
  • prune_skipped_uploads_docs
  • refactor/homepage
  • renovate/front-all-dependencies
  • renovate/front-major-all-dependencies
  • schema-updates
  • small-gitpod-improvements
  • spectacular_schema
  • stable
  • tempArne
  • ui-buttons
  • 0.1
  • 0.10
  • 0.11
  • 0.12
  • 0.13
  • 0.14
  • 0.14.1
  • 0.14.2
  • 0.15
  • 0.16
  • 0.16.1
  • 0.16.2
  • 0.16.3
  • 0.17
  • 0.18
  • 0.18.1
  • 0.18.2
  • 0.18.3
  • 0.19.0
  • 0.19.0-rc1
  • 0.19.0-rc2
  • 0.19.1
  • 0.2
  • 0.2.1
  • 0.2.2
  • 0.2.3
  • 0.2.4
  • 0.2.5
  • 0.2.6
  • 0.20.0
  • 0.20.0-rc1
  • 0.20.1
  • 0.21
  • 0.21-rc1
  • 0.21-rc2
  • 0.21.1
  • 0.21.2
  • 0.3
  • 0.3.1
  • 0.3.2
  • 0.3.3
  • 0.3.4
  • 0.3.5
  • 0.4
  • 0.5
  • 0.5.1
  • 0.5.2
  • 0.5.3
  • 0.5.4
  • 0.6
  • 0.6.1
  • 0.7
  • 0.8
  • 0.9
  • 0.9.1
  • 1.0
  • 1.0-rc1
  • 1.0.1
  • 1.1
  • 1.1-rc1
  • 1.1-rc2
  • 1.1.1
  • 1.1.2
  • 1.1.3
  • 1.1.4
  • 1.2.0
  • 1.2.0-rc1
  • 1.2.0-rc2
  • 1.2.0-testing
  • 1.2.0-testing2
  • 1.2.0-testing3
  • 1.2.0-testing4
  • 1.2.1
  • 1.2.10
  • 1.2.2
  • 1.2.3
  • 1.2.4
  • 1.2.5
  • 1.2.6
  • 1.2.6-1
  • 1.2.7
  • 1.2.8
  • 1.2.9
  • 1.3.0
  • 1.3.0-rc1
  • 1.3.0-rc2
  • 1.3.0-rc3
  • 1.3.0-rc4
  • 1.3.0-rc5
  • 1.3.0-rc6
  • 1.3.1
  • 1.3.2
  • 1.3.3
  • 1.3.4
  • 1.4.0
  • 1.4.0-rc1
  • 1.4.0-rc2
  • 1.4.1
  • 2.0.0-alpha.1
  • 2.0.0-alpha.2
200 results

Target

Select target project
  • funkwhale/funkwhale
  • Luclu7/funkwhale
  • mbothorel/funkwhale
  • EorlBruder/funkwhale
  • tcit/funkwhale
  • JocelynDelalande/funkwhale
  • eneiluj/funkwhale
  • reg/funkwhale
  • ButterflyOfFire/funkwhale
  • m4sk1n/funkwhale
  • wxcafe/funkwhale
  • andybalaam/funkwhale
  • jcgruenhage/funkwhale
  • pblayo/funkwhale
  • joshuaboniface/funkwhale
  • n3ddy/funkwhale
  • gegeweb/funkwhale
  • tohojo/funkwhale
  • emillumine/funkwhale
  • Te-k/funkwhale
  • asaintgenis/funkwhale
  • anoadragon453/funkwhale
  • Sakada/funkwhale
  • ilianaw/funkwhale
  • l4p1n/funkwhale
  • pnizet/funkwhale
  • dante383/funkwhale
  • interfect/funkwhale
  • akhardya/funkwhale
  • svfusion/funkwhale
  • noplanman/funkwhale
  • nykopol/funkwhale
  • roipoussiere/funkwhale
  • Von/funkwhale
  • aurieh/funkwhale
  • icaria36/funkwhale
  • floreal/funkwhale
  • paulwalko/funkwhale
  • comradekingu/funkwhale
  • FurryJulie/funkwhale
  • Legolars99/funkwhale
  • Vierkantor/funkwhale
  • zachhats/funkwhale
  • heyjake/funkwhale
  • sn0w/funkwhale
  • jvoisin/funkwhale
  • gordon/funkwhale
  • Alexander/funkwhale
  • bignose/funkwhale
  • qasim.ali/funkwhale
  • fakegit/funkwhale
  • Kxze/funkwhale
  • stenstad/funkwhale
  • creak/funkwhale
  • Kaze/funkwhale
  • Tixie/funkwhale
  • IISergII/funkwhale
  • lfuelling/funkwhale
  • nhaddag/funkwhale
  • yoasif/funkwhale
  • ifischer/funkwhale
  • keslerm/funkwhale
  • flupe/funkwhale
  • petitminion/funkwhale
  • ariasuni/funkwhale
  • ollie/funkwhale
  • ngaumont/funkwhale
  • techknowlogick/funkwhale
  • Shleeble/funkwhale
  • theflyingfrog/funkwhale
  • jonatron/funkwhale
  • neobrain/funkwhale
  • eorn/funkwhale
  • KokaKiwi/funkwhale
  • u1-liquid/funkwhale
  • marzzzello/funkwhale
  • sirenwatcher/funkwhale
  • newer027/funkwhale
  • codl/funkwhale
  • Zwordi/funkwhale
  • gisforgabriel/funkwhale
  • iuriatan/funkwhale
  • simon/funkwhale
  • bheesham/funkwhale
  • zeoses/funkwhale
  • accraze/funkwhale
  • meliurwen/funkwhale
  • divadsn/funkwhale
  • Etua/funkwhale
  • sdrik/funkwhale
  • Soran/funkwhale
  • kuba-orlik/funkwhale
  • cristianvogel/funkwhale
  • Forceu/funkwhale
  • jeff/funkwhale
  • der_scheibenhacker/funkwhale
  • owlnical/funkwhale
  • jovuit/funkwhale
  • SilverFox15/funkwhale
  • phw/funkwhale
  • mayhem/funkwhale
  • sridhar/funkwhale
  • stromlin/funkwhale
  • rrrnld/funkwhale
  • nitaibezerra/funkwhale
  • jaller94/funkwhale
  • pcouy/funkwhale
  • eduxstad/funkwhale
  • codingHahn/funkwhale
  • captain/funkwhale
  • polyedre/funkwhale
  • leishenailong/funkwhale
  • ccritter/funkwhale
  • lnceballosz/funkwhale
  • fpiesche/funkwhale
  • Fanyx/funkwhale
  • markusblogde/funkwhale
  • Firobe/funkwhale
  • devilcius/funkwhale
  • freaktechnik/funkwhale
  • blopware/funkwhale
  • cone/funkwhale
  • thanksd/funkwhale
  • vachan-maker/funkwhale
  • bbenti/funkwhale
  • tarator/funkwhale
  • prplecake/funkwhale
  • DMarzal/funkwhale
  • lullis/funkwhale
  • hanacgr/funkwhale
  • albjeremias/funkwhale
  • xeruf/funkwhale
  • llelite/funkwhale
  • RoiArthurB/funkwhale
  • cloo/funkwhale
  • nztvar/funkwhale
  • Keunes/funkwhale
  • petitminion/funkwhale-petitminion
  • m-idler/funkwhale
  • SkyLeite/funkwhale
140 results
Select Git revision
  • 278-search-browse
  • 303-json-ld
  • 316-ultrasonic
  • 334-don-t-display-an-empty-page-browser
  • 463-user-libraries
  • ButterflyOfFire/funkwhale-patch-1
  • avatar-everywhere
  • build-docker-unprivileged
  • develop
  • master
  • playlist-component
  • 0.1
  • 0.10
  • 0.11
  • 0.12
  • 0.13
  • 0.14
  • 0.14.1
  • 0.14.2
  • 0.15
  • 0.16
  • 0.16.1
  • 0.16.2
  • 0.16.3
  • 0.17
  • 0.2
  • 0.2.1
  • 0.2.2
  • 0.2.3
  • 0.2.4
  • 0.2.5
  • 0.2.6
  • 0.3
  • 0.3.1
  • 0.3.2
  • 0.3.3
  • 0.3.4
  • 0.3.5
  • 0.4
  • 0.5
  • 0.5.1
  • 0.5.2
  • 0.5.3
  • 0.5.4
  • 0.6
  • 0.6.1
  • 0.7
  • 0.8
  • 0.9
  • 0.9.1
50 results
Show changes
Showing
with 2508 additions and 304 deletions
import urllib.parse
from funkwhale_api.common import preferences, utils
from funkwhale_api.federation import models as federation_models
from funkwhale_api.moderation import mrf
@mrf.inbox.register(name="allow_list")
def check_allow_list(payload, **kwargs):
"""
A MRF policy that only works when the moderation__allow_list_enabled
setting is on.
It will extract domain names from the activity ID, actor ID and activity object ID
and discard the activity if any of those domain names isn't on the allow list.
"""
if not preferences.get("moderation__allow_list_enabled"):
raise mrf.Skip("Allow-listing is disabled")
allowed_domains = set(
federation_models.Domain.objects.filter(allowed=True).values_list(
"name", flat=True
)
)
relevant_ids = [
payload.get("actor"),
kwargs.get("sender_id", payload.get("id")),
utils.recursive_getattr(payload, "object.id", permissive=True),
]
relevant_domains = {
domain
for domain in [urllib.parse.urlparse(i).hostname for i in relevant_ids if i]
if domain
}
if relevant_domains - allowed_domains:
raise mrf.Discard(
"These domains are not allowed: {}".format(
", ".join(relevant_domains - allowed_domains)
)
)
import json
import urllib.parse
import persisting_theory
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from rest_framework import serializers
from funkwhale_api.audio import models as audio_models
from funkwhale_api.common import fields as common_fields
from funkwhale_api.common import preferences
from funkwhale_api.federation import models as federation_models
from funkwhale_api.federation import utils as federation_utils
from funkwhale_api.music import models as music_models
from funkwhale_api.playlists import models as playlists_models
from . import models, tasks
class FilteredArtistSerializer(serializers.ModelSerializer):
class Meta:
model = music_models.Artist
fields = ["id", "name"]
class ModerationTargetSerializer(serializers.Serializer):
type = serializers.ChoiceField(choices=["artist"])
id = serializers.CharField()
def to_representation(self, value):
if value["type"] == "artist":
data = FilteredArtistSerializer(value["obj"]).data
data.update({"type": "artist"})
return data
def to_internal_value(self, value):
if value["type"] == "artist":
field = serializers.PrimaryKeyRelatedField(
queryset=music_models.Artist.objects.all()
)
value["obj"] = field.to_internal_value(value["id"])
return value
class UserFilterSerializer(serializers.ModelSerializer):
target = ModerationTargetSerializer()
class Meta:
model = models.UserFilter
fields = ["uuid", "target", "creation_date"]
read_only_fields = ["uuid", "creation_date"]
def validate(self, data):
target = data.pop("target")
if target["type"] == "artist":
data["target_artist"] = target["obj"]
return data
state_serializers = persisting_theory.Registry()
class DescriptionStateMixin:
def get_description(self, o):
if o.description:
return o.description.text
TAGS_FIELD = serializers.ListField(source="get_tags")
@state_serializers.register(name="music.Artist")
class ArtistStateSerializer(DescriptionStateMixin, serializers.ModelSerializer):
tags = TAGS_FIELD
class Meta:
model = music_models.Artist
fields = [
"id",
"name",
"mbid",
"fid",
"creation_date",
"uuid",
"tags",
"content_category",
"description",
]
@state_serializers.register(name="music.ArtistCredit")
class ArtistCreditStateSerializer(DescriptionStateMixin, serializers.ModelSerializer):
artist = ArtistStateSerializer()
class Meta:
model = music_models.ArtistCredit
fields = [
"id",
"credit",
"mbid",
"fid",
"creation_date",
"uuid",
"artist",
"joinphrase",
"index",
]
@state_serializers.register(name="music.Album")
class AlbumStateSerializer(DescriptionStateMixin, serializers.ModelSerializer):
tags = TAGS_FIELD
artist_credit = ArtistCreditStateSerializer(many=True)
class Meta:
model = music_models.Album
fields = [
"id",
"title",
"mbid",
"fid",
"creation_date",
"uuid",
"artist_credit",
"release_date",
"tags",
"description",
]
@state_serializers.register(name="music.Track")
class TrackStateSerializer(DescriptionStateMixin, serializers.ModelSerializer):
tags = TAGS_FIELD
artist_credit = ArtistCreditStateSerializer(many=True)
album = AlbumStateSerializer()
class Meta:
model = music_models.Track
fields = [
"id",
"title",
"mbid",
"fid",
"creation_date",
"uuid",
"artist_credit",
"album",
"disc_number",
"position",
"license",
"copyright",
"tags",
"description",
]
@state_serializers.register(name="music.Library")
class LibraryStateSerializer(serializers.ModelSerializer):
class Meta:
model = music_models.Library
fields = [
"id",
"uuid",
"fid",
"name",
"creation_date",
"privacy_level",
]
@state_serializers.register(name="playlists.Playlist")
class PlaylistStateSerializer(serializers.ModelSerializer):
class Meta:
model = playlists_models.Playlist
fields = ["id", "name", "creation_date", "privacy_level"]
@state_serializers.register(name="federation.Actor")
class ActorStateSerializer(serializers.ModelSerializer):
class Meta:
model = federation_models.Actor
fields = [
"fid",
"name",
"preferred_username",
"full_username",
"summary",
"domain",
"type",
"creation_date",
]
@state_serializers.register(name="audio.Channel")
class ChannelStateSerializer(serializers.ModelSerializer):
rss_url = serializers.CharField(source="get_rss_url")
name = serializers.CharField(source="artist.name")
full_username = serializers.CharField(source="actor.full_username")
domain = serializers.CharField(source="actor.domain_id")
description = serializers.SerializerMethodField()
tags = serializers.ListField(source="artist.get_tags")
content_category = serializers.CharField(source="artist.content_category")
class Meta:
model = audio_models.Channel
fields = [
"uuid",
"name",
"rss_url",
"metadata",
"full_username",
"description",
"domain",
"creation_date",
"tags",
"content_category",
]
def get_description(self, o):
if o.artist.description:
return o.artist.description.text
def get_actor_query(attr, value):
data = federation_utils.get_actor_data_from_username(value)
return federation_utils.get_actor_from_username_data_query(None, data)
def get_target_owner(target):
mapping = {
audio_models.Channel: lambda t: t.attributed_to,
music_models.Artist: lambda t: t.attributed_to,
music_models.Album: lambda t: t.attributed_to,
music_models.Track: lambda t: t.attributed_to,
music_models.Library: lambda t: t.actor,
playlists_models.Playlist: lambda t: t.actor,
federation_models.Actor: lambda t: t,
}
return mapping[target.__class__](target)
TARGET_CONFIG = {
"channel": {
"queryset": audio_models.Channel.objects.all(),
"id_attr": "uuid",
"id_field": serializers.UUIDField(),
},
"artist": {"queryset": music_models.Artist.objects.all()},
"artist_credit": {"queryset": music_models.ArtistCredit.objects.all()},
"album": {"queryset": music_models.Album.objects.all()},
"track": {"queryset": music_models.Track.objects.all()},
"library": {
"queryset": music_models.Library.objects.all(),
"id_attr": "uuid",
"id_field": serializers.UUIDField(),
},
"playlist": {"queryset": playlists_models.Playlist.objects.all()},
"account": {
"queryset": federation_models.Actor.objects.all(),
"id_attr": "full_username",
"id_field": serializers.EmailField(),
"get_query": get_actor_query,
},
}
TARGET_FIELD = common_fields.GenericRelation(TARGET_CONFIG)
def get_target_state(target):
state = {}
target_state_serializer = state_serializers[target._meta.label]
state = target_state_serializer(target).data
# freeze target type/id in JSON so even if the corresponding object is deleted
# we can have the info and display it in the frontend
target_data = TARGET_FIELD.to_representation(target)
state["_target"] = json.loads(json.dumps(target_data, cls=DjangoJSONEncoder))
if "fid" in state:
state["domain"] = urllib.parse.urlparse(state["fid"]).hostname
state["is_local"] = (
state.get("domain", settings.FEDERATION_HOSTNAME)
== settings.FEDERATION_HOSTNAME
)
return state
class ReportSerializer(serializers.ModelSerializer):
target = TARGET_FIELD
class Meta:
model = models.Report
fields = [
"uuid",
"summary",
"creation_date",
"handled_date",
"is_handled",
"submitter_email",
"target",
"type",
]
read_only_fields = ["uuid", "is_handled", "creation_date", "handled_date"]
def validate(self, validated_data):
validated_data = super().validate(validated_data)
submitter = self.context.get("submitter")
if submitter:
# we have an authenticated actor so no need to check further
return validated_data
unauthenticated_report_types = preferences.get(
"moderation__unauthenticated_report_types"
)
if validated_data["type"] not in unauthenticated_report_types:
raise serializers.ValidationError(
"You need an account to submit this report"
)
if not validated_data.get("submitter_email"):
raise serializers.ValidationError(
"You need to provide an e-mail address to submit this report"
)
return validated_data
def create(self, validated_data):
validated_data["target_state"] = get_target_state(validated_data["target"])
validated_data["target_owner"] = get_target_owner(validated_data["target"])
r = super().create(validated_data)
tasks.signals.report_created.send(sender=None, report=r)
return r
import django.dispatch
""" Required argument: report """
report_created = django.dispatch.Signal()
import logging
from django.conf import settings
from django.core import mail
from django.db import transaction
from django.dispatch import receiver
from funkwhale_api.common import channels, preferences, utils
from funkwhale_api.federation import utils as federation_utils
from funkwhale_api.taskapp import celery
from funkwhale_api.users import models as users_models
from . import models, signals
logger = logging.getLogger(__name__)
@receiver(signals.report_created)
def broadcast_report_created(report, **kwargs):
from . import serializers
channels.group_send(
"admin.moderation",
{
"type": "event.send",
"text": "",
"data": {
"type": "report.created",
"report": serializers.ReportSerializer(report).data,
"unresolved_count": models.Report.objects.filter(
is_handled=False
).count(),
},
},
)
@receiver(signals.report_created)
def trigger_moderator_email(report, **kwargs):
if settings.MODERATION_EMAIL_NOTIFICATIONS_ENABLED:
utils.on_commit(send_new_report_email_to_moderators.delay, report_id=report.pk)
def get_moderators():
moderators = users_models.User.objects.filter(
is_active=True, permission_moderation=True
)
if not moderators:
# we fallback on superusers
moderators = users_models.User.objects.filter(is_superuser=True)
moderators = sorted(moderators, key=lambda m: m.pk)
return moderators
@celery.app.task(name="moderation.send_new_report_email_to_moderators")
@celery.require_instance(
models.Report.objects.select_related("submitter").filter(is_handled=False), "report"
)
def send_new_report_email_to_moderators(report):
moderators = get_moderators()
submitter_repr = (
report.submitter.full_username if report.submitter else report.submitter_email
)
subject = "[{} moderation - {}] New report from {}".format(
settings.FUNKWHALE_HOSTNAME, report.get_type_display(), submitter_repr
)
detail_url = federation_utils.full_url(f"/manage/moderation/reports/{report.uuid}")
unresolved_reports_url = federation_utils.full_url(
"/manage/moderation/reports?q=resolved:no"
)
unresolved_reports = models.Report.objects.filter(is_handled=False).count()
body = [
'{} just submitted a report in the "{}" category.'.format(
submitter_repr, report.get_type_display()
),
"",
"Reported object: {} - {}".format(
report.target._meta.verbose_name.title(), str(report.target)
),
]
if hasattr(report.target, "get_absolute_url"):
body.append(
"Open public page: {}".format(
federation_utils.full_url(report.target.get_absolute_url())
)
)
if hasattr(report.target, "get_moderation_url"):
body.append(
"Open moderation page: {}".format(
federation_utils.full_url(report.target.get_moderation_url())
)
)
if report.summary:
body += ["", "Report content:", "", report.summary]
body += [
"",
f"- To handle this report, please visit {detail_url}",
"- To view all unresolved reports (currently {}), please visit {}".format(
unresolved_reports, unresolved_reports_url
),
"",
"",
"",
"You are receiving this e-mail because you are a moderator for {}.".format(
settings.FUNKWHALE_HOSTNAME
),
]
for moderator in moderators:
if not moderator.email:
logger.warning(
"Moderator %s has no e-mail address configured", moderator.username
)
continue
mail.send_mail(
subject,
message="\n".join(body),
recipient_list=[moderator.email],
from_email=settings.DEFAULT_FROM_EMAIL,
)
@celery.app.task(name="moderation.user_request_handle")
@celery.require_instance(
models.UserRequest.objects.select_related("submitter"), "user_request"
)
@transaction.atomic
def user_request_handle(user_request, new_status, old_status=None):
if user_request.status != new_status:
logger.warn(
"User request %s was handled before asynchronous tasks run", user_request.pk
)
return
if user_request.type == "signup" and new_status == "pending" and old_status is None:
notify_mods_signup_request_pending(user_request)
broadcast_user_request_created(user_request)
elif user_request.type == "signup" and new_status == "approved":
user_request.submitter.user.is_active = True
user_request.submitter.user.save(update_fields=["is_active"])
notify_submitter_signup_request_approved(user_request)
elif user_request.type == "signup" and new_status == "refused":
notify_submitter_signup_request_refused(user_request)
def broadcast_user_request_created(user_request):
from funkwhale_api.manage import serializers as manage_serializers
channels.group_send(
"admin.moderation",
{
"type": "event.send",
"text": "",
"data": {
"type": "user_request.created",
"user_request": manage_serializers.ManageUserRequestSerializer(
user_request
).data,
"pending_count": models.UserRequest.objects.filter(
status="pending"
).count(),
},
},
)
def notify_mods_signup_request_pending(obj):
moderators = get_moderators()
submitter_repr = obj.submitter.preferred_username
subject = "[{} moderation] New sign-up request from {}".format(
settings.FUNKWHALE_HOSTNAME, submitter_repr
)
detail_url = federation_utils.full_url(f"/manage/moderation/requests/{obj.uuid}")
unresolved_requests_url = federation_utils.full_url(
"/manage/moderation/requests?q=status:pending"
)
unresolved_requests = models.UserRequest.objects.filter(status="pending").count()
body = [
"{} wants to register on your pod. You need to review their request before they can use the service.".format(
submitter_repr
),
"",
f"- To handle this request, please visit {detail_url}",
"- To view all unresolved requests (currently {}), please visit {}".format(
unresolved_requests, unresolved_requests_url
),
"",
"",
"",
"You are receiving this e-mail because you are a moderator for {}.".format(
settings.FUNKWHALE_HOSTNAME
),
]
for moderator in moderators:
if not moderator.email:
logger.warning(
"Moderator %s has no e-mail address configured", moderator.username
)
continue
mail.send_mail(
subject,
message="\n".join(body),
recipient_list=[moderator.email],
from_email=settings.DEFAULT_FROM_EMAIL,
)
def notify_submitter_signup_request_approved(user_request):
submitter_repr = user_request.submitter.preferred_username
submitter_email = user_request.submitter.user.email
if not submitter_email:
logger.warning("User %s has no e-mail address configured", submitter_repr)
return
subject = f"Welcome to {settings.FUNKWHALE_HOSTNAME}, {submitter_repr}!"
login_url = federation_utils.full_url("/login")
body = [
f"Hi {submitter_repr} and welcome,",
"",
"Our moderation team has approved your account request and you can now start "
"using the service. Please visit {} to get started.".format(login_url),
"",
"Before your first login, you may need to verify your e-mail address if you didn't already.",
]
mail.send_mail(
subject,
message="\n".join(body),
recipient_list=[submitter_email],
from_email=settings.DEFAULT_FROM_EMAIL,
)
def notify_submitter_signup_request_refused(user_request):
submitter_repr = user_request.submitter.preferred_username
submitter_email = user_request.submitter.user.email
if not submitter_email:
logger.warning("User %s has no e-mail address configured", submitter_repr)
return
subject = "Your account request at {} was refused".format(
settings.FUNKWHALE_HOSTNAME
)
body = [
f"Hi {submitter_repr},",
"",
"You recently submitted an account request on our service. However, our "
"moderation team has refused it, and as a result, you won't be able to use "
"the service.",
]
instance_contact_email = preferences.get("instance__contact_email")
if instance_contact_email:
body += [
"",
"If you think this is a mistake, please contact our team at {}.".format(
instance_contact_email
),
]
mail.send_mail(
subject,
message="\n".join(body),
recipient_list=[submitter_email],
from_email=settings.DEFAULT_FROM_EMAIL,
)
from funkwhale_api.common import routers
from . import views
router = routers.OptionalSlashRouter()
router.register(r"content-filters", views.UserFilterViewSet, "content-filters")
router.register(r"reports", views.ReportsViewSet, "reports")
urlpatterns = router.urls
from rest_framework import serializers
from funkwhale_api.federation import models as federation_models
from . import models
from . import serializers as moderation_serializers
NOTE_TARGET_FIELDS = {
"report": {
"queryset": models.Report.objects.all(),
"id_attr": "uuid",
"id_field": serializers.UUIDField(),
},
"request": {
"queryset": models.UserRequest.objects.all(),
"id_attr": "uuid",
"id_field": serializers.UUIDField(),
},
"account": {
"queryset": federation_models.Actor.objects.all(),
"id_attr": "full_username",
"id_field": serializers.EmailField(),
"get_query": moderation_serializers.get_actor_query,
},
}
def get_signup_form_additional_fields_serializer(customization):
fields = (customization or {}).get("fields", []) or []
class AdditionalFieldsSerializer(serializers.Serializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for field in fields:
required = bool(field.get("required", True))
self.fields[field["label"]] = serializers.CharField(
max_length=5000,
required=required,
allow_null=not required,
allow_blank=not required,
)
return AdditionalFieldsSerializer(required=fields, allow_null=not fields)
from django.db import IntegrityError
from rest_framework import mixins, response, status, viewsets
from funkwhale_api.federation import routes
from funkwhale_api.federation import utils as federation_utils
from . import models, serializers
class UserFilterViewSet(
mixins.ListModelMixin,
mixins.CreateModelMixin,
mixins.RetrieveModelMixin,
mixins.DestroyModelMixin,
viewsets.GenericViewSet,
):
lookup_field = "uuid"
queryset = (
models.UserFilter.objects.all()
.order_by("-creation_date")
.select_related("target_artist")
)
serializer_class = serializers.UserFilterSerializer
required_scope = "filters"
ordering_fields = ("creation_date",)
def create(self, request, *args, **kwargs):
try:
return super().create(request, *args, **kwargs)
except IntegrityError:
content = {"detail": "A content filter already exists for this object"}
return response.Response(content, status=status.HTTP_400_BAD_REQUEST)
def get_queryset(self):
qs = super().get_queryset()
return qs.filter(user=self.request.user)
def perform_create(self, serializer):
serializer.save(user=self.request.user)
class ReportsViewSet(mixins.CreateModelMixin, viewsets.GenericViewSet):
lookup_field = "uuid"
queryset = models.Report.objects.all().order_by("-creation_date")
serializer_class = serializers.ReportSerializer
required_scope = "reports"
ordering_fields = ("creation_date",)
anonymous_policy = "setting"
anonymous_scopes = {"write:reports"}
throttling_scopes = {
"create": {
"anonymous": "anonymous-reports",
"authenticated": "authenticated-reports",
}
}
def get_serializer_context(self):
context = super().get_serializer_context()
if self.request.user.is_authenticated:
context["submitter"] = self.request.user.actor
return context
def perform_create(self, serializer):
submitter = None
if self.request.user.is_authenticated:
submitter = self.request.user.actor
report = serializer.save(submitter=submitter)
forward = self.request.data.get("forward", False)
if (
forward
and report.target
and report.target_owner
and hasattr(report.target, "fid")
and not federation_utils.is_local(report.target.fid)
):
routes.outbox.dispatch({"type": "Flag"}, context={"report": report})
from django.contrib import admin
from funkwhale_api.common import admin
from . import models
@admin.register(models.ArtistCredit)
class ArtistCreditAdmin(admin.ModelAdmin):
list_display = [
"artist",
"credit",
"joinphrase",
"creation_date",
]
search_fields = ["artist__name", "credit"]
@admin.register(models.Artist)
class ArtistAdmin(admin.ModelAdmin):
list_display = ["name", "mbid", "creation_date"]
list_display = ["name", "mbid", "creation_date", "modification_date"]
search_fields = ["name", "mbid"]
@admin.register(models.Album)
class AlbumAdmin(admin.ModelAdmin):
list_display = ["title", "artist", "mbid", "release_date", "creation_date"]
search_fields = ["title", "artist__name", "mbid"]
list_display = ["title", "mbid", "release_date", "creation_date"]
search_fields = ["title", "mbid"]
list_select_related = True
def formfield_for_manytomany(self, db_field, request, **kwargs):
if db_field.name == "artist_credit":
object_id = request.resolver_match.kwargs.get("object_id")
kwargs["queryset"] = models.ArtistCredit.objects.filter(
albums__id=object_id
)
return super().formfield_for_foreignkey(db_field, request, **kwargs)
@admin.register(models.Track)
class TrackAdmin(admin.ModelAdmin):
list_display = ["title", "artist", "album", "mbid"]
search_fields = ["title", "artist__name", "album__title", "mbid"]
list_select_related = True
list_display = ["title", "album", "mbid", "artist"]
search_fields = ["title", "album__title", "mbid"]
def artist(self, obj):
return obj.get_artist_credit_string
def formfield_for_manytomany(self, db_field, request, **kwargs):
if db_field.name == "artist_credit":
object_id = request.resolver_match.kwargs.get("object_id")
kwargs["queryset"] = models.ArtistCredit.objects.filter(
tracks__id=object_id
)
return super().formfield_for_foreignkey(db_field, request, **kwargs)
@admin.register(models.TrackActor)
class TrackActorAdmin(admin.ModelAdmin):
list_display = ["actor", "track", "upload", "internal"]
search_fields = ["actor__preferred_username", "track__name"]
list_select_related = ["actor", "track"]
@admin.register(models.ImportBatch)
......@@ -33,30 +70,14 @@ class ImportBatchAdmin(admin.ModelAdmin):
@admin.register(models.ImportJob)
class ImportJobAdmin(admin.ModelAdmin):
list_display = ["source", "batch", "track_file", "status", "mbid"]
list_select_related = ["track_file", "batch"]
list_display = ["source", "batch", "upload", "status", "mbid"]
list_select_related = ["upload", "batch"]
search_fields = ["source", "batch__pk", "mbid"]
list_filter = ["status"]
@admin.register(models.Work)
class WorkAdmin(admin.ModelAdmin):
list_display = ["title", "mbid", "language", "nature"]
list_select_related = True
search_fields = ["title"]
list_filter = ["language", "nature"]
@admin.register(models.Lyrics)
class LyricsAdmin(admin.ModelAdmin):
list_display = ["url", "id", "url"]
list_select_related = True
search_fields = ["url", "work__title"]
list_filter = ["work__language"]
@admin.register(models.TrackFile)
class TrackFileAdmin(admin.ModelAdmin):
@admin.register(models.Upload)
class UploadAdmin(admin.ModelAdmin):
list_display = [
"track",
"audio_file",
......@@ -65,6 +86,8 @@ class TrackFileAdmin(admin.ModelAdmin):
"mimetype",
"size",
"bitrate",
"import_status",
"library",
]
list_select_related = ["track"]
search_fields = [
......@@ -74,4 +97,70 @@ class TrackFileAdmin(admin.ModelAdmin):
"track__album__title",
"track__artist__name",
]
list_filter = ["mimetype", "import_status", "library__privacy_level"]
def formfield_for_manytomany(self, db_field, request, **kwargs):
if db_field.name == "playlist_libraries":
object_id = request.resolver_match.kwargs.get("object_id")
kwargs["queryset"] = models.Library.objects.filter(
playlist_uploads=object_id
).distinct()
return super().formfield_for_foreignkey(db_field, request, **kwargs)
@admin.register(models.UploadVersion)
class UploadVersionAdmin(admin.ModelAdmin):
list_display = [
"upload",
"audio_file",
"mimetype",
"size",
"bitrate",
"creation_date",
"accessed_date",
]
list_select_related = ["upload"]
search_fields = [
"upload__source",
"upload__acoustid_track_id",
"upload__track__title",
"upload__track__album__title",
"upload__track__artist__name",
]
list_filter = ["mimetype"]
def launch_scan(modeladmin, request, queryset):
for library in queryset:
library.schedule_scan(actor=request.user.actor, force=True)
launch_scan.short_description = "Launch scan"
@admin.register(models.Library)
class LibraryAdmin(admin.ModelAdmin):
list_display = ["id", "name", "actor", "uuid", "privacy_level", "creation_date"]
list_select_related = True
search_fields = ["uuid", "name", "actor__preferred_username"]
list_filter = ["privacy_level"]
actions = [launch_scan]
@admin.register(models.LibraryScan)
class LibraryScanAdmin(admin.ModelAdmin):
list_display = [
"id",
"library",
"actor",
"status",
"creation_date",
"modification_date",
"status",
"total_files",
"processed_files",
"errored_files",
]
list_select_related = True
search_fields = ["actor__username", "library__name"]
list_filter = ["status"]
from django.forms import widgets
from dynamic_preferences import types
from dynamic_preferences.registries import global_preferences_registry
music = types.Section("music")
quality_filters = types.Section("quality_filters")
@global_preferences_registry.register
class MaxTracks(types.BooleanPreference):
show_in_api = True
section = music
name = "transcoding_enabled"
verbose_name = "Transcoding enabled"
help_text = (
"Enable transcoding of audio files in formats requested by the client. "
"This is especially useful for devices that do not support formats "
"such as Flac or Ogg, but the transcoding process will increase the "
"load on the server."
)
default = True
@global_preferences_registry.register
class MusicCacheDuration(types.IntPreference):
show_in_api = True
section = music
name = "transcoding_cache_duration"
default = 60 * 24 * 7
verbose_name = "Transcoding cache duration"
help_text = (
"How many minutes do you want to keep a copy of transcoded tracks "
"on the server? Transcoded files that were not listened in this interval "
"will be erased and retranscoded on the next listening."
)
field_kwargs = {"required": False}
@global_preferences_registry.register
class MbidTaggedContent(types.BooleanPreference):
show_in_api = True
section = music
name = "only_allow_musicbrainz_tagged_files"
verbose_name = "Only allow Musicbrainz tagged files"
help_text = (
"Requires uploaded files to be tagged with a MusicBrainz ID. "
"Enabling this setting has no impact on previously uploaded files. "
"You can use the CLI to clear files that don't contain an MBID or "
"or enable quality filtering to hide untagged content from API calls. "
)
default = False
@global_preferences_registry.register
class MbGenreTags(types.BooleanPreference):
show_in_api = True
section = music
name = "musicbrainz_genre_update"
verbose_name = "Prepopulate tags with MusicBrainz Genre "
help_text = (
"Will trigger a monthly update of the tag table "
"using Musicbrainz genres. Non-existing tag will be created and "
"MusicBrainz Ids will be added to the tags if "
"they match the genre name."
)
default = True
@global_preferences_registry.register
class MbSyncTags(types.BooleanPreference):
show_in_api = True
section = music
name = "sync_musicbrainz_tags"
verbose_name = "Sync MusicBrainz to to funkwhale objects"
help_text = (
"If uploaded files are tagged with a MusicBrainz ID, "
"Funkwhale will query MusicBrainz server to add tags to "
"the track, artist and album objects."
)
default = False
# quality_filters section. Note that the default False is not applied in the fronted
# (the filter will onlyu be use if set to True)
@global_preferences_registry.register
class BitrateFilter(types.ChoicePreference):
show_in_api = True
section = quality_filters
name = "bitrate_filter"
verbose_name = "Upload Quality Filter"
default = "low"
choices = [
("low", "Allow all audio qualities"),
("medium", "Medium : Do not allow low quality"),
("high", "High : only allow high and very-high audio qualities"),
("very_high", "Very High : only allow very-high audio quality"),
]
help_text = (
"The main page content can be filtered based on audio quality. "
"This will exclude lower quality, higher qualities are never excluded. "
"Quality Table can be found in the docs."
)
field_kwargs = {"choices": choices, "required": False}
@global_preferences_registry.register
class HasMbid(types.BooleanPreference):
show_in_api = True
section = quality_filters
name = "has_mbid"
verbose_name = "Musicbrainz Ids filter"
help_text = "Should we filter out metadata without Musicbrainz Ids ?"
default = False
@global_preferences_registry.register
class Format(types.MultipleChoicePreference):
show_in_api = True
section = quality_filters
name = "format"
verbose_name = "Allowed Audio Format"
default = (["aac", "aif", "aiff", "flac", "mp3", "ogg", "opus"],)
choices = [
("ogg", "ogg"),
("opus", "opus"),
("flac", "flac"),
("aif", "aif"),
("aiff", "aiff"),
("aac", "aac"),
("mp3", "mp3"),
]
help_text = "Which audio format to allow"
@global_preferences_registry.register
class AlbumArt(types.BooleanPreference):
show_in_api = True
section = quality_filters
name = "has_cover"
verbose_name = "Album art Filter"
help_text = "Only Albums with a cover will be displayed in the home page"
default = False
@global_preferences_registry.register
class Tags(types.BooleanPreference):
show_in_api = True
section = quality_filters
name = "has_tags"
verbose_name = "Tags Filter"
help_text = "Only content with at least one tag will be displayed"
default = False
@global_preferences_registry.register
class ReleaseDate(types.BooleanPreference):
show_in_api = True
section = quality_filters
name = "has_release_date"
verbose_name = "Release date Filter"
help_text = "Only content with a release date will be displayed"
default = False
@global_preferences_registry.register
class JoinPhrases(types.StringPreference):
show_in_api = True
section = music
name = "join_phrases"
verbose_name = "Join Phrases"
help_text = (
"Used by the artist parser to create multiples artists in case the metadata "
"is a single string. BE WARNED, changing the order or the values can break the parser in unexpected ways. "
"It's MANDATORY to escape dots and to put doted variation before because the first match is used "
r"(example : `|feat\.|ft\.|feat|` and not `feat|feat\.|ft\.|feat`.). ORDER is really important "
"(says an anarchist). To avoid artist duplication and wrongly parsed artist data "
"it's recommended to tag files with Musicbrainz Picard. "
)
default = (
r"featuring | feat\. | ft\. | feat | with | and | & | vs\. | \| | \||\| |\|| , | ,|, |,|"
r" ; | ;|; |;| versus | vs | \( | \(|\( |\(| Remix\) |Remix\) | Remix\)| \) | \)|\) |\)| x |"
"accompanied by | alongside | together with | collaboration with | featuring special guest |"
"joined by | joined with | featuring guest | introducing | accompanied by | performed by | performed with |"
"performed by and | and | featuring | with | presenting | accompanied by | and special guest |"
"featuring special guests | featuring and | featuring & | and featuring "
)
widget = widgets.Textarea
field_kwargs = {"required": False}
@global_preferences_registry.register
class DefaultJoinPhrases(types.StringPreference):
show_in_api = True
section = music
name = "default_join_phrase"
verbose_name = "Default Join Phrase"
help_text = (
"The default join phrase used by artist parser"
"For example: `artists = [artist1, Artist2]` will be displayed has : artist1.name, artis2.name"
"Changing this value will not update already parsed artists"
)
default = ", "
widget = widgets.Textarea
field_kwargs = {"required": False}
import os
from urllib.parse import urlparse
import factory
from django.conf import settings
from funkwhale_api.factories import ManyToManyFromList, registry
from funkwhale_api.federation.factories import LibraryTrackFactory
from funkwhale_api.users.factories import UserFactory
from funkwhale_api.common import factories as common_factories
from funkwhale_api.factories import NoUpdateOnCreate, registry
from funkwhale_api.federation import factories as federation_factories
from funkwhale_api.music import licenses
from funkwhale_api.tags import factories as tags_factories
from funkwhale_api.users import factories as users_factories
SAMPLES_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
......@@ -13,139 +18,257 @@ SAMPLES_PATH = os.path.join(
)
@registry.register
class ArtistFactory(factory.django.DjangoModelFactory):
name = factory.Faker("name")
mbid = factory.Faker("uuid4")
def playable_factory(field):
@factory.post_generation
def inner(self, create, extracted, **kwargs):
if not create:
return
class Meta:
model = "music.Artist"
if extracted:
UploadFactory(
library__privacy_level="everyone",
import_status="finished",
**{field: self}
)
return inner
def deduce_from_conf(field):
@factory.lazy_attribute
def inner(self):
return licenses.LICENSES_BY_ID[self.code][field]
return inner
@registry.register
class AlbumFactory(factory.django.DjangoModelFactory):
title = factory.Faker("sentence", nb_words=3)
mbid = factory.Faker("uuid4")
release_date = factory.Faker("date_object")
cover = factory.django.ImageField()
artist = factory.SubFactory(ArtistFactory)
release_group_id = factory.Faker("uuid4")
class LicenseFactory(NoUpdateOnCreate, factory.django.DjangoModelFactory):
code = "cc-by-4.0"
url = deduce_from_conf("url")
commercial = deduce_from_conf("commercial")
redistribute = deduce_from_conf("redistribute")
copyleft = deduce_from_conf("copyleft")
attribution = deduce_from_conf("attribution")
derivative = deduce_from_conf("derivative")
class Meta:
model = "music.Album"
model = "music.License"
django_get_or_create = ("code",)
@registry.register
class TrackFactory(factory.django.DjangoModelFactory):
title = factory.Faker("sentence", nb_words=3)
class ArtistFactory(
tags_factories.TaggableFactory, NoUpdateOnCreate, factory.django.DjangoModelFactory
):
name = factory.Faker("name")
mbid = factory.Faker("uuid4")
album = factory.SubFactory(AlbumFactory)
artist = factory.SelfAttribute("album.artist")
position = 1
tags = ManyToManyFromList("tags")
fid = factory.Faker("federation_url")
playable = playable_factory("track__album__artist_credit__artist")
class Meta:
model = "music.Track"
model = "music.Artist"
@registry.register
class TrackFileFactory(factory.django.DjangoModelFactory):
track = factory.SubFactory(TrackFactory)
audio_file = factory.django.FileField(
from_path=os.path.join(SAMPLES_PATH, "test.ogg")
class Params:
attributed = factory.Trait(
attributed_to=factory.SubFactory(federation_factories.ActorFactory)
)
local = factory.Trait(fid=factory.Faker("federation_url", local=True))
with_cover = factory.Trait(
attachment_cover=factory.SubFactory(common_factories.AttachmentFactory)
)
bitrate = None
size = None
duration = None
@registry.register
class ArtistCreditFactory(factory.django.DjangoModelFactory):
artist = factory.SubFactory(ArtistFactory)
credit = factory.LazyAttribute(lambda obj: obj.artist.name)
joinphrase = ""
class Meta:
model = "music.TrackFile"
model = "music.ArtistCredit"
class Params:
in_place = factory.Trait(audio_file=None)
federation = factory.Trait(
audio_file=None,
library_track=factory.SubFactory(LibraryTrackFactory),
mimetype=factory.LazyAttribute(lambda o: o.library_track.audio_mimetype),
source=factory.LazyAttribute(lambda o: o.library_track.audio_url),
local = factory.Trait(
artist=factory.SubFactory(ArtistFactory, local=True),
)
@registry.register
class ImportBatchFactory(factory.django.DjangoModelFactory):
submitted_by = factory.SubFactory(UserFactory)
class AlbumFactory(
tags_factories.TaggableFactory, NoUpdateOnCreate, factory.django.DjangoModelFactory
):
title = factory.Faker("sentence", nb_words=3)
mbid = factory.Faker("uuid4")
release_date = factory.Faker("date_object")
release_group_id = factory.Faker("uuid4")
fid = factory.Faker("federation_url")
playable = playable_factory("track__album")
class Meta:
model = "music.ImportBatch"
model = "music.Album"
class Params:
federation = factory.Trait(submitted_by=None, source="federation")
finished = factory.Trait(status="finished")
attributed = factory.Trait(
attributed_to=factory.SubFactory(federation_factories.ActorFactory)
)
local = factory.Trait(
fid=factory.Faker("federation_url", local=True),
)
with_cover = factory.Trait(
attachment_cover=factory.SubFactory(common_factories.AttachmentFactory)
)
@factory.post_generation
def artist_credit(self, create, extracted, **kwargs):
if urlparse(self.fid).netloc == settings.FEDERATION_HOSTNAME:
kwargs["artist__local"] = True
if extracted:
self.artist_credit.add(extracted)
if create:
self.artist_credit.add(ArtistCreditFactory(**kwargs))
@registry.register
class ImportJobFactory(factory.django.DjangoModelFactory):
batch = factory.SubFactory(ImportBatchFactory)
source = factory.Faker("url")
class TrackFactory(
tags_factories.TaggableFactory, NoUpdateOnCreate, factory.django.DjangoModelFactory
):
uuid = factory.Faker("uuid4")
fid = factory.Faker("federation_url")
title = factory.Faker("sentence", nb_words=3)
mbid = factory.Faker("uuid4")
replace_if_duplicate = False
album = factory.SubFactory(AlbumFactory)
position = 1
playable = playable_factory("track")
class Meta:
model = "music.ImportJob"
model = "music.Track"
class Params:
federation = factory.Trait(
mbid=None,
library_track=factory.SubFactory(LibraryTrackFactory),
batch=factory.SubFactory(ImportBatchFactory, federation=True),
)
finished = factory.Trait(
status="finished", track_file=factory.SubFactory(TrackFileFactory)
attributed = factory.Trait(
attributed_to=factory.SubFactory(federation_factories.ActorFactory)
)
in_place = factory.Trait(status="finished", audio_file=None)
with_audio_file = factory.Trait(
status="finished",
audio_file=factory.django.FileField(
from_path=os.path.join(SAMPLES_PATH, "test.ogg")
local = factory.Trait(
fid=factory.Faker(
"federation_url",
local=True,
prefix="/federation/music/tracks",
obj_uuid=factory.SelfAttribute("..uuid"),
),
album__local=True,
)
with_cover = factory.Trait(
attachment_cover=factory.SubFactory(common_factories.AttachmentFactory)
)
@factory.post_generation
def artist_credit(self, created, extracted, **kwargs):
"""
A bit intricated, because we want to be able to specify a different
track artist with a fallback on album artist if nothing is specified.
And handle cases where build or build_batch are used (so no db calls)
"""
# needed to get a primary key on the track and album objects. The primary key is needed for many_to_many
if self.album:
self.album.save()
if not self.pk:
self.save()
if extracted:
self.artist_credit.add(extracted)
elif kwargs:
if created:
self.artist_credit.add(ArtistCreditFactory(**kwargs))
else:
self.artist_credit.add(ArtistCreditFactory.build(**kwargs))
elif self.album:
self.artist_credit.set(self.album.artist_credit.all())
if created:
self.save()
# The @factory.post_generation is not used because we must
# not redefine the builtin `license` function.
def _license_post_generation(self, created, extracted, **kwargs):
if not created:
return
if extracted:
self.license = LicenseFactory(code=extracted)
self.save()
license = factory.PostGeneration(_license_post_generation)
@registry.register(name="music.FileImportJob")
class FileImportJobFactory(ImportJobFactory):
source = "file://"
mbid = None
@registry.register
class UploadFactory(NoUpdateOnCreate, factory.django.DjangoModelFactory):
fid = factory.Faker("federation_url")
track = factory.SubFactory(TrackFactory)
library = factory.SubFactory(federation_factories.MusicLibraryFactory)
audio_file = factory.django.FileField(
from_path=os.path.join(SAMPLES_PATH, "test.ogg")
)
bitrate = 320
size = 320
duration = 320
mimetype = "audio/ogg"
quality = 1
class Meta:
model = "music.Upload"
class Params:
in_place = factory.Trait(audio_file=None, mimetype=None)
playable = factory.Trait(
import_status="finished", library__privacy_level="everyone"
)
local = factory.Trait(
fid=factory.Faker("federation_url", local=True),
track__local=True,
library__local=True,
)
@factory.post_generation
def channel(self, created, extracted, **kwargs):
if not extracted:
return
from funkwhale_api.audio import factories as audio_factories
audio_factories.ChannelFactory(
library=self.library,
artist=self.track.artist_credit.all()[0].artist,
**kwargs
)
@registry.register
class WorkFactory(factory.django.DjangoModelFactory):
mbid = factory.Faker("uuid4")
language = "eng"
nature = "song"
title = factory.Faker("sentence", nb_words=3)
class UploadVersionFactory(NoUpdateOnCreate, factory.django.DjangoModelFactory):
upload = factory.SubFactory(UploadFactory, bitrate=200000)
bitrate = factory.SelfAttribute("upload.bitrate")
mimetype = "audio/mpeg"
audio_file = factory.django.FileField()
size = 2000000
class Meta:
model = "music.Work"
model = "music.UploadVersion"
@registry.register
class LyricsFactory(factory.django.DjangoModelFactory):
work = factory.SubFactory(WorkFactory)
url = factory.Faker("url")
content = factory.Faker("paragraphs", nb=4)
class ImportBatchFactory(NoUpdateOnCreate, factory.django.DjangoModelFactory):
submitted_by = factory.SubFactory(users_factories.UserFactory)
class Meta:
model = "music.Lyrics"
model = "music.ImportBatch"
@registry.register
class TagFactory(factory.django.DjangoModelFactory):
name = factory.SelfAttribute("slug")
slug = factory.Faker("slug")
class ImportJobFactory(NoUpdateOnCreate, factory.django.DjangoModelFactory):
batch = factory.SubFactory(ImportBatchFactory)
source = factory.Faker("url")
mbid = factory.Faker("uuid4")
replace_if_duplicate = False
class Meta:
model = "taggit.Tag"
model = "music.ImportJob"
"""
Populates the database with fake data
"""
import logging
import random
from funkwhale_api.music import factories
from funkwhale_api.audio import factories as audio_factories
from funkwhale_api.cli import users
from funkwhale_api.favorites import factories as favorites_factories
from funkwhale_api.federation import factories as federation_factories
from funkwhale_api.history import factories as history_factories
from funkwhale_api.music import factories as music_factories
from funkwhale_api.playlists import factories as playlist_factories
from funkwhale_api.users import models, serializers
logger = logging.getLogger(__name__)
def create_data(super_user_name=None):
super_user = None
if super_user_name:
try:
super_user = users.handler_create_user(
username=str(super_user_name),
password="funkwhale",
email=f"{super_user_name}eat@the.rich",
is_superuser=True,
is_staff=True,
upload_quota=None,
)
except serializers.ValidationError as e:
for field, errors in e.detail.items():
if (
"A user with that username already exists"
or "A user is already registered with this e-mail address"
in errors[0]
):
print(
f"Superuser {super_user_name} already in db. Skipping superuser creation"
)
super_user = models.User.objects.get(username=super_user_name)
continue
else:
raise e
print(f"Superuser with username {super_user_name} and password `funkwhale`")
library = federation_factories.MusicLibraryFactory(
actor=(super_user.actor if super_user else federation_factories.ActorFactory()),
local=True if super_user else False,
)
uploads = music_factories.UploadFactory.create_batch(
size=random.randint(3, 18),
playable=True,
library=library,
local=True,
)
for upload in uploads[:2]:
history_factories.ListeningFactory(
track=upload.track, actor=upload.library.actor
)
favorites_factories.TrackFavorite(
track=upload.track, actor=upload.library.actor
)
print("Created fid", upload.track.fid)
playlist = playlist_factories.PlaylistFactory(
name="playlist test public",
privacy_level="everyone",
local=True if super_user else False,
actor=(super_user.actor if super_user else federation_factories.ActorFactory()),
)
playlist_factories.PlaylistTrackFactory(playlist=playlist, track=upload.track)
federation_factories.LibraryFollowFactory.create_batch(
size=random.randint(3, 18), actor=super_user.actor
)
def create_data(count=25):
artists = factories.ArtistFactory.create_batch(size=count)
for artist in artists:
print("Creating data for", artist)
albums = factories.AlbumFactory.create_batch(
artist=artist, size=random.randint(1, 5)
# my podcast
my_podcast_library = federation_factories.MusicLibraryFactory(
actor=(super_user.actor if super_user else federation_factories.ActorFactory()),
local=True,
)
my_podcast_channel = audio_factories.ChannelFactory(
library=my_podcast_library,
attributed_to=super_user.actor,
artist__content_category="podcast",
)
my_podcast_channel_serie = music_factories.AlbumFactory(
artist_credit__artist=my_podcast_channel.artist
)
for album in albums:
factories.TrackFileFactory.create_batch(
track__album=album, size=random.randint(3, 18)
music_factories.TrackFactory.create_batch(
size=random.randint(3, 6),
artist_credit__artist=my_podcast_channel.artist,
album=my_podcast_channel_serie,
)
# podcast
podcast_channel = audio_factories.ChannelFactory(artist__content_category="podcast")
podcast_channel_serie = music_factories.AlbumFactory(
artist_credit__artist=podcast_channel.artist
)
music_factories.TrackFactory.create_batch(
size=random.randint(3, 6),
artist_credit__artist=podcast_channel.artist,
album=podcast_channel_serie,
)
audio_factories.SubscriptionFactory(
approved=True, target=podcast_channel.actor, actor=super_user.actor
)
# my artist channel
my_artist_library = federation_factories.MusicLibraryFactory(
actor=(super_user.actor if super_user else federation_factories.ActorFactory()),
local=True if super_user else False,
)
my_artist_channel = audio_factories.ChannelFactory(
library=my_artist_library,
attributed_to=super_user.actor,
artist__content_category="music",
)
my_artist_channel_serie = music_factories.AlbumFactory(
artist_credit__artist=my_artist_channel.artist
)
music_factories.TrackFactory.create_batch(
size=random.randint(3, 6),
artist_credit__artist=my_artist_channel.artist,
album=my_artist_channel_serie,
)
# artist channel
artist_channel = audio_factories.ChannelFactory(artist__content_category="artist")
artist_channel_serie = music_factories.AlbumFactory(
artist_credit__artist=artist_channel.artist
)
music_factories.TrackFactory.create_batch(
size=random.randint(3, 6),
artist_credit__artist=artist_channel.artist,
album=artist_channel_serie,
)
audio_factories.SubscriptionFactory(
approved=True, target=artist_channel.actor, actor=super_user.actor
)
......
from django.db.models import Count
import django_filters
from django.db.models import Q
from django_filters import rest_framework as filters
from funkwhale_api.audio import filters as audio_filters
from funkwhale_api.audio import models as audio_models
from funkwhale_api.common import fields
from funkwhale_api.common import filters as common_filters
from funkwhale_api.common import search
from funkwhale_api.moderation import filters as moderation_filters
from funkwhale_api.tags import filters as tags_filters
from . import models
from . import models, utils
class ArtistFilter(filters.FilterSet):
q = fields.SearchFilter(search_fields=["name"])
listenable = filters.BooleanFilter(name="_", method="filter_listenable")
def filter_tags(queryset, name, value):
non_empty_tags = [v.lower() for v in value if v]
for tag in non_empty_tags:
queryset = queryset.filter(tagged_items__tag__name__iexact=tag).distinct()
return queryset
TAG_FILTER = common_filters.MultipleQueryFilter(method=filter_tags)
class RelatedFilterSet(filters.FilterSet):
related_type = int
related_field = "pk"
related = filters.CharFilter(field_name="_", method="filter_related")
def filter_related(self, queryset, name, value):
if not value:
return queryset.none()
try:
pk = self.related_type(value)
except (TypeError, ValueError):
return queryset.none()
try:
obj = queryset.model.objects.get(**{self.related_field: pk})
except queryset.model.DoesNotExist:
return queryset.none()
queryset = queryset.exclude(pk=obj.pk)
return tags_filters.get_by_similar_tags(queryset, obj.get_tags())
class ChannelFilterSet(filters.FilterSet):
channel = filters.CharFilter(field_name="_", method="filter_channel")
def filter_channel(self, queryset, name, value):
if not value:
return queryset
channel = (
audio_models.Channel.objects.filter(uuid=value)
.select_related("library")
.first()
)
if not channel:
return queryset.none()
uploads = models.Upload.objects.filter(library=channel.library)
actor = utils.get_actor_from_request(self.request)
uploads = uploads.playable_by(actor)
ids = uploads.values_list(self.Meta.channel_filter_field, flat=True)
return queryset.filter(pk__in=ids).distinct()
class LibraryFilterSet(filters.FilterSet):
library = filters.CharFilter(field_name="_", method="filter_library")
def filter_library(self, queryset, name, value):
if not value:
return queryset
actor = utils.get_actor_from_request(self.request)
library = models.Library.objects.filter(uuid=value).viewable_by(actor).first()
if not library:
return queryset.none()
uploads = models.Upload.objects.filter(library=library)
uploads = uploads.playable_by(actor)
ids = uploads.values_list(self.Meta.library_filter_field, flat=True)
qs = queryset.filter(pk__in=ids).distinct()
return qs
class ArtistFilter(
RelatedFilterSet,
LibraryFilterSet,
audio_filters.IncludeChannelsFilterSet,
moderation_filters.HiddenContentFilterSet,
):
q = fields.SearchFilter(search_fields=["name"], fts_search_fields=["body_text"])
playable = filters.BooleanFilter(field_name="_", method="filter_playable")
has_albums = filters.BooleanFilter(field_name="_", method="filter_has_albums")
tag = TAG_FILTER
content_category = filters.CharFilter("content_category")
scope = common_filters.ActorScopeFilter(
actor_field="artist_credit__tracks__uploads__library__actor",
distinct=True,
library_field="artist_credit__tracks__uploads__library",
)
ordering = common_filters.CaseInsensitiveNameOrderingFilter(
fields=(
("id", "id"),
("name", "name"),
("creation_date", "creation_date"),
("modification_date", "modification_date"),
("?", "random"),
("tag_matches", "related"),
)
)
has_mbid = filters.BooleanFilter(
field_name="_",
method="filter_has_mbid",
)
class Meta:
model = models.Artist
fields = {
"name": ["exact", "iexact", "startswith", "icontains"],
"listenable": "exact",
"mbid": ["exact"],
}
hidden_content_fields_mapping = moderation_filters.USER_FILTER_CONFIG["ARTIST"]
include_channels_field = "channel"
library_filter_field = "track__artist_credit__artist"
def filter_playable(self, queryset, name, value):
actor = utils.get_actor_from_request(self.request)
return queryset.playable_by(actor, value).distinct()
def filter_has_albums(self, queryset, name, value):
return queryset.filter(artist_credit__albums__isnull=not value)
def filter_listenable(self, queryset, name, value):
queryset = queryset.annotate(files_count=Count("albums__tracks__files"))
if value:
return queryset.filter(files_count__gt=0)
else:
return queryset.filter(files_count=0)
def filter_has_mbid(self, queryset, name, value):
return queryset.filter(mbid__isnull=(not value))
class TrackFilter(filters.FilterSet):
q = fields.SearchFilter(search_fields=["title", "album__title", "artist__name"])
listenable = filters.BooleanFilter(name="_", method="filter_listenable")
class TrackFilter(
RelatedFilterSet,
ChannelFilterSet,
LibraryFilterSet,
audio_filters.IncludeChannelsFilterSet,
moderation_filters.HiddenContentFilterSet,
):
q = fields.SearchFilter(
search_fields=[
"title",
"album__title",
"artist_credit__artist__name",
],
fts_search_fields=[
"body_text",
"artist_credit__artist__body_text",
"album__body_text",
],
)
playable = filters.BooleanFilter(field_name="_", method="filter_playable")
tag = TAG_FILTER
id = common_filters.MultipleQueryFilter(coerce=int)
scope = common_filters.ActorScopeFilter(
actor_field="uploads__library__actor",
library_field="uploads__library",
distinct=True,
)
artist = filters.ModelChoiceFilter(
field_name="_", method="filter_artist", queryset=models.Artist.objects.all()
)
ordering = django_filters.OrderingFilter(
fields=(
("creation_date", "creation_date"),
("title", "title"),
("album__title", "album__title"),
("album__release_date", "album__release_date"),
("size", "size"),
("position", "position"),
("disc_number", "disc_number"),
("artist_credit__artist__name", "artist_credit__artist__name"),
(
"artist_credit__artist__modification_date",
"artist_credit__artist__modification_date",
),
("?", "random"),
("tag_matches", "related"),
)
)
format = filters.CharFilter(
field_name="_",
method="filter_format",
)
has_mbid = filters.BooleanFilter(
field_name="_",
method="filter_has_mbid",
)
quality_choices = [(0, "low"), (1, "medium"), (2, "high"), (3, "very_high")]
quality = filters.ChoiceFilter(
choices=quality_choices,
method="filter_quality",
)
class Meta:
model = models.Track
fields = {
"title": ["exact", "iexact", "startswith", "icontains"],
"listenable": ["exact"],
"artist": ["exact"],
"id": ["exact"],
"album": ["exact"],
"license": ["exact"],
"mbid": ["exact"],
}
hidden_content_fields_mapping = moderation_filters.USER_FILTER_CONFIG["TRACK"]
include_channels_field = "artist_credit__artist__channel"
channel_filter_field = "track"
library_filter_field = "track"
artist_credit_filter_field = "artist__credit__artist"
def filter_listenable(self, queryset, name, value):
queryset = queryset.annotate(files_count=Count("files"))
if value:
return queryset.filter(files_count__gt=0)
else:
return queryset.filter(files_count=0)
def filter_playable(self, queryset, name, value):
actor = utils.get_actor_from_request(self.request)
return queryset.playable_by(actor, value).distinct()
def filter_artist(self, queryset, name, value):
return queryset.filter(
Q(artist_credit__artist=value) | Q(album__artist_credit__artist=value)
)
class ImportBatchFilter(filters.FilterSet):
q = fields.SearchFilter(search_fields=["submitted_by__username", "source"])
def filter_format(self, queryset, name, value):
mimetypes = [utils.get_type_from_ext(e) for e in value.split(",")]
return queryset.filter(uploads__mimetype__in=mimetypes)
class Meta:
model = models.ImportBatch
fields = {"status": ["exact"], "source": ["exact"], "submitted_by": ["exact"]}
def filter_has_mbid(self, queryset, name, value):
return queryset.filter(mbid__isnull=(not value))
def filter_quality(self, queryset, name, value):
if value == "low":
return queryset.filter(upload__quality__gte=0)
if value == "medium":
return queryset.filter(upload__quality__gte=1)
if value == "high":
return queryset.filter(upload__quality__gte=2)
if value == "very-high":
return queryset.filter(upload__quality=3)
class ImportJobFilter(filters.FilterSet):
q = fields.SearchFilter(search_fields=["batch__submitted_by__username", "source"])
class UploadFilter(audio_filters.IncludeChannelsFilterSet):
library = filters.CharFilter("library__uuid")
channel = filters.CharFilter("library__channel__uuid")
track = filters.UUIDFilter("track__uuid")
track_artist = filters.UUIDFilter("track__artist_credit__artist__uuid")
album_artist = filters.UUIDFilter("track__album__artist_credit__artist__uuid")
library = filters.UUIDFilter("library__uuid")
playable = filters.BooleanFilter(field_name="_", method="filter_playable")
scope = common_filters.ActorScopeFilter(
actor_field="library__actor",
distinct=True,
library_field="library",
)
import_status = common_filters.MultipleQueryFilter(coerce=str, distinct=False)
q = fields.SmartSearchFilter(
config=search.SearchConfig(
search_fields={
"track_artist": {"to": "track__artist__name"},
"album_artist": {"to": "track__album__artist__name"},
"album": {"to": "track__album__title"},
"title": {"to": "track__title"},
},
filter_fields={
"artist": {"to": "track__artist__name__iexact"},
"mimetype": {"to": "mimetype"},
"album": {"to": "track__album__title__iexact"},
"title": {"to": "track__title__iexact"},
"status": {"to": "import_status"},
},
)
)
class Meta:
model = models.ImportJob
fields = {
"batch": ["exact"],
"batch__status": ["exact"],
"batch__source": ["exact"],
"batch__submitted_by": ["exact"],
"status": ["exact"],
"source": ["exact"],
}
model = models.Upload
fields = [
"import_status",
"mimetype",
"import_reference",
]
include_channels_field = "track__artist_credit__artist__channel"
def filter_playable(self, queryset, name, value):
actor = utils.get_actor_from_request(self.request)
return queryset.playable_by(actor, value)
class AlbumFilter(
RelatedFilterSet,
ChannelFilterSet,
LibraryFilterSet,
audio_filters.IncludeChannelsFilterSet,
moderation_filters.HiddenContentFilterSet,
):
playable = filters.BooleanFilter(field_name="_", method="filter_playable")
q = fields.SearchFilter(
search_fields=["title", "artist_credit__artist__name"],
fts_search_fields=["body_text", "artist_credit__artist__body_text"],
)
content_category = filters.CharFilter("artist_credit__artist__content_category")
tag = TAG_FILTER
scope = common_filters.ActorScopeFilter(
actor_field="tracks__uploads__library__actor",
distinct=True,
library_field="tracks__uploads__library",
)
ordering = django_filters.OrderingFilter(
fields=(
("creation_date", "creation_date"),
("release_date", "release_date"),
("title", "title"),
(
"artist_credit__artist__modification_date",
"artist_credit__artist__modification_date",
),
("?", "random"),
("tag_matches", "related"),
)
)
has_tags = filters.BooleanFilter(
field_name="_",
method="filter_has_tags",
)
has_mbid = filters.BooleanFilter(
field_name="_",
method="filter_has_mbid",
)
has_cover = filters.BooleanFilter(
field_name="_",
method="filter_has_cover",
)
class AlbumFilter(filters.FilterSet):
listenable = filters.BooleanFilter(name="_", method="filter_listenable")
q = fields.SearchFilter(search_fields=["title", "artist__name" "source"])
has_release_date = filters.BooleanFilter(
field_name="_", method="filter_has_release_date"
)
artist = filters.ModelChoiceFilter(
field_name="_", method="filter_artist", queryset=models.Artist.objects.all()
)
class Meta:
model = models.Album
fields = ["listenable", "q", "artist"]
def filter_listenable(self, queryset, name, value):
queryset = queryset.annotate(files_count=Count("tracks__files"))
if value:
return queryset.filter(files_count__gt=0)
else:
return queryset.filter(files_count=0)
fields = ["artist_credit", "mbid"]
hidden_content_fields_mapping = moderation_filters.USER_FILTER_CONFIG["ALBUM"]
include_channels_field = "artist_credit__artist__channel"
channel_filter_field = "track__album"
library_filter_field = "track__album"
def filter_playable(self, queryset, name, value):
actor = utils.get_actor_from_request(self.request)
return queryset.playable_by(actor, value)
def filter_has_tags(self, queryset, name, value):
return queryset.filter(tagged_items__isnull=(not value))
def filter_has_mbid(self, queryset, name, value):
return queryset.filter(mbid__isnull=(not value))
def filter_has_cover(self, queryset, name, value):
return queryset.filter(attachment_cover__isnull=(not value))
def filter_has_release_date(self, queryset, name, value):
return queryset.filter(release_date__isnull=(not value))
def filter_artist(self, queryset, name, value):
return queryset.filter(artist_credit__artist=value)
class LibraryFilter(filters.FilterSet):
q = fields.SearchFilter(
search_fields=["name"],
)
scope = common_filters.ActorScopeFilter(
actor_field="actor",
distinct=True,
library_field="pk",
)
actor = filters.CharFilter(method="filter_actor")
class Meta:
model = models.Library
fields = ["privacy_level"]
def filter_actor(self, queryset, name, value):
# supports username or username@domain
if "@" in value:
username, domain = value.split("@", 1)
return queryset.filter(
actor__preferred_username=username,
actor__domain_id=domain,
)
return queryset.filter(actor__preferred_username=value)
......@@ -6,23 +6,26 @@ def load(model, *args, **kwargs):
EXCLUDE_VALIDATION = {"Track": ["artist"]}
class Importer(object):
class Importer:
def __init__(self, model):
self.model = model
def load(self, cleaned_data, raw_data, import_hooks):
mbid = cleaned_data.pop("mbid")
artists_credits = cleaned_data.pop("artist_credit", None)
# let's validate data, just in case
instance = self.model(**cleaned_data)
exclude = EXCLUDE_VALIDATION.get(self.model.__name__, [])
instance.full_clean(exclude=["mbid", "uuid"] + exclude)
instance.full_clean(exclude=["mbid", "uuid", "fid", "from_activity"] + exclude)
m = self.model.objects.update_or_create(mbid=mbid, defaults=cleaned_data)[0]
if artists_credits:
m.artist_credit.set(artists_credits)
for hook in import_hooks:
hook(m, cleaned_data, raw_data)
return m
class Mapping(object):
class Mapping:
"""Cast musicbrainz data to funkwhale data and vice-versa"""
def __init__(self, musicbrainz_mapping):
......@@ -47,4 +50,9 @@ class Mapping(object):
)
registry = {"Artist": Importer, "Track": Importer, "Album": Importer, "Work": Importer}
registry = {
"Artist": Importer,
"ArtistCredit": Importer,
"Track": Importer,
"Album": Importer,
}
import logging
import re
from django.db import transaction
from . import models
logger = logging.getLogger(__name__)
MODEL_FIELDS = [
"redistribute",
"derivative",
"attribution",
"copyleft",
"commercial",
"url",
]
@transaction.atomic
def load(data):
"""
Load/update database objects with our hardcoded data
"""
existing = models.License.objects.all()
existing_by_code = {e.code: e for e in existing}
to_create = []
for row in data:
try:
license_ = existing_by_code[row["code"]]
except KeyError:
logger.debug("Loading new license: {}".format(row["code"]))
to_create.append(
models.License(code=row["code"], **{f: row[f] for f in MODEL_FIELDS})
)
else:
logger.debug("Updating license: {}".format(row["code"]))
stored = [getattr(license_, f) for f in MODEL_FIELDS]
wanted = [row[f] for f in MODEL_FIELDS]
if wanted == stored:
continue
# the object in database needs an update
for f in MODEL_FIELDS:
setattr(license_, f, row[f])
license_.save()
models.License.objects.bulk_create(to_create)
return sorted(models.License.objects.all(), key=lambda o: o.code)
_cache = None
def match(*values):
"""
Given a string, extracted from music file tags, return corresponding License
instance, if found
"""
global _cache
for value in values:
if not value:
continue
# we are looking for the first url in our value
# This regex is not perfect, but it's good enough for now
urls = re.findall(
r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
value,
)
if not urls:
logger.debug(f'Impossible to guess license from string "{value}"')
continue
url = urls[0]
if _cache:
existing = _cache
else:
existing = load(LICENSES)
_cache = existing
for license_ in existing:
if license_.conf is None:
continue
for i in license_.conf["identifiers"]:
if match_urls(url, i):
return license_
def match_urls(*urls):
"""
We want to ensure the two url match but don't care for protocol
or trailing slashes
"""
urls = [u.rstrip("/") for u in urls]
urls = [u.lstrip("http://") for u in urls]
urls = [u.lstrip("https://") for u in urls]
return len(set(urls)) == 1
def get_cc_license(version, perks, country=None, country_name=None):
if len(perks) == 0:
raise ValueError("No perks!")
url_template = "//creativecommons.org/licenses/{type}/{version}/"
code_parts = []
name_parts = []
perks_data = [
("by", "Attribution"),
("nc", "NonCommercial"),
("sa", "ShareAlike"),
("nd", "NoDerivatives"),
]
for perk, name in perks_data:
if perk in perks:
code_parts.append(perk)
name_parts.append(name)
url = url_template.format(version=version, type="-".join(code_parts))
code_parts.append(version)
name = "Creative commons - {perks} {version}".format(
perks="-".join(name_parts), version=version
)
if country:
code_parts.append(country)
name += f" {country_name}"
url += country + "/"
data = {
"name": name,
"code": "cc-{}".format("-".join(code_parts)),
"redistribute": True,
"commercial": "nc" not in perks,
"derivative": "nd" not in perks,
"copyleft": "sa" in perks,
"attribution": "by" in perks,
"url": "https:" + url,
"identifiers": ["http:" + url],
}
return data
COUNTRIES = {
"ar": "Argentina",
"au": "Australia",
"at": "Austria",
"be": "Belgium",
"br": "Brazil",
"bg": "Bulgaria",
"ca": "Canada",
"cl": "Chile",
"cn": "China Mainland",
"co": "Colombia",
"cr": "Costa Rica",
"hr": "Croatia",
"cz": "Czech Republic",
"dk": "Denmark",
"ec": "Ecuador",
"eg": "Egypt",
"ee": "Estonia",
"fi": "Finland",
"fr": "France",
"de": "Germany",
"gr": "Greece",
"gt": "Guatemala",
"hk": "Hong Kong",
"hu": "Hungary",
"igo": "IGO",
"in": "India",
"ie": "Ireland",
"il": "Israel",
"it": "Italy",
"jp": "Japan",
"lu": "Luxembourg",
"mk": "Macedonia",
"my": "Malaysia",
"mt": "Malta",
"mx": "Mexico",
"nl": "Netherlands",
"nz": "New Zealand",
"no": "Norway",
"pe": "Peru",
"ph": "Philippines",
"pl": "Poland",
"pt": "Portugal",
"pr": "Puerto Rico",
"ro": "Romania",
"rs": "Serbia",
"sg": "Singapore",
"si": "Slovenia",
"za": "South Africa",
"kr": "South Korea",
"es": "Spain",
"se": "Sweden",
"ch": "Switzerland",
"tw": "Taiwan",
"th": "Thailand",
"uk": "UK: England & Wales",
"scotland": "UK: Scotland",
"ug": "Uganda",
"us": "United States",
"ve": "Venezuela",
"vn": "Vietnam",
}
CC_30_COUNTRIES = [
"at",
"au",
"br",
"ch",
"cl",
"cn",
"cr",
"cz",
"de",
"ec",
"ee",
"eg",
"es",
"fr",
"gr",
"gt",
"hk",
"hr",
"ie",
"igo",
"it",
"lu",
"nl",
"no",
"nz",
"ph",
"pl",
"pr",
"pt",
"ro",
"rs",
"sg",
"th",
"tw",
"ug",
"us",
"ve",
"vn",
"za",
]
CC_25_COUNTRIES = [
"ar",
"bg",
"ca",
"co",
"dk",
"hu",
"il",
"in",
"mk",
"mt",
"mx",
"my",
"pe",
"scotland",
]
LICENSES = [
# a non-exhaustive list: http://musique-libre.org/doc/le-tableau-des-licences-libres-et-ouvertes-de-dogmazic/
{
"code": "cc0-1.0",
"name": "CC0 - Public domain",
"redistribute": True,
"derivative": True,
"commercial": True,
"attribution": False,
"copyleft": False,
"url": "https://creativecommons.org/publicdomain/zero/1.0/",
"identifiers": [
# note the http here.
# This is the kind of URL that is embedded in music files metadata
"http://creativecommons.org/publicdomain/zero/1.0/"
],
},
{
"code": "LAL-1.3",
"name": "Licence Art Libre 1.3",
"redistribute": True,
"derivative": True,
"commercial": True,
"attribution": True,
"copyleft": True,
"url": "https://artlibre.org/licence/lal",
"identifiers": ["http://artlibre.org/licence/lal"],
},
# Creative commons version 4.0
get_cc_license(version="4.0", perks=["by"]),
get_cc_license(version="4.0", perks=["by", "sa"]),
get_cc_license(version="4.0", perks=["by", "nc"]),
get_cc_license(version="4.0", perks=["by", "nc", "sa"]),
get_cc_license(version="4.0", perks=["by", "nc", "nd"]),
get_cc_license(version="4.0", perks=["by", "nd"]),
# Creative commons version 3.0
get_cc_license(version="3.0", perks=["by"]),
get_cc_license(version="3.0", perks=["by", "sa"]),
get_cc_license(version="3.0", perks=["by", "nc"]),
get_cc_license(version="3.0", perks=["by", "nc", "sa"]),
get_cc_license(version="3.0", perks=["by", "nc", "nd"]),
get_cc_license(version="3.0", perks=["by", "nd"]),
# Creative commons version 2.5
get_cc_license(version="2.5", perks=["by"]),
get_cc_license(version="2.5", perks=["by", "sa"]),
get_cc_license(version="2.5", perks=["by", "nc"]),
get_cc_license(version="2.5", perks=["by", "nc", "sa"]),
get_cc_license(version="2.5", perks=["by", "nc", "nd"]),
get_cc_license(version="2.5", perks=["by", "nd"]),
# Creative commons version 2.0
get_cc_license(version="2.0", perks=["by"]),
get_cc_license(version="2.0", perks=["by", "sa"]),
get_cc_license(version="2.0", perks=["by", "nc"]),
get_cc_license(version="2.0", perks=["by", "nc", "sa"]),
get_cc_license(version="2.0", perks=["by", "nc", "nd"]),
get_cc_license(version="2.0", perks=["by", "nd"]),
# Creative commons version 1.0
get_cc_license(version="1.0", perks=["by"]),
get_cc_license(version="1.0", perks=["by", "sa"]),
get_cc_license(version="1.0", perks=["by", "nc"]),
get_cc_license(version="1.0", perks=["by", "nc", "sa"]),
get_cc_license(version="1.0", perks=["by", "nc", "nd"]),
get_cc_license(version="1.0", perks=["by", "nd"]),
]
# generate ported (by country) CC licenses:
for country in CC_30_COUNTRIES:
name = COUNTRIES[country]
LICENSES += [
get_cc_license(version="3.0", perks=["by"], country=country, country_name=name),
get_cc_license(
version="3.0", perks=["by", "sa"], country=country, country_name=name
),
get_cc_license(
version="3.0", perks=["by", "nc"], country=country, country_name=name
),
get_cc_license(
version="3.0", perks=["by", "nc", "sa"], country=country, country_name=name
),
get_cc_license(
version="3.0", perks=["by", "nc", "nd"], country=country, country_name=name
),
get_cc_license(
version="3.0", perks=["by", "nd"], country=country, country_name=name
),
]
for country in CC_25_COUNTRIES:
name = COUNTRIES[country]
LICENSES += [
get_cc_license(version="2.5", perks=["by"], country=country, country_name=name),
get_cc_license(
version="2.5", perks=["by", "sa"], country=country, country_name=name
),
get_cc_license(
version="2.5", perks=["by", "nc"], country=country, country_name=name
),
get_cc_license(
version="2.5", perks=["by", "nc", "sa"], country=country, country_name=name
),
get_cc_license(
version="2.5", perks=["by", "nc", "nd"], country=country, country_name=name
),
get_cc_license(
version="2.5", perks=["by", "nd"], country=country, country_name=name
),
]
LICENSES = sorted(LICENSES, key=lambda l: l["code"])
LICENSES_BY_ID = {l["code"]: l for l in LICENSES}
import urllib.request
from bs4 import BeautifulSoup
def _get_html(url):
with urllib.request.urlopen(url) as response:
html = response.read()
return html.decode("utf-8")
def extract_content(html):
soup = BeautifulSoup(html, "html.parser")
return soup.find_all("div", class_="lyricbox")[0].contents
def clean_content(contents):
final_content = ""
for e in contents:
if e == "\n":
continue
if e.name == "script":
continue
if e.name == "br":
final_content += "\n"
continue
try:
final_content += e.text
except AttributeError:
final_content += str(e)
return final_content
import os
from argparse import RawTextHelpFormatter
from django.core.management.base import BaseCommand
from django.db import transaction
from funkwhale_api.music import models
def progress(buffer, count, total, status=""):
bar_len = 60
filled_len = int(round(bar_len * count / float(total)))
bar = "=" * filled_len + "-" * (bar_len - filled_len)
buffer.write(f"[{bar}] {count}/{total} ...{status}\r")
buffer.flush()
class Command(BaseCommand):
help = """
Loop through all in-place imported files in the database, and verify
that the corresponding files are present on the filesystem. If some files are not
found and --no-dry-run is specified, the corresponding database objects will be deleted.
"""
def create_parser(self, *args, **kwargs):
parser = super().create_parser(*args, **kwargs)
parser.formatter_class = RawTextHelpFormatter
return parser
def add_arguments(self, parser):
parser.add_argument(
"--no-dry-run",
action="store_false",
dest="dry_run",
default=True,
help="Disable dry run mode and apply pruning for real on the database",
)
@transaction.atomic
def handle(self, *args, **options):
candidates = models.Upload.objects.filter(source__startswith="file://")
candidates = candidates.filter(audio_file__in=["", None])
total = candidates.count()
self.stdout.write(f"Checking {total} in-place imported files…")
missing = []
for i, row in enumerate(candidates.values("id", "source").iterator()):
path = row["source"].replace("file://", "")
progress(self.stdout, i + 1, total)
if not os.path.exists(path):
missing.append((path, row["id"]))
if missing:
for path, _ in missing:
self.stdout.write(f" {path}")
self.stdout.write(
"The previous {} paths are referenced in database, but not found on disk!".format(
len(missing)
)
)
else:
self.stdout.write("All in-place imports have a matching on-disk file")
return
to_delete = candidates.filter(pk__in=[id for _, id in missing])
if options["dry_run"]:
self.stdout.write(
"Nothing was deleted, rerun this command with --no-dry-run to apply the changes"
)
else:
self.stdout.write(f"Deleting {to_delete.count()} uploads…")
to_delete.delete()
import requests.exceptions
from django.core.management.base import BaseCommand, CommandError
from funkwhale_api.music import licenses
class Command(BaseCommand):
help = "Check that specified licenses URLs are actually reachable"
def handle(self, *args, **options):
errored = []
objs = licenses.LICENSES
total = len(objs)
for i, data in enumerate(objs):
self.stderr.write("{}/{} Checking {}...".format(i + 1, total, data["code"]))
response = requests.get(data["url"])
try:
response.raise_for_status()
except requests.exceptions.RequestException:
self.stderr.write("!!! Error while fetching {}!".format(data["code"]))
errored.append((data, response))
if errored:
self.stdout.write(f"{len(errored)} licenses were not reachable!")
for row, response in errored:
self.stdout.write(
"- {}: error {} at url {}".format(
row["code"], response.status_code, row["url"]
)
)
raise CommandError()
else:
self.stdout.write("All licenses are valid and reachable :)")
from django.core.management.base import BaseCommand
from funkwhale_api.federation.models import Actor
from funkwhale_api.music.models import Library
class Command(BaseCommand):
help = """
Create a new library for a given user.
"""
def add_arguments(self, parser):
parser.add_argument(
"username",
type=str,
help=("Specify the owner of the library to be created."),
)
parser.add_argument(
"--name",
type=str,
help=("Specify a name for the library."),
default="default",
)
parser.add_argument(
"--privacy-level",
type=str.lower,
choices=["me", "instance", "everyone"],
help=("Specify the privacy level for the library."),
default="me",
)
def handle(self, *args, **kwargs):
actor, actor_created = Actor.objects.get_or_create(name=kwargs["username"])
if actor_created:
self.stdout.write("No existing actor found. New actor created.")
library, created = Library.objects.get_or_create(
name=kwargs["name"], actor=actor, privacy_level=kwargs["privacy_level"]
)
if created:
self.stdout.write(
"Created library {} for user {} with UUID {}".format(
library.pk, actor.user.pk, library.uuid
)
)
else:
self.stdout.write(
"Found existing library {} for user {} with UUID {}".format(
library.pk, actor.user.pk, library.uuid
)
)
import os
import mutagen
from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Q
from funkwhale_api.music import models, utils
from funkwhale_api.playlists import models as playlist_models
from funkwhale_api.users import models as user_models
def get_or_create_playlist(self, playlist_name, user, **options):
playlist = playlist_models.Playlist.objects.filter(
Q(actor=user.actor) & Q(name=playlist_name)
).first()
if not playlist:
if options["no_dry_run"]:
playlist = playlist_models.Playlist.objects.create(
name=playlist_name,
actor=user.actor,
privacy_level=options["privacy_level"],
)
return playlist
response = input(
f"This playlist {playlist_name} will be created. Proceed? (y/n): "
)
if response.lower() in "yes":
playlist = playlist_models.Playlist.objects.create(
name=playlist_name,
actor=user.actor,
privacy_level=options["privacy_level"],
)
return playlist
else:
return playlist
def get_fw_track_list(self, directory, playlist, **options):
fw_tracks = []
audio_extensions = utils.SUPPORTED_EXTENSIONS
existing_tracks = playlist.playlist_tracks.select_for_update()
for file in next(os.walk(directory))[2]:
if file.endswith(tuple(audio_extensions)):
track_path = os.path.join(directory, file)
try:
audio = mutagen.File(track_path)
except mutagen.MutagenError as e:
self.stdout.write(
f"Could not load {track_path} because of a mutagen exception : {e}"
)
if options["only_mbid"]:
mbid = (
audio.get("UFID:http://musicbrainz.org", None).data.decode()
if audio.get("UFID:http://musicbrainz.org", None)
else None
)
if not mbid:
self.stdout.write(
f"Did not find mbid, skipping track {track_path}..."
)
continue
try:
track_fw = models.Track.objects.get(mbid=mbid)
except models.Track.DoesNotExist:
self.stdout.write(f"No track found for {track_path}")
continue
else:
try:
self.stdout.write(f"rack_path {str(track_path)}...")
track_fw = models.Upload.objects.get(source=track_path)
except models.Upload.DoesNotExist:
self.stdout.write(f"No track found for {track_path}")
continue
if existing_tracks.filter(track__id=track_fw.id).exists():
self.stdout.write(
f"Track already in playlist. Skipping {track_path}..."
)
continue
fw_tracks.append(track_fw)
return fw_tracks
def add_tracks_to_playlist(self, directory, user, **options):
playlist_name = os.path.basename(directory)
playlist = get_or_create_playlist(self, playlist_name, user, **options)
fw_track_list = get_fw_track_list(self, directory, playlist, **options)
if options["no_dry_run"] is True:
return playlist.insert_many(fw_track_list, allow_duplicates=False)
response = input(
f"These tracks {fw_track_list} will be added to playlist {playlist_name}. Proceed? (y/n): "
)
if response.lower() in "yes":
return playlist.insert_many(fw_track_list, allow_duplicates=False)
class Command(BaseCommand):
help = """
This command creates playlists based on a folder structure. It uses the base folder
of each track as the playlist name. Subdirectories are taken into account but generate independent
playlists. Tracks contained in subdirectories don't appear in the parent directory playlist.
You will be asked to confirm the action before the playlist is created. Duplicate content in the
playlist isn't supported.
"""
def add_arguments(self, parser):
parser.add_argument(
"--user_name",
help="User name that will own the playlists",
)
parser.add_argument(
"--dir_name",
help="Which directory to start from.",
)
parser.add_argument(
"--privacy_level",
default="me",
choices=["me", "instance", "everyone"],
help="Which privacy_level for the playlists.",
)
parser.add_argument(
"--no_dry_run",
default=False,
help="Will actually write data into the database",
)
parser.add_argument(
"--only_mbid",
default=False,
help='Only files tagged with mbid will be used. Can be useful to create playlist from folders \
that are not "in-place" imported into funkwhale',
)
@transaction.atomic
def handle(self, *args, **options):
all_subdirectories = []
for root, dirs, files in os.walk(options["dir_name"]):
for dir_name in dirs:
full_dir_path = os.path.join(root, dir_name)
all_subdirectories.append(full_dir_path)
user = user_models.User.objects.get(username=options["user_name"])
for directory in all_subdirectories:
add_tracks_to_playlist(self, directory, user, **options)
import cacheops
from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Q
from funkwhale_api.music import models, utils
class Command(BaseCommand):
help = "Run common checks and fix against imported tracks"
def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
dest="dry_run",
default=False,
help="Do not execute anything",
)
def handle(self, *args, **options):
if options["dry_run"]:
self.stdout.write("Dry-run on, will not commit anything")
self.fix_mimetypes(**options)
self.fix_file_data(**options)
self.fix_file_size(**options)
cacheops.invalidate_model(models.TrackFile)
@transaction.atomic
def fix_mimetypes(self, dry_run, **kwargs):
self.stdout.write("Fixing missing mimetypes...")
matching = models.TrackFile.objects.filter(
source__startswith="file://"
).exclude(mimetype__startswith="audio/")
self.stdout.write(
"[mimetypes] {} entries found with bad or no mimetype".format(
matching.count()
)
)
for extension, mimetype in utils.EXTENSION_TO_MIMETYPE.items():
qs = matching.filter(source__endswith=".{}".format(extension))
self.stdout.write(
"[mimetypes] setting {} {} files to {}".format(
qs.count(), extension, mimetype
)
)
if not dry_run:
self.stdout.write("[mimetypes] commiting...")
qs.update(mimetype=mimetype)
def fix_file_data(self, dry_run, **kwargs):
self.stdout.write("Fixing missing bitrate or length...")
matching = models.TrackFile.objects.filter(
Q(bitrate__isnull=True) | Q(duration__isnull=True)
)
total = matching.count()
self.stdout.write(
"[bitrate/length] {} entries found with missing values".format(total)
)
if dry_run:
return
for i, tf in enumerate(matching.only("audio_file")):
self.stdout.write(
"[bitrate/length] {}/{} fixing file #{}".format(i + 1, total, tf.pk)
)
try:
audio_file = tf.get_audio_file()
if audio_file:
data = utils.get_audio_file_data(audio_file)
tf.bitrate = data["bitrate"]
tf.duration = data["length"]
tf.save(update_fields=["duration", "bitrate"])
else:
self.stderr.write("[bitrate/length] no file found")
except Exception as e:
self.stderr.write(
"[bitrate/length] error with file #{}: {}".format(tf.pk, str(e))
)
def fix_file_size(self, dry_run, **kwargs):
self.stdout.write("Fixing missing size...")
matching = models.TrackFile.objects.filter(size__isnull=True)
total = matching.count()
self.stdout.write("[size] {} entries found with missing values".format(total))
if dry_run:
return
for i, tf in enumerate(matching.only("size")):
self.stdout.write(
"[size] {}/{} fixing file #{}".format(i + 1, total, tf.pk)
)
try:
tf.size = tf.get_file_size()
tf.save(update_fields=["size"])
except Exception as e:
self.stderr.write(
"[size] error with file #{}: {}".format(tf.pk, str(e))
)