Skip to content
Snippets Groups Projects
Verified Commit 5c2ddc56 authored by Eliot Berriot's avatar Eliot Berriot
Browse files

Basic channels middleware for token auth

parent 498aa113
No related branches found
No related tags found
No related merge requests found
from django.conf.urls import url
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from funkwhale_api.common.auth import TokenAuthMiddleware
from funkwhale_api.music import consumers
application = ProtocolTypeRouter({
# Empty for now (http->django views is added by default)
"websocket": TokenAuthMiddleware(
URLRouter([
url("^api/v1/test/$", consumers.MyConsumer),
])
),
})
from urllib.parse import parse_qs
import jwt
from django.contrib.auth.models import AnonymousUser
from django.utils.encoding import smart_text
from rest_framework import exceptions
from rest_framework_jwt.settings import api_settings
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication
class TokenHeaderAuth(BaseJSONWebTokenAuthentication):
def get_jwt_value(self, request):
try:
qs = request.get('query_string', b'').decode('utf-8')
parsed = parse_qs(qs)
token = parsed['token'][0]
except KeyError:
raise exceptions.AuthenticationFailed('No token')
if not token:
raise exceptions.AuthenticationFailed('Empty token')
return token
class TokenAuthMiddleware:
"""
Custom middleware (insecure) that takes user IDs from the query string.
"""
def __init__(self, inner):
# Store the ASGI application we were passed
self.inner = inner
def __call__(self, scope):
auth = TokenHeaderAuth()
try:
user, token = auth.authenticate(scope)
except exceptions.AuthenticationFailed:
user = AnonymousUser()
scope['user'] = user
return self.inner(scope)
from channels.generic.websocket import JsonWebsocketConsumer
class JsonAuthConsumer(JsonWebsocketConsumer):
def connect(self):
try:
assert self.scope['user'].pk is not None
except (AssertionError, AttributeError, KeyError):
return self.close()
return self.accept()
import pytest
from rest_framework_jwt.settings import api_settings
from funkwhale_api.common.auth import TokenAuthMiddleware
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
@pytest.mark.parametrize('query_string', [
b'token=wrong',
b'',
])
def test_header_anonymous(query_string, factories):
def callback(scope):
assert scope['user'].is_anonymous
scope = {
'query_string': query_string
}
consumer = TokenAuthMiddleware(callback)
consumer(scope)
def test_header_correct_token(factories):
user = factories['users.User']()
payload = jwt_payload_handler(user)
token = jwt_encode_handler(payload)
def callback(scope):
assert scope['user'] == user
scope = {
'query_string': 'token={}'.format(token).encode('utf-8')
}
consumer = TokenAuthMiddleware(callback)
consumer(scope)
from funkwhale_api.common import consumers
def test_auth_consumer_requires_valid_user(mocker):
m = mocker.patch('funkwhale_api.common.consumers.JsonAuthConsumer.close')
scope = {'user': None}
consumer = consumers.JsonAuthConsumer(scope=scope)
consumer.connect()
m.assert_called_once_with()
def test_auth_consumer_requires_user_in_scope(mocker):
m = mocker.patch('funkwhale_api.common.consumers.JsonAuthConsumer.close')
scope = {}
consumer = consumers.JsonAuthConsumer(scope=scope)
consumer.connect()
m.assert_called_once_with()
def test_auth_consumer_accepts_connection(mocker, factories):
user = factories['users.User']()
m = mocker.patch('funkwhale_api.common.consumers.JsonAuthConsumer.accept')
scope = {'user': user}
consumer = consumers.JsonAuthConsumer(scope=scope)
consumer.connect()
m.assert_called_once_with()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment