diff --git a/pyproject.toml b/pyproject.toml index d50bf39cc088d50f271de72623ae94451e798175..a45e7b7e86e5e723b73e9163e4b7ca60b9ae2d07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ cryptography = ">=1.8.2" requests = "^2" [tool.poetry.dev-dependencies] +Sphinx = "^4.5.0" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/requests_http_message_signatures/__init__.py b/requests_http_message_signatures/__init__.py index db95b6d10ccdae6b497a92e270dcc636260ba3ae..767c9aa12bfaf964d85185388bf16411fa59d655 100644 --- a/requests_http_message_signatures/__init__.py +++ b/requests_http_message_signatures/__init__.py @@ -7,13 +7,18 @@ import requests from requests.compat import urlparse from requests.exceptions import RequestException + class RequestsHttpSignatureException(RequestException): """An error occurred while constructing the HTTP Signature for your request.""" + class Crypto: def __init__(self, algorithm): if algorithm != "hmac-sha256": - from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, + load_pem_public_key, + ) from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa, ec from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 @@ -23,19 +28,31 @@ class Crypto: def sign(self, string_to_sign, key, passphrase=None): if self.algorithm == "hmac-sha256": return hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest() - key = self.load_pem_private_key(key, password=passphrase, backend=self.default_backend()) + key = self.load_pem_private_key( + key, password=passphrase, backend=self.default_backend() + ) if self.algorithm in {"rsa-sha1", "rsa-sha256"}: hasher = self.SHA1() if self.algorithm.endswith("sha1") else self.SHA256() - return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign) + return key.sign( + padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign + ) elif self.algorithm in {"rsa-sha512"}: hasher = self.SHA512() - return key.sign(padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign) + return key.sign( + padding=self.PKCS1v15(), algorithm=hasher, data=string_to_sign + ) elif self.algorithm == "ecdsa-sha256": - return key.sign(signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()), data=string_to_sign) + return key.sign( + signature_algorithm=self.ec.ECDSA(algorithm=self.SHA256()), + data=string_to_sign, + ) def verify(self, signature, string_to_sign, key): if self.algorithm == "hmac-sha256": - assert signature == hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest() + assert ( + signature + == hmac.new(key, string_to_sign, digestmod=hashlib.sha256).digest() + ) else: key = self.load_pem_public_key(key, backend=self.default_backend()) hasher = self.SHA1() if self.algorithm.endswith("sha1") else self.SHA256() @@ -44,6 +61,7 @@ class Crypto: else: key.verify(signature, string_to_sign, self.PKCS1v15(), hasher) + class HTTPSignatureAuth(requests.auth.AuthBase): hasher_constructor = hashlib.sha256 known_algorithms = { @@ -54,7 +72,15 @@ class HTTPSignatureAuth(requests.auth.AuthBase): "ecdsa-sha256", } - def __init__(self, key, key_id, algorithm="hmac-sha256", headers=None, passphrase=None, expires_in=None): + def __init__( + self, + key, + key_id, + algorithm="hmac-sha256", + headers=None, + passphrase=None, + expires_in=None, + ): """ :param typing.Union[bytes, string] passphrase: The passphrase for an encrypted RSA private key :param datetime.timedelta expires_in: The time after which this signature should expire @@ -64,7 +90,11 @@ class HTTPSignatureAuth(requests.auth.AuthBase): self.key_id = key_id self.algorithm = algorithm self.headers = [h.lower() for h in headers] if headers is not None else ["date"] - self.passphrase = passphrase if passphrase is None or isinstance(passphrase, bytes) else passphrase.encode() + self.passphrase = ( + passphrase + if passphrase is None or isinstance(passphrase, bytes) + else passphrase.encode() + ) self.expires_in = expires_in def add_date(self, request, timestamp): @@ -73,7 +103,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase): def add_digest(self, request): if request.body is None and "digest" in self.headers: - raise RequestsHttpSignatureException("Could not compute digest header for request without a body") + raise RequestsHttpSignatureException( + "Could not compute digest header for request without a body" + ) if request.body is not None and "Digest" not in request.headers: if "digest" not in self.headers: self.headers.append("digest") @@ -81,7 +113,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase): request.headers["Digest"] = "SHA-256=" + base64.b64encode(digest).decode() @classmethod - def get_string_to_sign(self, request, headers, created_timestamp, expires_timestamp): + def get_string_to_sign( + self, request, headers, created_timestamp, expires_timestamp + ): sts = [] for header in headers: if header == "(request-target)": @@ -90,15 +124,20 @@ class HTTPSignatureAuth(requests.auth.AuthBase): elif header == "(created)" and created_timestamp: sts.append("{}: {}".format(header, created_timestamp)) elif header == "(expires)": - assert (expires_timestamp is not None), \ - 'You should provide the "expires_in" argument when using the (expires) header' + assert ( + expires_timestamp is not None + ), 'You should provide the "expires_in" argument when using the (expires) header' sts.append("{}: {}".format(header, int(expires_timestamp))) else: if header.lower() == "host": url = urlparse(request.url) value = request.headers.get("host", url.hostname) - if url.scheme == "http" and url.port not in [None, 80] or url.scheme == "https" \ - and url.port not in [443, None]: + if ( + url.scheme == "http" + and url.port not in [None, 80] + or url.scheme == "https" + and url.port not in [443, None] + ): value = "{}:{}".format(value, url.port) else: value = request.headers[header] @@ -113,7 +152,9 @@ class HTTPSignatureAuth(requests.auth.AuthBase): self.add_date(request, created_timestamp) self.add_digest(request) raw_sig = Crypto(self.algorithm).sign( - string_to_sign=self.get_string_to_sign(request, self.headers, created_timestamp, expires_timestamp), + string_to_sign=self.get_string_to_sign( + request, self.headers, created_timestamp, expires_timestamp + ), key=self.key.encode() if isinstance(self.key, str) else self.key, passphrase=self.passphrase, ) @@ -124,15 +165,20 @@ class HTTPSignatureAuth(requests.auth.AuthBase): ("headers", " ".join(self.headers)), ("signature", sig), ] - if not (self.algorithm.startswith("rsa") or self.algorithm.startswith("hmac") or - self.algorithm.startswith("ecdsa")): + if not ( + self.algorithm.startswith("rsa") + or self.algorithm.startswith("hmac") + or self.algorithm.startswith("ecdsa") + ): sig_struct.append(("created", int(created_timestamp))) if expires_timestamp is not None: sig_struct.append(("expires", int(expires_timestamp))) return ",".join('{}="{}"'.format(k, v) for k, v in sig_struct) def __call__(self, request): - request.headers["Authorization"] = "Signature " + self.create_signature_string(request) + request.headers["Authorization"] = "Signature " + self.create_signature_string( + request + ) return request @classmethod @@ -140,31 +186,48 @@ class HTTPSignatureAuth(requests.auth.AuthBase): sig_struct = request.headers[scheme] if scheme == "Authorization": sig_struct = sig_struct.split(" ", 1)[1] - return {i.split("=", 1)[0]: i.split("=", 1)[1].strip('"') for i in sig_struct.split(",")} + return { + i.split("=", 1)[0]: i.split("=", 1)[1].strip('"') + for i in sig_struct.split(",") + } @classmethod def verify(self, request, key_resolver, scheme="Authorization"): if scheme == "Authorization": assert "Authorization" in request.headers, "No Authorization header found" - msg = 'Unexpected scheme found in Authorization header (expected "Signature")' + msg = ( + 'Unexpected scheme found in Authorization header (expected "Signature")' + ) assert request.headers["Authorization"].startswith("Signature "), msg elif scheme == "Signature": assert "Signature" in request.headers, "No Signature header found" else: - raise RequestsHttpSignatureException('Unknown signature scheme "{}"'.format(scheme)) + raise RequestsHttpSignatureException( + 'Unknown signature scheme "{}"'.format(scheme) + ) sig_struct = self.get_sig_struct(request, scheme=scheme) for field in "keyId", "algorithm", "signature": - assert field in sig_struct, 'Required signature parameter "{}" not found'.format(field) - assert sig_struct["algorithm"] in self.known_algorithms, "Unknown signature algorithm" - created_timestamp = int(sig_struct['created']) if 'created' in sig_struct else None - expires_timestamp = sig_struct.get('expires') + assert ( + field in sig_struct + ), 'Required signature parameter "{}" not found'.format(field) + assert ( + sig_struct["algorithm"] in self.known_algorithms + ), "Unknown signature algorithm" + created_timestamp = ( + int(sig_struct["created"]) if "created" in sig_struct else None + ) + expires_timestamp = sig_struct.get("expires") if expires_timestamp is not None: expires_timestamp = int(expires_timestamp) headers = sig_struct.get("headers", "date").split(" ") sig = base64.b64decode(sig_struct["signature"]) - sts = self.get_string_to_sign(request, headers, created_timestamp, expires_timestamp=expires_timestamp) - key = key_resolver(key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"]) + sts = self.get_string_to_sign( + request, headers, created_timestamp, expires_timestamp=expires_timestamp + ) + key = key_resolver( + key_id=sig_struct["keyId"], algorithm=sig_struct["algorithm"] + ) Crypto(sig_struct["algorithm"]).verify(sig, sts, key) if expires_timestamp is not None: assert expires_timestamp > int(time.time()) @@ -172,8 +235,8 @@ class HTTPSignatureAuth(requests.auth.AuthBase): class HTTPSignatureHeaderAuth(HTTPSignatureAuth): """ - https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4 - Using "Signature" header instead of "Authorization" header. + https://tools.ietf.org/html/draft-cavage-http-signatures-08#section-4 + Using "Signature" header instead of "Authorization" header. """ def __call__(self, request): diff --git a/test/test.py b/test/test.py index c8e508d0b06d4cbc3fd1797c89dc412829169d04..04d9f1300c7bfab978332a8b1b6d8299110b0f75 100755 --- a/test/test.py +++ b/test/test.py @@ -9,9 +9,16 @@ import requests from cryptography.fernet import Fernet from requests.adapters import HTTPAdapter -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # noqa +sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +) # noqa + +from requests_http_signature import ( + HTTPSignatureAuth, + HTTPSignatureHeaderAuth, + RequestsHttpSignatureException, +) -from requests_http_signature import HTTPSignatureAuth, HTTPSignatureHeaderAuth, RequestsHttpSignatureException hmac_secret = b"monorail_cat" passphrase = b"passw0rd" @@ -26,12 +33,17 @@ class TestAdapter(HTTPAdapter): if "pubkey" in request.headers: return base64.b64decode(request.headers["pubkey"]) return hmac_secret - HTTPSignatureAuth.verify(request, - key_resolver=key_resolver, - scheme=request.headers.get("sigScheme", "Authorization")) + + HTTPSignatureAuth.verify( + request, + key_resolver=key_resolver, + scheme=request.headers.get("sigScheme", "Authorization"), + ) if "expectSig" in request.headers: - self.testcase.assertEqual(request.headers["expectSig"], - HTTPSignatureAuth.get_sig_struct(request)["signature"]) + self.testcase.assertEqual( + request.headers["expectSig"], + HTTPSignatureAuth.get_sig_struct(request)["signature"], + ) response = requests.Response() response.status_code = requests.codes.ok response.url = request.url @@ -50,34 +62,45 @@ class TestRequestsHTTPSignature(unittest.TestCase): self.session.mount("http://", TestAdapter(self)) def test_readme_example(self): - preshared_key_id = 'squirrel' - preshared_secret = 'monorail_cat' - url = 'http://example.com/path' - requests.get(url, auth=HTTPSignatureAuth(key=preshared_secret, key_id=preshared_key_id)) + preshared_key_id = "squirrel" + preshared_secret = "monorail_cat" + url = "http://example.com/path" + requests.get( + url, auth=HTTPSignatureAuth(key=preshared_secret, key_id=preshared_key_id) + ) def test_basic_statements(self): - url = 'http://example.com/path?query#fragment' + url = "http://example.com/path?query#fragment" self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret, key_id="sekret")) with self.assertRaises(AssertionError): - self.session.get(url, auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret")) - with self.assertRaisesRegex(RequestsHttpSignatureException, - "Could not compute digest header for request without a body"): - self.session.get(url, - auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret", headers=["date", "digest"])) + self.session.get( + url, auth=HTTPSignatureAuth(key=hmac_secret[::-1], key_id="sekret") + ) + with self.assertRaisesRegex( + RequestsHttpSignatureException, + "Could not compute digest header for request without a body", + ): + self.session.get( + url, + auth=HTTPSignatureAuth( + key=hmac_secret[::-1], key_id="sekret", headers=["date", "digest"] + ), + ) def test_expired_signature(self): with self.assertRaises(AssertionError): - preshared_key_id = 'squirrel' + preshared_key_id = "squirrel" key = Fernet.generate_key() one_month = timedelta(days=-30) headers = ["(expires)"] - auth = HTTPSignatureAuth(key=key, key_id=preshared_key_id, - expires_in=one_month, headers=headers) + auth = HTTPSignatureAuth( + key=key, key_id=preshared_key_id, expires_in=one_month, headers=headers + ) def key_resolver(key_id, algorithm): return key - url = 'http://example.com/path' + url = "http://example.com/path" response = requests.get(url, auth=auth) HTTPSignatureAuth.verify(response.request, key_resolver=key_resolver) @@ -87,12 +110,14 @@ class TestRequestsHTTPSignature(unittest.TestCase): # Also, the values in https://github.com/joyent/node-http-signature/blob/master/test/verify.test.js don't match # up with ours. This is because node-http-signature seems to compute the content-length incorrectly in its test # suite (it should be 18, but they use 17). - url = 'http://example.org/foo' + url = "http://example.org/foo" payload = {"hello": "world"} date = "Thu, 05 Jan 2012 21:31:40 GMT" - auth = HTTPSignatureAuth(key=hmac_secret, - key_id="sekret", - headers=["(request-target)", "host", "date", "digest", "content-length"]) + auth = HTTPSignatureAuth( + key=hmac_secret, + key_id="sekret", + headers=["(request-target)", "host", "date", "digest", "content-length"], + ) self.session.post(url, json=payload, headers={"Date": date}, auth=auth) pubkey_fn = os.path.join(os.path.dirname(__file__), "pubkey.pem") @@ -101,46 +126,70 @@ class TestRequestsHTTPSignature(unittest.TestCase): with open(pubkey_fn, "rb") as pubkey, open(privkey_fn, "rb") as privkey: pubkey_b64 = base64.b64encode(pubkey.read()) - auth = DigestlessSignatureAuth(algorithm="rsa-sha256", key=privkey.read(), key_id="Test") - expect_sig = "ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=" # noqa + auth = DigestlessSignatureAuth( + algorithm="rsa-sha256", key=privkey.read(), key_id="Test" + ) + expect_sig = "ATp0r26dbMIxOopqw0OfABDT7CKMIoENumuruOtarj8n/97Q3htHFYpH8yOSQk3Z5zh8UxUym6FYTb5+A0Nz3NRsXJibnYi7brE/4tx5But9kkFGzG+xpUmimN4c3TMN7OFH//+r8hBf7BT9/GmHDUVZT2JzWGLZES2xDOUuMtA=" # noqa headers = {"Date": date, "pubkey": pubkey_b64, "expectSig": expect_sig} self.session.post(url, json=payload, headers=headers, auth=auth) with open(pubkey_fn, "rb") as pubkey, open(privkey_fn, "rb") as privkey: pubkey_b64 = base64.b64encode(pubkey.read()) - auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=privkey.read(), key_id="Test", - headers="(request-target) host date content-type digest content-length".split()) - expect_sig = "DkOOyDlO9rXmOiU+k6L86N4UFEcey2YD+/Bz8c+Sr6XVDtDCxUuFEHMO+Atag/V1iLu+3KczVrCwjaZ39Ox3RufJghHzhTffyEkfPI6Ivf271mfRU9+wLxuGj9f+ATVO14nvcZyQjAMLvV7qh35zQcYdeD5XyxLLjuYUnK14rYI=" # noqa - headers = {"Date": date, "pubkey": pubkey_b64, "expectSig": expect_sig, "content-type": "application/json"} + auth = HTTPSignatureAuth( + algorithm="rsa-sha256", + key=privkey.read(), + key_id="Test", + headers="(request-target) host date content-type digest content-length".split(), + ) + expect_sig = "DkOOyDlO9rXmOiU+k6L86N4UFEcey2YD+/Bz8c+Sr6XVDtDCxUuFEHMO+Atag/V1iLu+3KczVrCwjaZ39Ox3RufJghHzhTffyEkfPI6Ivf271mfRU9+wLxuGj9f+ATVO14nvcZyQjAMLvV7qh35zQcYdeD5XyxLLjuYUnK14rYI=" # noqa + headers = { + "Date": date, + "pubkey": pubkey_b64, + "expectSig": expect_sig, + "content-type": "application/json", + } self.session.post(url, json=payload, headers=headers, auth=auth) - auth = HTTPSignatureHeaderAuth(key=hmac_secret, - key_id="sekret", - headers=["(request-target)", "host", "date", "digest", "content-length"]) - self.session.post(url, json=payload, headers={"Date": date, "sigScheme": "Signature"}, auth=auth) + auth = HTTPSignatureHeaderAuth( + key=hmac_secret, + key_id="sekret", + headers=["(request-target)", "host", "date", "digest", "content-length"], + ) + self.session.post( + url, + json=payload, + headers={"Date": date, "sigScheme": "Signature"}, + auth=auth, + ) def test_rsa(self): from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization + private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() + public_exponent=65537, key_size=2048, backend=default_backend() ) private_key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.BestAvailableEncryption(passphrase) + encryption_algorithm=serialization.BestAvailableEncryption(passphrase), ) public_key_pem = private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + url = "http://example.com/path?query#fragment" + auth = HTTPSignatureAuth( + algorithm="rsa-sha256", + key=private_key_pem, + key_id="sekret", + passphrase=passphrase, + ) + self.session.get( + url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem)) ) - url = 'http://example.com/path?query#fragment' - auth = HTTPSignatureAuth(algorithm="rsa-sha256", key=private_key_pem, key_id="sekret", passphrase=passphrase) - self.session.get(url, auth=auth, headers=dict(pubkey=base64.b64encode(public_key_pem))) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()