Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import aiohttp
from . import exceptions
from . import logs
from . import schemas
from . import settings
def get_session_kwargs():
headers = {"User-Agent": settings.USER_AGENT}
return {
"timeout": aiohttp.ClientTimeout(total=settings.TIMEOUT),
"headers": headers,
}
def get_session():
kwargs = get_session_kwargs()
return aiohttp.ClientSession(**kwargs)
async def fetch_nodeinfo(session, domain, protocol="https"):
nodeinfo = await get_well_known_data(session, domain=domain, protocol=protocol)
data = await get_nodeinfo(session, nodeinfo)
return data
async def get_well_known_data(session, domain, protocol="https"):
url = f"https://{domain}/.well-known/nodeinfo"
response = await session.get(url)
return await response.json()
async def get_nodeinfo(session, nodeinfo):
for link in nodeinfo.get("links", []):
if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
response = await session.get(link["href"])
return await response.json()
raise exceptions.NoNodeInfo()
def clean_nodeinfo(data):
schema = schemas.NodeInfo2Schema()
result = schema.load(data)
return result.data