Skip to content
Snippets Groups Projects
Unverified Commit 0d89ee66 authored by Georg Krause's avatar Georg Krause
Browse files

Format the codebase using black

parent 14153c08
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,7 @@ cryptography = ">=1.8.2"
requests = "^2"
[tool.poetry.dev-dependencies]
Sphinx = "^4.5.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
......
......@@ -7,13 +7,18 @@ import requests
from requests.compat import urlparse
from requests.exceptions import RequestException
class RequestsHttpSignatureException(RequestException):
"""An error occurred while constructing the HTTP Signature for your request."""
class Crypto:
def __init__(self, algorithm):
if algorithm != "hmac-sha256":
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key,
load_pem_public_key,
)
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
......@@ -23,19 +28,31 @@ class Crypto:
def sign(self, string_to_sign, key, passphrase=None):
if self.algorithm == "hmac-sha256":
return hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest()
key = self.load_pem_private_key(key, password=passphrase, backend=self.default_backend())
key = self.load_pem_private_key(
key, password=passphrase, backend=self.default_backend()
)
if self.algorithm in {"rsa-sha1", "rsa-sha256"}:
hasher = self.SHA1() if self.algorithm.endswith("sha1") else self.SHA256()
return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign)
return key.sign(
padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign
)
elif self.algorithm in {"rsa-sha512"}:
hasher = self.SHA512()
return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign)
return key.sign(
padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign
)
elif self.algorithm == "ecdsa-sha256":
return key.sign(signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()), data=string_to_sign)
return key.sign(
signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()),
data=string_to_sign,
)
def verify(self, signature, string_to_sign, key):
if self.algorithm == "hmac-sha256":
assert signature == hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest()
assert (
signature
== hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest()
)
else:
key = self.load_pem_public_key(key, backend=self.default_backend())
hasher = self.SHA1() if self.algorithm.endswith("sha1") else self.SHA256()
......@@ -44,6 +61,7 @@ class Crypto:
else:
key.verify(signature, string_to_sign, self.PKCS1v15(), hasher)
class HTTPSignatureAuth(requests.auth.AuthBase):
hasher_constructor = hashlib.sha256
known_algorithms = {
......@@ -54,7 +72,15 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
"ecdsa-sha256",
}
def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None):
def __init__(
self,
key,
key_id,
algorithm="hmac-sha256",
headers=None,
passphrase=None,
expires_in=None,
):
"""
:param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key
:param datetime.timedelta expires_in: The time after which this signature should expire
......@@ -64,7 +90,11 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
self.key_id = key_id
self.algorithm = algorithm
self.headers = [h.lower() for h in headers] if headers is not None else ["date"]
self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode()
self.passphrase = (
passphrase
if passphrase is None or isinstance(passphrase, bytes)
else passphrase.encode()
)
self.expires_in = expires_in
def add_date(self, request, timestamp):
......@@ -73,7 +103,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
def add_digest(self, request):
if request.body is None and "digest" in self.headers:
raise RequestsHttpSignatureException("Could not compute digest header for request without a body")
raise RequestsHttpSignatureException(
"Could not compute digest header for request without a body"
)
if request.body is not None and "Digest" not in request.headers:
if "digest" not in self.headers:
self.headers.append("digest")
......@@ -81,7 +113,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode()
@classmethod
def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp):
def get_string_to_sign(
self, request, headers, created_timestamp, expires_timestamp
):
sts = []
for header in headers:
if header == "(request-target)":
......@@ -90,15 +124,20 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
elif header == "(created)" and created_timestamp:
sts.append("{}: {}".format(header, created_timestamp))
elif header == "(expires)":
assert (expires_timestamp is not None), \
'You should provide the "expires_in" argument when using the (expires) header'
assert (
expires_timestamp is not None
), 'You should provide the "expires_in" argument when using the (expires) header'
sts.append("{}: {}".format(header, int(expires_timestamp)))
else:
if header.lower() == "host":
url = urlparse(request.url)
value = request.headers.get("host", url.hostname)
if url.scheme == "http" and url.port not in [None, 80] or url.scheme == "https" \
and url.port not in [443, None]:
if (
url.scheme == "http"
and url.port not in [None, 80]
or url.scheme == "https"
and url.port not in [443, None]
):
value = "{}:{}".format(value, url.port)
else:
value = request.headers[header]
......@@ -113,7 +152,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
self.add_date(request, created_timestamp)
self.add_digest(request)
raw_sig = Crypto(self.algorithm).sign(
string_to_sign=self.get_string_to_sign(request, self.headers, created_timestamp, expires_timestamp),
string_to_sign=self.get_string_to_sign(
request, self.headers, created_timestamp, expires_timestamp
),
key=self.key.encode() if isinstance(self.key, str) else self.key,
passphrase=self.passphrase,
)
......@@ -124,15 +165,20 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
("headers", " ".join(self.headers)),
("signature", sig),
]
if not (self.algorithm.startswith("rsa") or self.algorithm.startswith("hmac") or
self.algorithm.startswith("ecdsa")):
if not (
self.algorithm.startswith("rsa")
or self.algorithm.startswith("hmac")
or self.algorithm.startswith("ecdsa")
):
sig_struct.append(("created", int(created_timestamp)))
if expires_timestamp is not None:
sig_struct.append(("expires", int(expires_timestamp)))
return ",".join('{}="{}"'.format(k, v) for k, v in sig_struct)
def __call__(self, request):
request.headers["Authorization"] = "Signature " + self.create_signature_string(request)
request.headers["Authorization"] = "Signature " + self.create_signature_string(
request
)
return request
@classmethod
......@@ -140,31 +186,48 @@ class HTTPSignatureAuth(requests.auth.AuthBase):
sig_struct = request.headers[scheme]
if scheme == "Authorization":
sig_struct = sig_struct.split(" ", 1)[1]
return {i.split("=", 1)[0]: i.split("=", 1)[1].strip('"') for i in sig_struct.split(",")}
return {
i.split("=", 1)[0]: i.split("=", 1)[1].strip('"')
for i in sig_struct.split(",")
}
@classmethod
def verify(self, request, key_resolver, scheme="Authorization"):
if scheme == "Authorization":
assert "Authorization" in request.headers, "No Authorization header found"
msg = 'Unexpected scheme found in Authorization header (expected "Signature")'
msg = (
'Unexpected scheme found in Authorization header (expected "Signature")'
)
assert request.headers["Authorization"].startswith("Signature "), msg
elif scheme == "Signature":
assert "Signature" in request.headers, "No Signature header found"
else:
raise RequestsHttpSignatureException('Unknown signature scheme "{}"'.format(scheme))
raise RequestsHttpSignatureException(
'Unknown signature scheme "{}"'.format(scheme)
)
sig_struct = self.get_sig_struct(request, scheme=scheme)
for field in "keyId", "algorithm", "signature":
assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field)
assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm"
created_timestamp = int(sig_struct['created']) if 'created' in sig_struct else None
expires_timestamp = sig_struct.get('expires')
assert (
field in sig_struct
), 'Required signature parameter "{}" not found'.format(field)
assert (
sig_struct["algorithm"] in self.known_algorithms
), "Unknown signature algorithm"
created_timestamp = (
int(sig_struct["created"]) if "created" in sig_struct else None
)
expires_timestamp = sig_struct.get("expires")
if expires_timestamp is not None:
expires_timestamp = int(expires_timestamp)
headers = sig_struct.get("headers", "date").split(" ")
sig = base64.b64decode(sig_struct["signature"])
sts = self.get_string_to_sign(request, headers, created_timestamp, expires_timestamp=expires_timestamp)
key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"])
sts = self.get_string_to_sign(
request, headers, created_timestamp, expires_timestamp=expires_timestamp
)
key = key_resolver(
key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"]
)
Crypto(sig_struct["algorithm"]).verify(sig, sts, key)
if expires_timestamp is not None:
assert expires_timestamp > int(time.time())
......
......@@ -9,9 +9,16 @@ import requests
from cryptography.fernet import Fernet
from requests.adapters import HTTPAdapter
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # noqa
sys.path.insert(
0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
) # noqa
from requests_http_signature import (
HTTPSignatureAuth,
HTTPSignatureHeaderAuth,
RequestsHttpSignatureException,
)
from requests_http_signature import HTTPSignatureAuth, HTTPSignatureHeaderAuth, RequestsHttpSignatureException
hmac_secret = b"monorail_cat"
passphrase = b"passw0rd"
......@@ -26,12 +33,17 @@ class TestAdapter(HTTPAdapter):
if "pubkey" in request.headers:
return base64.b64decode(request.headers["pubkey"])
return hmac_secret
HTTPSignatureAuth.verify(request,
HTTPSignatureAuth.verify(
request,
key_resolver=key_resolver,
scheme=request.headers.get("sigScheme", "Authorization"))
scheme=request.headers.get("sigScheme", "Authorization"),
)
if "expectSig" in request.headers:
self.testcase.assertEqual(request.headers["expectSig"],
HTTPSignatureAuth.get_sig_struct(request)["signature"])
self.testcase.assertEqual(
request.headers["expectSig"],
HTTPSignatureAuth.get_sig_struct(request)["signature"],
)
response = requests.Response()
response.status_code = requests.codes.ok
response.url = request.url
......@@ -50,34 +62,45 @@ class TestRequestsHTTPSignature(unittest.TestCase):
self.session.mount("http://", TestAdapter(self))
def test_readme_example(self):
preshared_key_id = 'squirrel'
preshared_secret = 'monorail_cat'
url = 'http://example.com/path'
requests.get(url, auth=HTTPSignatureAuth(key=preshared_secret, key_id=preshared_key_id))
preshared_key_id = "squirrel"
preshared_secret = "monorail_cat"
url = "http://example.com/path"
requests.get(
url, auth=HTTPSignatureAuth(key=preshared_secret, key_id=preshared_key_id)
)
def test_basic_statements(self):
url = 'http://example.com/path?query#fragment'
url = "http://example.com/path?query#fragment"
self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret, key_id="sekret"))
with self.assertRaises(AssertionError):
self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret"))
with self.assertRaisesRegex(RequestsHttpSignatureException,
"Could not compute digest header for request without a body"):
self.session.get(url,
auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret", headers=["date", "digest"]))
self.session.get(
url, auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret")
)
with self.assertRaisesRegex(
RequestsHttpSignatureException,
"Could not compute digest header for request without a body",
):
self.session.get(
url,
auth=HTTPSignatureAuth(
key=hmac_secret[::-1], key_id="sekret", headers=["date", "digest"]
),
)
def test_expired_signature(self):
with self.assertRaises(AssertionError):
preshared_key_id = 'squirrel'
preshared_key_id = "squirrel"
key = Fernet.generate_key()
one_month = timedelta(days=-30)
headers = ["(expires)"]
auth = HTTPSignatureAuth(key=key, key_id=preshared_key_id,
expires_in=one_month, headers=headers)
auth = HTTPSignatureAuth(
key=key, key_id=preshared_key_id, expires_in=one_month, headers=headers
)
def key_resolver(key_id, algorithm):
return key
url = 'http://example.com/path'
url = "http://example.com/path"
response = requests.get(url, auth=auth)
HTTPSignatureAuth.verify(response.request, key_resolver=key_resolver)
......@@ -87,12 +110,14 @@ class TestRequestsHTTPSignature(unittest.TestCase):
# Also, the values in https://github.com/joyent/node-http-signature/blob/master/test/verify.test.js don't match
# up with ours. This is because node-http-signature seems to compute the content-length incorrectly in its test
# suite (it should be 18, but they use 17).
url = 'http://example.org/foo'
url = "http://example.org/foo"
payload = {"hello": "world"}
date = "Thu, 05 Jan 2012 21:31:40 GMT"
auth = HTTPSignatureAuth(key=hmac_secret,
auth = HTTPSignatureAuth(
key=hmac_secret,
key_id="sekret",
headers=["(request-target)", "host", "date", "digest", "content-length"])
headers=["(request-target)", "host", "date", "digest", "content-length"],
)
self.session.post(url, json=payload, headers={"Date": date}, auth=auth)
pubkey_fn = os.path.join(os.path.dirname(__file__), "pubkey.pem")
......@@ -101,46 +126,70 @@ class TestRequestsHTTPSignature(unittest.TestCase):
with open(pubkey_fn, "rb") as pubkey, open(privkey_fn, "rb") as privkey:
pubkey_b64 = base64.b64encode(pubkey.read())
auth = DigestlessSignatureAuth(algorithm="rsa-sha256", key=privkey.read(), key_id="Test")
auth = DigestlessSignatureAuth(
algorithm="rsa-sha256", key=privkey.read(), key_id="Test"
)
expect_sig = "ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=" # noqa
headers = {"Date": date, "pubkey": pubkey_b64, "expectSig": expect_sig}
self.session.post(url, json=payload, headers=headers, auth=auth)
with open(pubkey_fn, "rb") as pubkey, open(privkey_fn, "rb") as privkey:
pubkey_b64 = base64.b64encode(pubkey.read())
auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=privkey.read(), key_id="Test",
headers="(request-target) host date content-type digest content-length".split())
auth = HTTPSignatureAuth(
algorithm="rsa-sha256",
key=privkey.read(),
key_id="Test",
headers="(request-target) host date content-type digest content-length".split(),
)
expect_sig = "DkOOyDlO9rXmOiU+k6L86N4UFEcey2YD+/Bz8c+Sr6XVDtDCxUuFEHMO+Atag/V1iLu+3KczVrCwjaZ39Ox3RufJghHzhTffyEkfPI6Ivf271mfRU9+wLxuGj9f+ATVO14nvcZyQjAMLvV7qh35zQcYdeD5XyxLLjuYUnK14rYI=" # noqa
headers = {"Date": date, "pubkey": pubkey_b64, "expectSig": expect_sig, "content-type": "application/json"}
headers = {
"Date": date,
"pubkey": pubkey_b64,
"expectSig": expect_sig,
"content-type": "application/json",
}
self.session.post(url, json=payload, headers=headers, auth=auth)
auth = HTTPSignatureHeaderAuth(key=hmac_secret,
auth = HTTPSignatureHeaderAuth(
key=hmac_secret,
key_id="sekret",
headers=["(request-target)", "host", "date", "digest", "content-length"])
self.session.post(url, json=payload, headers={"Date": date, "sigScheme": "Signature"}, auth=auth)
headers=["(request-target)", "host", "date", "digest", "content-length"],
)
self.session.post(
url,
json=payload,
headers={"Date": date, "sigScheme": "Signature"},
auth=auth,
)
def test_rsa(self):
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
public_exponent=65537, key_size=2048, backend=default_backend()
)
private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(passphrase)
encryption_algorithm=serialization.BestAvailableEncryption(passphrase),
)
public_key_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
url = "http://example.com/path?query#fragment"
auth = HTTPSignatureAuth(
algorithm="rsa-sha256",
key=private_key_pem,
key_id="sekret",
passphrase=passphrase,
)
self.session.get(
url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem))
)
url = 'http://example.com/path?query#fragment'
auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase)
self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem)))
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment