Verified Commit 78d0de0e authored by Eliot Berriot's avatar Eliot Berriot
Browse files

Merge branch 'release/0.8'

parents 3673f624 1a5b7ed2
import logging
import requests
import xml
from django.conf import settings
from django.urls import reverse
from django.utils import timezone
from rest_framework.exceptions import PermissionDenied
from dynamic_preferences.registries import global_preferences_registry
from . import activity
from . import models
from . import serializers
from . import utils
logger = logging.getLogger(__name__)
def remove_tags(text):
logger.debug('Removing tags from %s', text)
return ''.join(xml.etree.ElementTree.fromstring('<div>{}</div>'.format(text)).itertext())
def get_actor_data(actor_url):
response = requests.get(
actor_url,
headers={
'Accept': 'application/activity+json',
}
)
response.raise_for_status()
try:
return response.json()
except:
raise ValueError(
'Invalid actor payload: {}'.format(response.text))
def get_actor(actor_url):
data = get_actor_data(actor_url)
serializer = serializers.ActorSerializer(data=data)
serializer.is_valid(raise_exception=True)
return serializer.build()
class SystemActor(object):
additional_attributes = {}
def get_actor_instance(self):
a = models.Actor(
**self.get_instance_argument(
self.id,
name=self.name,
summary=self.summary,
**self.additional_attributes
)
)
a.pk = self.id
return a
def get_instance_argument(self, id, name, summary, **kwargs):
preferences = global_preferences_registry.manager()
p = {
'preferred_username': id,
'domain': settings.FEDERATION_HOSTNAME,
'type': 'Person',
'name': name.format(host=settings.FEDERATION_HOSTNAME),
'manually_approves_followers': True,
'url': utils.full_url(
reverse(
'federation:instance-actors-detail',
kwargs={'actor': id})),
'shared_inbox_url': utils.full_url(
reverse(
'federation:instance-actors-inbox',
kwargs={'actor': id})),
'inbox_url': utils.full_url(
reverse(
'federation:instance-actors-inbox',
kwargs={'actor': id})),
'outbox_url': utils.full_url(
reverse(
'federation:instance-actors-outbox',
kwargs={'actor': id})),
'public_key': preferences['federation__public_key'],
'private_key': preferences['federation__private_key'],
'summary': summary.format(host=settings.FEDERATION_HOSTNAME)
}
p.update(kwargs)
return p
def get_inbox(self, data, actor=None):
raise NotImplementedError
def post_inbox(self, data, actor=None):
raise NotImplementedError
def get_outbox(self, data, actor=None):
raise NotImplementedError
def post_outbox(self, data, actor=None):
raise NotImplementedError
class LibraryActor(SystemActor):
id = 'library'
name = '{host}\'s library'
summary = 'Bot account to federate with {host}\'s library'
additional_attributes = {
'manually_approves_followers': True
}
class TestActor(SystemActor):
id = 'test'
name = '{host}\'s test account'
summary = (
'Bot account to test federation with {host}. '
'Send me /ping and I\'ll answer you.'
)
additional_attributes = {
'manually_approves_followers': False
}
def get_outbox(self, data, actor=None):
return {
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1",
{}
],
"id": utils.full_url(
reverse(
'federation:instance-actors-outbox',
kwargs={'actor': self.id})),
"type": "OrderedCollection",
"totalItems": 0,
"orderedItems": []
}
def post_inbox(self, data, actor=None):
if actor is None:
raise PermissionDenied('Actor not authenticated')
serializer = serializers.ActivitySerializer(
data=data, context={'actor': actor})
serializer.is_valid(raise_exception=True)
ac = serializer.validated_data
logger.info('Received activity on %s inbox', self.id)
if ac['type'] == 'Create' and ac['object']['type'] == 'Note':
# we received a toot \o/
command = self.parse_command(ac['object']['content'])
logger.debug('Parsed command: %s', command)
if command == 'ping':
self.handle_ping(ac, actor)
def parse_command(self, message):
"""
Remove any links or fancy markup to extract /command from
a note message.
"""
raw = remove_tags(message)
try:
return raw.split('/')[1]
except IndexError:
return
def handle_ping(self, ac, sender):
now = timezone.now()
test_actor = self.get_actor_instance()
reply_url = 'https://{}/activities/note/{}'.format(
settings.FEDERATION_HOSTNAME, now.timestamp()
)
reply_content = '{} Pong!'.format(
sender.mention_username
)
reply_activity = {
"@context": [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1",
{}
],
'type': 'Create',
'actor': test_actor.url,
'id': '{}/activity'.format(reply_url),
'published': now.isoformat(),
'to': ac['actor'],
'cc': [],
'object': {
'type': 'Note',
'content': 'Pong!',
'summary': None,
'published': now.isoformat(),
'id': reply_url,
'inReplyTo': ac['object']['id'],
'sensitive': False,
'url': reply_url,
'to': [ac['actor']],
'attributedTo': test_actor.url,
'cc': [],
'attachment': [],
'tag': [{
"type": "Mention",
"href": ac['actor'],
"name": sender.mention_username
}]
}
}
activity.deliver(
reply_activity,
to=[ac['actor']],
on_behalf_of=test_actor)
SYSTEM_ACTORS = {
'library': LibraryActor(),
'test': TestActor(),
}
import cryptography
from django.contrib.auth.models import AnonymousUser
from rest_framework import authentication
from rest_framework import exceptions
from . import actors
from . import keys
from . import serializers
from . import signing
from . import utils
class SignatureAuthentication(authentication.BaseAuthentication):
def authenticate_actor(self, request):
headers = utils.clean_wsgi_headers(request.META)
try:
signature = headers['Signature']
key_id = keys.get_key_id_from_signature_header(signature)
except KeyError:
return
except ValueError as e:
raise exceptions.AuthenticationFailed(str(e))
try:
actor_data = actors.get_actor_data(key_id)
except Exception as e:
raise exceptions.AuthenticationFailed(str(e))
try:
public_key = actor_data['publicKey']['publicKeyPem']
except KeyError:
raise exceptions.AuthenticationFailed('No public key found')
serializer = serializers.ActorSerializer(data=actor_data)
if not serializer.is_valid():
raise exceptions.AuthenticationFailed('Invalid actor payload: {}'.format(serializer.errors))
try:
signing.verify_django(request, public_key.encode('utf-8'))
except cryptography.exceptions.InvalidSignature:
raise exceptions.AuthenticationFailed('Invalid signature')
return serializer.build()
def authenticate(self, request):
setattr(request, 'actor', None)
actor = self.authenticate_actor(request)
user = AnonymousUser()
setattr(request, 'actor', actor)
return (user, None)
from django.forms import widgets
from dynamic_preferences import types
from dynamic_preferences.registries import global_preferences_registry
federation = types.Section('federation')
@global_preferences_registry.register
class FederationPrivateKey(types.StringPreference):
show_in_api = False
section = federation
name = 'private_key'
default = ''
help_text = (
'Instance private key, used for signing federation HTTP requests'
)
verbose_name = (
'Instance private key (keep it secret, do not change it)'
)
@global_preferences_registry.register
class FederationPublicKey(types.StringPreference):
show_in_api = False
section = federation
name = 'public_key'
default = ''
help_text = (
'Instance public key, used for signing federation HTTP requests'
)
verbose_name = (
'Instance public key (do not change it)'
)
class MalformedPayload(ValueError):
pass
class MissingSignature(KeyError):
pass
import factory
import requests
import requests_http_signature
from django.utils import timezone
from funkwhale_api.factories import registry
from . import keys
from . import models
registry.register(keys.get_key_pair, name='federation.KeyPair')
@registry.register(name='federation.SignatureAuth')
class SignatureAuthFactory(factory.Factory):
algorithm = 'rsa-sha256'
key = factory.LazyFunction(lambda: keys.get_key_pair()[0])
key_id = factory.Faker('url')
use_auth_header = False
headers = [
'(request-target)',
'user-agent',
'host',
'date',
'content-type',]
class Meta:
model = requests_http_signature.HTTPSignatureAuth
@registry.register(name='federation.SignedRequest')
class SignedRequestFactory(factory.Factory):
url = factory.Faker('url')
method = 'get'
auth = factory.SubFactory(SignatureAuthFactory)
class Meta:
model = requests.Request
@factory.post_generation
def headers(self, create, extracted, **kwargs):
default_headers = {
'User-Agent': 'Test',
'Host': 'test.host',
'Date': 'Right now',
'Content-Type': 'application/activity+json'
}
if extracted:
default_headers.update(extracted)
self.headers.update(default_headers)
@registry.register
class ActorFactory(factory.DjangoModelFactory):
public_key = None
private_key = None
preferred_username = factory.Faker('user_name')
summary = factory.Faker('paragraph')
domain = factory.Faker('domain_name')
url = factory.LazyAttribute(lambda o: 'https://{}/users/{}'.format(o.domain, o.preferred_username))
inbox_url = factory.LazyAttribute(lambda o: 'https://{}/users/{}/inbox'.format(o.domain, o.preferred_username))
outbox_url = factory.LazyAttribute(lambda o: 'https://{}/users/{}/outbox'.format(o.domain, o.preferred_username))
class Meta:
model = models.Actor
@classmethod
def _generate(cls, create, attrs):
has_public = attrs.get('public_key') is not None
has_private = attrs.get('private_key') is not None
if not has_public and not has_private:
private, public = keys.get_key_pair()
attrs['private_key'] = private.decode('utf-8')
attrs['public_key'] = public.decode('utf-8')
return super()._generate(create, attrs)
@registry.register(name='federation.Note')
class NoteFactory(factory.Factory):
type = 'Note'
id = factory.Faker('url')
published = factory.LazyFunction(
lambda: timezone.now().isoformat()
)
inReplyTo = None
content = factory.Faker('sentence')
class Meta:
model = dict
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
import re
import requests
import urllib.parse
from . import exceptions
KEY_ID_REGEX = re.compile(r'keyId=\"(?P<id>.*)\"')
def get_key_pair(size=2048):
key = rsa.generate_private_key(
backend=crypto_default_backend(),
public_exponent=65537,
key_size=size
)
private_key = key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.PKCS8,
crypto_serialization.NoEncryption())
public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PublicFormat.PKCS1
)
return private_key, public_key
def get_key_id_from_signature_header(header_string):
parts = header_string.split(',')
try:
raw_key_id = [p for p in parts if p.startswith('keyId="')][0]
except IndexError:
raise ValueError('Missing key id')
match = KEY_ID_REGEX.match(raw_key_id)
if not match:
raise ValueError('Invalid key id')
key_id = match.groups()[0]
url = urllib.parse.urlparse(key_id)
if not url.scheme or not url.netloc:
raise ValueError('Invalid url')
if url.scheme not in ['http', 'https']:
raise ValueError('Invalid shceme')
return key_id
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from dynamic_preferences.registries import global_preferences_registry
from funkwhale_api.federation import keys
class Command(BaseCommand):
help = (
'Generate a public/private key pair for your instance,'
' for federation purposes. If a key pair already exists, does nothing.'
)
def add_arguments(self, parser):
parser.add_argument(
'--replace',
action='store_true',
dest='replace',
default=False,
help='Replace existing key pair, if any',
)
parser.add_argument(
'--noinput', '--no-input', action='store_false', dest='interactive',
help="Do NOT prompt the user for input of any kind.",
)
@transaction.atomic
def handle(self, *args, **options):
preferences = global_preferences_registry.manager()
existing_public = preferences['federation__public_key']
existing_private = preferences['federation__public_key']
if existing_public or existing_private and not options['replace']:
raise CommandError(
'Keys are already present! '
'Replace them with --replace if you know what you are doing.')
if options['interactive']:
message = (
'Are you sure you want to do this?\n\n'
"Type 'yes' to continue, or 'no' to cancel: "
)
if input(''.join(message)) != 'yes':
raise CommandError("Operation cancelled.")
private, public = keys.get_key_pair()
preferences['federation__public_key'] = public.decode('utf-8')
preferences['federation__private_key'] = private.decode('utf-8')
self.stdout.write(
'Your new key pair was generated.'
'Your public key is now:\n\n{}'.format(public.decode('utf-8'))
)
# Generated by Django 2.0.3 on 2018-03-31 13:43
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Actor',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('url', models.URLField(db_index=True, max_length=500, unique=True)),
('outbox_url', models.URLField(max_length=500)),
('inbox_url', models.URLField(max_length=500)),
('following_url', models.URLField(blank=True, max_length=500, null=True)),
('followers_url', models.URLField(blank=True, max_length=500, null=True)),
('shared_inbox_url', models.URLField(blank=True, max_length=500, null=True)),
('type', models.CharField(choices=[('Person', 'Person'), ('Application', 'Application'), ('Group', 'Group'), ('Organization', 'Organization'), ('Service', 'Service')], default='Person', max_length=25)),
('name', models.CharField(blank=True, max_length=200, null=True)),
('domain', models.CharField(max_length=1000)),
('summary', models.CharField(blank=True, max_length=500, null=True)),
('preferred_username', models.CharField(blank=True, max_length=200, null=True)),
('public_key', models.CharField(blank=True, max_length=5000, null=True)),
('private_key', models.CharField(blank=True, max_length=5000, null=True)),
('creation_date', models.DateTimeField(default=django.utils.timezone.now)),
('last_fetch_date', models.DateTimeField(default=django.utils.timezone.now)),
('manually_approves_followers', models.NullBooleanField(default=None)),
],
),
]
from django.conf import settings
from django.db import models
from django.utils import timezone
TYPE_CHOICES = [
('Person', 'Person'),
('Application', 'Application'),
('Group', 'Group'),
('Organization', 'Organization'),
('Service', 'Service'),
]
class Actor(models.Model):
url = models.URLField(unique=True, max_length=500, db_index=True)
outbox_url = models.URLField(max_length=500)
inbox_url = models.URLField(max_length=500)
following_url = models.URLField(max_length=500, null=True, blank=True)
followers_url = models.URLField(max_length=500, null=True, blank=True)
shared_inbox_url = models.URLField(max_length=500, null=True, blank=True)
type = models.CharField(
choices=TYPE_CHOICES, default='Person', max_length=25)
name = models.CharField(max_length=200, null=True, blank=True)
domain = models.CharField(max_length=1000)
summary = models.CharField(max_length=500, null=True, blank=True)
preferred_username = models.CharField(
max_length=200, null=True, blank=True)
public_key = models.CharField(max_length=5000, null=True, blank=True)