Skip to content
Snippets Groups Projects
Select Git revision
  • LDAPGroupTypeSupport
  • develop default protected
  • master
  • 876-http-signature
  • plugins-v2
  • plugins
  • 0.21
  • 0.21-rc2
  • 0.21-rc1
  • 0.20.1
  • 0.20.0
  • 0.20.0-rc1
  • 0.19.1
  • 0.19.0
  • 0.19.0-rc2
  • 0.19.0-rc1
  • 0.18.3
  • 0.18.2
  • 0.18.1
  • 0.18
  • 0.17
  • 0.16.3
  • 0.16.2
  • 0.16.1
  • 0.16
  • 0.15
26 results

authentication.py

Blame
  • Forked from funkwhale / funkwhale
    5413 commits behind the upstream repository.
    authentication.py 2.93 KiB
    import cryptography
    import logging
    import datetime
    
    from django.contrib.auth.models import AnonymousUser
    from django.utils import timezone
    
    from rest_framework import authentication, exceptions as rest_exceptions
    from funkwhale_api.moderation import models as moderation_models
    from . import actors, exceptions, keys, signing, tasks, utils
    
    
    # Module-level logger, named after this module's import path.
    logger = logging.getLogger(__name__)
    
    
    class SignatureAuthentication(authentication.BaseAuthentication):
        """Authenticate requests using their HTTP ``Signature`` header.

        On success the signing actor is attached to the request as
        ``request.actor`` and the request is treated as coming from an
        ``AnonymousUser``.
        """

        def authenticate_actor(self, request):
            """Return the actor whose key signed ``request``.

            Returns ``None`` when a ``KeyError`` is raised while reading the
            ``Signature`` header or extracting its key id (i.e. the request
            is considered unsigned).

            Raises:
                rest_framework.exceptions.AuthenticationFailed: the signature
                    header or key id is malformed, the actor cannot be
                    fetched, the actor has no public key, or the signature
                    is still invalid after refetching the actor.
                exceptions.BlockedActorOrDomain: an active ``block_all``
                    instance policy matches the actor URL.
            """
            headers = utils.clean_wsgi_headers(request.META)
            try:
                signature = headers["Signature"]
                key_id = keys.get_key_id_from_signature_header(signature)
            except KeyError:
                # Nothing to authenticate against: let other authenticators run.
                return None
            except ValueError as e:
                raise rest_exceptions.AuthenticationFailed(str(e)) from e

            try:
                # The actor URL is the key id stripped of its "#fragment".
                actor_url = key_id.split("#")[0]
            except (TypeError, IndexError, AttributeError) as e:
                raise rest_exceptions.AuthenticationFailed("Invalid key id") from e

            policies = (
                moderation_models.InstancePolicy.objects.active()
                .filter(block_all=True)
                .matching_url(actor_url)
            )
            if policies.exists():
                logger.info(
                    "Discarding HTTP request from blocked actor/domain %s", actor_url
                )
                raise exceptions.BlockedActorOrDomain()

            try:
                actor = actors.get_actor(actor_url)
            except Exception as e:
                logger.info(
                    "Cannot fetch actor %s to verify HTTP signature: %s", actor_url, e
                )
                raise rest_exceptions.AuthenticationFailed(str(e)) from e

            if not actor.public_key:
                raise rest_exceptions.AuthenticationFailed("No public key found")

            try:
                signing.verify_django(request, actor.public_key.encode("utf-8"))
            except cryptography.exceptions.InvalidSignature:
                # In case of invalid signature, we refetch the actor object
                # to load a potentially new public key. This process is called
                # Blind key rotation, and is described at
                # https://blog.dereferenced.org/the-case-for-blind-key-rotation
                # If anything fails after that, the request is rejected as an
                # authentication failure instead of leaking a raw exception.
                try:
                    actor = actors.get_actor(actor_url, skip_cache=True)
                except Exception as e:
                    logger.info(
                        "Cannot refetch actor %s to verify HTTP signature: %s",
                        actor_url,
                        e,
                    )
                    raise rest_exceptions.AuthenticationFailed(str(e)) from e

                if not actor.public_key:
                    raise rest_exceptions.AuthenticationFailed("No public key found")

                try:
                    signing.verify_django(request, actor.public_key.encode("utf-8"))
                except cryptography.exceptions.InvalidSignature as e:
                    raise rest_exceptions.AuthenticationFailed(
                        "Invalid signature"
                    ) from e

            # We trigger a nodeinfo update on the actor's domain when it was
            # never fetched or was last fetched more than 24 hours ago.
            fetch_delay = 24 * 3600
            now = timezone.now()
            last_fetch = actor.domain.nodeinfo_fetch_date
            if not last_fetch or (
                last_fetch < (now - datetime.timedelta(seconds=fetch_delay))
            ):
                tasks.update_domain_nodeinfo(domain_name=actor.domain.name)
                actor.domain.refresh_from_db()
            return actor

        def authenticate(self, request):
            """Authenticate ``request`` and expose the signing actor on it.

            ``request.actor`` is always set: to ``None`` first, then to the
            authenticated actor when one is found.

            Returns ``None`` when no actor could be authenticated, otherwise
            an ``(AnonymousUser(), None)`` tuple.
            """
            request.actor = None
            actor = self.authenticate_actor(request)
            if not actor:
                return None
            request.actor = actor
            return (AnonymousUser(), None)