Skip to content
Snippets Groups Projects
authentication.py 3.45 KiB
Newer Older
  • Learn to ignore specific revisions
    import datetime
    import logging
    import urllib.parse

    import cryptography
    import cryptography.exceptions
    from django.contrib.auth.models import AnonymousUser
    from django.utils import timezone
    from rest_framework import authentication, exceptions as rest_exceptions

    from funkwhale_api.common import preferences
    from funkwhale_api.moderation import models as moderation_models

    from . import actors, exceptions, keys, models, signing, tasks, utils
    
    
    
    # Module-level logger, named after this module's import path.
    logger = logging.getLogger(__name__)
    
    
    
    class SignatureAuthentication(authentication.BaseAuthentication):
        """Authenticate a request through the HTTP ``Signature`` header it carries.

        The actor that owns the signing key is resolved from the key id found in
        the header, checked against moderation rules, and the request signature
        is verified against that actor's public key.
        """

        def authenticate_actor(self, request):
            """Resolve and verify the actor that signed ``request``.

            Args:
                request: the incoming request; ``request.META`` and
                    ``request.method`` are read.

            Returns:
                The actor object returned by ``actors.get_actor`` once the
                signature has been verified, or ``None`` when the request has no
                ``Signature`` header.

            Raises:
                rest_exceptions.AuthenticationFailed: the signature header or key
                    id is malformed, the actor cannot be fetched, or the actor
                    has no public key.
                exceptions.BlockedActorOrDomain: the actor URL matches an active
                    ``block_all`` policy, or (GET requests only, when the allow
                    list is enabled) its domain is not allowed.
                cryptography.exceptions.InvalidSignature: verification still
                    fails after the actor has been refetched.
            """
            headers = utils.clean_wsgi_headers(request.META)
            try:
                signature = headers["Signature"]
                key_id = keys.get_key_id_from_signature_header(signature)
            except KeyError:
                # NOTE(review): the body of this handler was missing from the
                # source; returning None (no signature, nothing to authenticate)
                # matches the ``if not actor`` guard in ``authenticate``.
                return None
            except ValueError as e:
                raise rest_exceptions.AuthenticationFailed(str(e)) from e

            try:
                # The actor URL is the key id without its "#fragment" part.
                actor_url = key_id.split("#")[0]
            except (TypeError, IndexError, AttributeError):
                raise rest_exceptions.AuthenticationFailed("Invalid key id")

            policies = (
                moderation_models.InstancePolicy.objects.active()
                .filter(block_all=True)
                .matching_url(actor_url)
            )
            if policies.exists():
                raise exceptions.BlockedActorOrDomain()

            if request.method.lower() == "get" and preferences.get(
                "moderation__allow_list_enabled"
            ):
                # Only GET requests because POST requests with messages will be
                # handled through MRF
                domain = urllib.parse.urlparse(actor_url).hostname
                allowed = models.Domain.objects.filter(
                    name=domain, allowed=True
                ).exists()
                if not allowed:
                    raise exceptions.BlockedActorOrDomain()

            try:
                actor = actors.get_actor(actor_url)
            except Exception as e:
                logger.info(
                    "Discarding HTTP request from blocked actor/domain %s", actor_url
                )
                raise rest_exceptions.AuthenticationFailed(str(e)) from e

            # NOTE(review): the condition guarding this raise was missing from
            # the source; a falsy ``public_key`` is the inferred trigger.
            if not actor.public_key:
                raise rest_exceptions.AuthenticationFailed("No public key found")

            try:
                signing.verify_django(request, actor.public_key.encode("utf-8"))
            except cryptography.exceptions.InvalidSignature:
                # In case of invalid signature, we refetch the actor object
                # to load a potentially new public key. This process is called
                # blind key rotation, and is described at
                # https://blog.dereferenced.org/the-case-for-blind-key-rotation
                # If verification fails again the exception propagates to the
                # caller (the original comment states this yields a 403 error).
                actor = actors.get_actor(actor_url, skip_cache=True)
                signing.verify_django(request, actor.public_key.encode("utf-8"))

            # We trigger a nodeinfo update on the actor's domain when it was
            # never fetched or was last fetched more than 24 hours ago.
            fetch_delay = 24 * 3600
            now = timezone.now()
            last_fetch = actor.domain.nodeinfo_fetch_date
            if not last_fetch or (
                last_fetch < (now - datetime.timedelta(seconds=fetch_delay))
            ):
                tasks.update_domain_nodeinfo(domain_name=actor.domain.name)
                actor.domain.refresh_from_db()

            # NOTE(review): the return statement was missing from the source;
            # ``authenticate`` consumes this method's return value as the actor.
            return actor

        def authenticate(self, request):
            """Authenticate ``request`` and attach the signing actor to it.

            ``request.actor`` is always set: to ``None`` first, then to the
            verified actor when there is one.

            Returns:
                ``None`` when ``authenticate_actor`` yields no actor, otherwise
                the tuple ``(AnonymousUser(), None)``.
            """
            request.actor = None
            actor = self.authenticate_actor(request)
            if not actor:
                return None

            request.actor = actor
            return (AnonymousUser(), None)