Newer
Older
import aiohttp
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from . import exceptions
from . import schemas
from . import settings
def get_session_kwargs():
headers = {"User-Agent": settings.USER_AGENT}
return {
"timeout": aiohttp.ClientTimeout(total=settings.TIMEOUT),
"headers": headers,
}
def get_session():
kwargs = get_session_kwargs()
return aiohttp.ClientSession(**kwargs)
async def fetch_nodeinfo(session, domain, protocol="https"):
nodeinfo = await get_well_known_data(session, domain=domain, protocol=protocol)
data = await get_nodeinfo(session, nodeinfo)
return data
async def get_well_known_data(session, domain, protocol="https"):
url = f"https://{domain}/.well-known/nodeinfo"
response = await session.get(url)
return await response.json()
async def get_nodeinfo(session, nodeinfo):
for link in nodeinfo.get("links", []):
if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
response = await session.get(link["href"])
return await response.json()
raise exceptions.NoNodeInfo()
def clean_nodeinfo(data):
schema = schemas.NodeInfo2Schema()
result = schema.load(data)
return result.data
async def get_jwt_token(session, url, username, password):
url = f"{url}/api/v1/token/"
response = await session.post(
url, data={"username": username, "password": password}
)
if response.status == 400:
raise exceptions.AuthenticationError(
"Unable to log in with provided credentials"
)
return (await response.json())["token"]
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def get_oauth_token(session, url, client_id, client_secret):
args = {"response_type": "code", "redirect_uri": "urn:ietf:wg:oauth:2.0:oob", "client_id": client_id,
"client_secret": client_secret, "scope": "read write"}
browser_url = f"{url}authorize?{urllib.parse.urlencode(args)}"
webbrowser.open(browser_url)
code = input("Enter the code from funkwhale")
api_url = f"{url}api/v1/oauth/token/"
response = await session.post(
api_url, data={"client_id": client_id, "client_secret": client_secret,
"grant_type": "authorization_code", "code": code}
)
return await extract_tokens(response)
async def refresh_oauth_token(session, url, client_id, client_secret, refresh_token):
api_url = f"{url}api/v1/oauth/token/"
response = await session.post(
api_url, data={"client_id": client_id, "client_secret": client_secret,
"grant_type": "refresh_token", "refresh_token": refresh_token}
)
return await extract_tokens(response)
async def extract_tokens(response):
if response.status == 400:
raise exceptions.AuthenticationError(
"Unable to log in with provided credentials"
)
response_json = await response.json()
access_token = response_json["access_token"]
refresh_token = response_json["refresh_token"]
return access_token, refresh_token
self.base_url = base_url
self.token = token
self._session = None
async def __aenter__(self):
self._session = get_session()
await self._session.__aenter__()
async def __aexit__(self, exc_type, exc, tb):
if self._session:
await self._session.__aexit__(exc_type, exc, tb)
self._session = None
def request(self, method, path, *args, **kwargs):
if path.startswith("http://") or path.startswith("https://"):
full_url = path
else:
full_url = self.base_url + path
headers = kwargs.setdefault("headers", {})
if self.token:
scheme = "JWT" if len(self.token) > 50 else "Bearer"
headers["Authorization"] = " ".join([scheme, str(self.token)])
handler = getattr(self._session, method)
return handler(full_url, *args, **kwargs)
def get_api(domain, protocol, token=None):