Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import aiohttp
from . import exceptions
from . import logs
from . import schemas
from . import settings
def get_session_kwargs():
    """Build the keyword arguments used to construct an HTTP client session.

    Returns:
        A dict with two entries: ``timeout``, an ``aiohttp.ClientTimeout``
        whose total is ``settings.TIMEOUT``, and ``headers``, a dict carrying
        ``settings.USER_AGENT`` under the ``User-Agent`` key.
    """
    default_headers = {"User-Agent": settings.USER_AGENT}
    session_timeout = aiohttp.ClientTimeout(total=settings.TIMEOUT)
    return dict(timeout=session_timeout, headers=default_headers)
def get_session():
    """Return a new ``aiohttp.ClientSession`` built from ``get_session_kwargs()``."""
    return aiohttp.ClientSession(**get_session_kwargs())
async def fetch_nodeinfo(session, domain, protocol="https"):
    """Resolve and download the NodeInfo document for ``domain``.

    First retrieves the well-known discovery data via
    ``get_well_known_data`` and then hands it to ``get_nodeinfo``.

    Args:
        session: HTTP client session forwarded to both helpers.
        domain: Host name whose NodeInfo is requested.
        protocol: Forwarded to ``get_well_known_data`` (defaults to "https").

    Returns:
        Whatever ``get_nodeinfo`` returns for the discovery data.
    """
    well_known = await get_well_known_data(
        session, domain=domain, protocol=protocol
    )
    return await get_nodeinfo(session, well_known)
async def get_well_known_data(session, domain, protocol="https"):
    """Fetch and decode the ``/.well-known/nodeinfo`` document of a domain.

    Args:
        session: HTTP client exposing an awaitable ``get(url)`` whose result
            provides an awaitable ``json()``.
        domain: Host name to query.
        protocol: URL scheme used to build the request URL (defaults to
            "https").

    Returns:
        The decoded JSON body of the response.
    """
    # Honour the caller-supplied scheme instead of always assuming https.
    url = f"{protocol}://{domain}/.well-known/nodeinfo"
    response = await session.get(url)
    return await response.json()
async def get_nodeinfo(session, nodeinfo):
    """Download the NodeInfo 2.0 document advertised in discovery data.

    Walks ``nodeinfo["links"]`` and requests the ``href`` of the first entry
    whose ``rel`` is the NodeInfo 2.0 schema URL.

    Args:
        session: HTTP client exposing an awaitable ``get(url)`` whose result
            provides an awaitable ``json()``.
        nodeinfo: Decoded discovery document (a mapping with an optional
            ``links`` list).

    Returns:
        The decoded JSON body of the first matching link.

    Raises:
        exceptions.NoNodeInfo: If no usable NodeInfo 2.0 link is present.
    """
    wanted_rel = "http://nodeinfo.diaspora.software/ns/schema/2.0"
    # The discovery document comes from a remote server, so tolerate a null
    # "links" value and entries that are malformed or lack "rel"/"href"
    # rather than failing with KeyError/TypeError.
    for link in nodeinfo.get("links") or []:
        if not isinstance(link, dict) or link.get("rel") != wanted_rel:
            continue
        href = link.get("href")
        if not href:
            continue
        response = await session.get(href)
        return await response.json()
    raise exceptions.NoNodeInfo()
def clean_nodeinfo(data):
    """Load ``data`` through ``schemas.NodeInfo2Schema`` and return the result.

    Args:
        data: Raw NodeInfo payload passed to the schema's ``load``.

    Returns:
        The ``data`` attribute of the object returned by ``load``.
    """
    return schemas.NodeInfo2Schema().load(data).data
async def get_jwt_token(session, url, username, password):
    """Exchange a username/password pair for a JWT token.

    Posts the credentials as form data to ``{url}/api/v1/token/``.

    Args:
        session: HTTP client exposing an awaitable ``post(url, data=...)``
            whose result has a ``status`` and an awaitable ``json()``.
        url: Base URL of the server, to which the token path is appended.
        username: Account name sent in the form data.
        password: Password sent in the form data.

    Returns:
        The value stored under ``"token"`` in the JSON response.

    Raises:
        exceptions.AuthenticationError: If the server answers with status 400.
    """
    credentials = {"username": username, "password": password}
    response = await session.post(f"{url}/api/v1/token/", data=credentials)
    if response.status == 400:
        raise exceptions.AuthenticationError(
            "Unable to log in with provided credentials"
        )
    payload = await response.json()
    return payload["token"]
self.base_url = base_url
self.token = token
self._session = None
async def __aenter__(self):
    """Open the underlying HTTP session and enter its async context.

    Returns:
        This instance, so that ``async with ... as api`` binds a usable
        object rather than ``None``.
    """
    self._session = get_session()
    await self._session.__aenter__()
    # The value returned here is what the ``as`` target of ``async with``
    # receives.
    return self
async def __aexit__(self, exc_type, exc, tb):
    """Leave the async context, closing the HTTP session if one is open.

    The exception details are forwarded to the session's own ``__aexit__``;
    afterwards the stored session reference is cleared.
    """
    active_session = self._session
    if active_session:
        await active_session.__aexit__(exc_type, exc, tb)
    self._session = None
def request(self, method, path, *args, **kwargs):
    """Dispatch an HTTP call through the current session.

    Args:
        method: Name of the session method to invoke (looked up with
            ``getattr`` on ``self._session``).
        path: Either an absolute ``http://``/``https://`` URL, used as is,
            or a path appended to ``self.base_url``.
        *args: Extra positional arguments forwarded to the session method.
        **kwargs: Extra keyword arguments forwarded to the session method;
            a ``headers`` dict is created when absent.

    Returns:
        Whatever the session method returns.
    """
    is_absolute = path.startswith(("http://", "https://"))
    full_url = path if is_absolute else self.base_url + path
    headers = kwargs.setdefault("headers", {})
    if self.token:
        # Attach the stored token to the outgoing request headers.
        headers["Authorization"] = "JWT {}".format(self.token)
    return getattr(self._session, method)(full_url, *args, **kwargs)