"""
Authentication classes (OAuth2 and JSON Web Token) that refuse users whose
email address is not verified and resend them the confirmation email.
"""
from django.conf import settings
from django.utils.encoding import smart_text
from django.utils.translation import ugettext as _

from django.core.cache import cache

from allauth.account.utils import send_email_confirmation
from oauth2_provider.contrib.rest_framework.authentication import (
    OAuth2Authentication as BaseOAuth2Authentication,
)
from rest_framework import exceptions
from rest_framework_jwt import authentication
from rest_framework_jwt.settings import api_settings


def should_verify_email(user):
    """
    Tell whether ``user`` must verify their email before authenticating.

    Superusers are always exempt. For everybody else the answer is True only
    when the primary email is not verified and the
    ``ACCOUNT_EMAIL_VERIFICATION`` setting is anything but ``"optional"``.
    """
    if user.is_superuser:
        return False
    email_is_verified = user.has_verified_primary_email
    verification_is_optional = settings.ACCOUNT_EMAIL_VERIFICATION == "optional"
    # "unverified and mandatory" restated as "neither verified nor optional".
    return not (email_is_verified or verification_is_optional)


class UnverifiedEmail(Exception):
    """
    Signals that authentication was attempted by a user whose email address
    is not verified.

    The authentication classes in this module catch it and read ``user`` to
    resend the confirmation email.
    """

    def __init__(self, user):
        # Keep the offending user so handlers know whom to send the email to.
        self.user = user


def resend_confirmation_email(request, user):
    """
    Send the email-confirmation message to ``user``, at most once per
    throttle window.

    A per-user cache entry records that a message was sent. While that entry
    exists nothing is sent and ``False`` is returned. Otherwise the result of
    ``send_email_confirmation`` is returned.
    """
    throttle_timeout = 500  # cache timeout for the "already sent" marker
    throttle_key = "auth:resent-email-confirmation:{}".format(user.pk)

    already_sent = cache.get(throttle_key)
    if already_sent:
        return False

    result = send_email_confirmation(request, user)
    # Only mark as sent once the send call has returned without raising.
    cache.set(throttle_key, True, throttle_timeout)
    return result


class OAuth2Authentication(BaseOAuth2Authentication):
    """
    OAuth2 authentication that reports unverified emails instead of letting
    ``UnverifiedEmail`` propagate.
    """

    def authenticate(self, request):
        """
        Return the parent's authentication result.

        If ``UnverifiedEmail`` is raised, flag the request with an
        ``unverified_email`` OAuth2 error, resend the confirmation email and
        return ``None``.
        """
        try:
            result = super().authenticate(request)
        except UnverifiedEmail as error:
            request.oauth2_error = {"error": "unverified_email"}
            resend_confirmation_email(request, error.user)
            return None
        return result


class BaseJsonWebTokenAuth(object):
    """
    Mixin for JWT authentication classes that rejects users who still have to
    verify their email address.

    It is listed before a rest_framework_jwt authentication class in the
    bases of the concrete classes, so ``super().authenticate`` resolves to
    that class.
    """

    def authenticate(self, request):
        """
        Delegate to the next authentication class in the MRO.

        Raises:
            exceptions.AuthenticationFailed: when ``UnverifiedEmail`` was
                raised during authentication; the confirmation email is
                resent (subject to throttling) before failing.
        """
        try:
            return super().authenticate(request)
        except UnverifiedEmail as e:
            msg = _("You need to verify your email address.")
            resend_confirmation_email(request, e.user)
            raise exceptions.AuthenticationFailed(msg)

    def authenticate_credentials(self, payload):
        """
        We have to implement this method by hand to ensure we can check that the
        User has a verified email, if required

        Returns the active user named in ``payload``.

        Raises:
            exceptions.AuthenticationFailed: if the payload carries no
                username, the user does not exist, or the account is inactive.
            UnverifiedEmail: if the user must verify their email first.
        """
        User = authentication.get_user_model()
        username = authentication.jwt_get_username_from_payload(payload)

        if not username:
            msg = _("Invalid payload.")
            raise exceptions.AuthenticationFailed(msg)

        try:
            user = User.objects.get_by_natural_key(username)
        except User.DoesNotExist:
            msg = _("Invalid signature.")
            raise exceptions.AuthenticationFailed(msg)

        if not user.is_active:
            msg = _("User account is disabled.")
            raise exceptions.AuthenticationFailed(msg)

        if should_verify_email(user):
            # Handled in authenticate(), which resends the confirmation email
            # and converts this into an AuthenticationFailed error.
            raise UnverifiedEmail(user)

        return user


class JSONWebTokenAuthenticationQS(
    BaseJsonWebTokenAuth, authentication.BaseJSONWebTokenAuthentication
):
    """
    JWT authentication that reads the token from the ``jwt`` query-string
    parameter instead of a request header.
    """

    www_authenticate_realm = "api"

    def get_jwt_value(self, request):
        """
        Return the value of the ``jwt`` query parameter, or ``None`` when the
        parameter is absent.

        Raises:
            exceptions.AuthenticationFailed: if the parameter is present but
                empty.
        """
        token = request.query_params.get("jwt")
        if "jwt" in request.query_params and not token:
            msg = _("Invalid Authorization header. No credentials provided.")
            raise exceptions.AuthenticationFailed(msg)
        return token

    def authenticate_header(self, request):
        """
        Build the ``<prefix> realm="<realm>"`` string from the configured JWT
        header prefix and this class's realm.
        """
        return '{0} realm="{1}"'.format(
            api_settings.JWT_AUTH_HEADER_PREFIX, self.www_authenticate_realm
        )
class BearerTokenHeaderAuth(
    BaseJsonWebTokenAuth, authentication.BaseJSONWebTokenAuthentication
):
    """
    For backward compatibility purpose, we used Authorization: JWT <token>
    but Authorization: Bearer <token> is probably better.
    """

    www_authenticate_realm = "api"

    def get_jwt_value(self, request):
        """
        Extract the token from an ``Authorization: Bearer <token>`` header.

        Returns ``None`` when the header uses another prefix. When there is no
        Authorization header at all, the JWT cookie is used if one is
        configured, otherwise ``None`` is returned.

        Raises:
            exceptions.AuthenticationFailed: if the header has the bearer
                prefix but no token, or more than one token part.
        """
        auth = authentication.get_authorization_header(request).split()
        auth_header_prefix = "bearer"

        if not auth:
            # No header: fall back to the cookie when JWT_AUTH_COOKIE is set.
            if api_settings.JWT_AUTH_COOKIE:
                return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
            return None

        # The prefix is compared case-insensitively.
        if smart_text(auth[0].lower()) != auth_header_prefix:
            return None

        if len(auth) == 1:
            msg = _("Invalid Authorization header. No credentials provided.")
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _(
                "Invalid Authorization header. Credentials string "
                "should not contain spaces."
            )
            raise exceptions.AuthenticationFailed(msg)

        return auth[1]

    def authenticate_header(self, request):
        """Build the ``Bearer realm="<realm>"`` string for this class."""
        return '{0} realm="{1}"'.format("Bearer", self.www_authenticate_realm)

    def authenticate(self, request):
        """
        Authenticate as the parent classes do, then make sure the
        authenticated user (first item of the result) has an actor.
        """
        auth = super().authenticate(request)
        if auth:
            if not auth[0].actor:
                auth[0].create_actor()
        return auth


class JSONWebTokenAuthentication(
    BaseJsonWebTokenAuth, authentication.JSONWebTokenAuthentication
):
    """
    JWT authentication built on rest_framework_jwt's
    ``JSONWebTokenAuthentication``, with the email-verification check mixed
    in. It also ensures the authenticated user has an actor.
    """

    def authenticate(self, request):
        """
        Return the parent's authentication result, creating the user's actor
        first when the user (first item of the result) has none.
        """
        result = super().authenticate(request)
        if not result:
            # Nothing authenticated: hand the parent's value back untouched.
            return result

        user = result[0]
        if not user.actor:
            user.create_actor()
        return result