You need to sign in or sign up before continuing.
Newer
Older
to_attr="_follows",
)
)
def viewable_by(self, actor):
from funkwhale_api.federation.models import LibraryFollow
if actor is None:
return Library.objects.filter(privacy_level="everyone")
me_query = models.Q(privacy_level="me", actor=actor)
instance_query = models.Q(privacy_level="instance", actor__domain=actor.domain)
followed_libraries = LibraryFollow.objects.filter(
actor=actor, approved=True
).values_list("target", flat=True)
return Library.objects.filter(
me_query
| instance_query
| models.Q(privacy_level="everyone")
| models.Q(pk__in=followed_libraries)
)
class Library(federation_models.FederationMixin):
uuid = models.UUIDField(unique=True, db_index=True, default=uuid.uuid4)
actor = models.ForeignKey(
"federation.Actor", related_name="libraries", on_delete=models.CASCADE
)
followers_url = models.URLField(max_length=500)
creation_date = models.DateTimeField(default=timezone.now)
name = models.CharField(max_length=100)
description = models.TextField(max_length=5000, null=True, blank=True)
privacy_level = models.CharField(
choices=LIBRARY_PRIVACY_LEVEL_CHOICES, default="me", max_length=25
)
uploads_count = models.PositiveIntegerField(default=0)
objects = LibraryQuerySet.as_manager()
def get_federation_id(self):
return federation_utils.full_url(
reverse("federation:music:libraries-detail", kwargs={"uuid": self.uuid})
)
def save(self, **kwargs):
if not self.pk and not self.fid and self.actor.get_user():
self.fid = self.get_federation_id()
self.followers_url = self.fid + "/followers"
return super().save(**kwargs)
def should_autoapprove_follow(self, actor):
if self.privacy_level == "everyone":
return True
if self.privacy_level == "instance" and actor.get_user():
return True
return False
def schedule_scan(self, actor, force=False):
latest_scan = (
self.scans.exclude(status="errored").order_by("-creation_date").first()
)
delay_between_scans = datetime.timedelta(seconds=3600 * 24)
now = timezone.now()
if (
not force
and latest_scan
and latest_scan.creation_date + delay_between_scans > now
):
scan = self.scans.create(total_files=self.uploads_count, actor=actor)
from . import tasks
common_utils.on_commit(tasks.start_library_scan.delay, library_scan_id=scan.pk)
return scan
SCAN_STATUS = [
("pending", "pending"),
("scanning", "scanning"),
("finished", "finished"),
]
class LibraryScan(models.Model):
actor = models.ForeignKey(
"federation.Actor", null=True, blank=True, on_delete=models.CASCADE
)
library = models.ForeignKey(Library, related_name="scans", on_delete=models.CASCADE)
total_files = models.PositiveIntegerField(default=0)
processed_files = models.PositiveIntegerField(default=0)
errored_files = models.PositiveIntegerField(default=0)
status = models.CharField(default="pending", max_length=25)
creation_date = models.DateTimeField(default=timezone.now)
modification_date = models.DateTimeField(null=True, blank=True)
@receiver(post_save, sender=ImportJob)
def update_batch_status(sender, instance, **kwargs):
instance.batch.update_status()
@receiver(post_save, sender=ImportBatch)
def update_request_status(sender, instance, created, **kwargs):
if not instance.import_request:
return
if not created and "status" not in update_fields:
return
r_status = instance.import_request.status
status = instance.status
# let's mark the request as accepted since we started an import
instance.import_request.status = "accepted"
return instance.import_request.save(update_fields=["status"])
# let's mark the request as imported since the import is over
instance.import_request.status = "imported"
return instance.import_request.save(update_fields=["status"])
@receiver(models.signals.post_save, sender=Album)
def warm_album_covers(sender, instance, **kwargs):
Eliot Berriot
committed
if not instance.cover or not settings.CREATE_IMAGE_THUMBNAILS:
return
album_covers_warmer = VersatileImageFieldWarmer(
instance_or_queryset=instance, rendition_key_set="square", image_attr="cover"
)
num_created, failed_to_create = album_covers_warmer.warm()