import asyncio
import sys

import aiohttp

from . import db
from . import exceptions
from . import schemas
from . import serializers
from . import settings
def get_session_kwargs():
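    """Return the shared aiohttp session kwargs: crawler timeout and user agent."""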
headers = {"User-Agent": settings.CRAWLER_USER_AGENT}
return {
"timeout": aiohttp.ClientTimeout(total=settings.CRAWLER_TIMEOUT),
"headers": headers,
}
async def fetch_nodeinfo(session, domain):
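    """Resolve a domain's nodeinfo document via its /.well-known/nodeinfo index."""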
nodeinfo = await get_well_known_data(session, domain)
data = await get_nodeinfo(session, nodeinfo)
return data
async def get_well_known_data(session, domain, protocol="https"):
    """Fetch the /.well-known/nodeinfo index document for a domain."""
    url = f"{protocol}://{domain}/.well-known/nodeinfo"
    response = await session.get(url, ssl=False)
    return await response.json()
async def get_nodeinfo(session, nodeinfo):
    """Follow the nodeinfo 2.0 link from the index and return its payload."""
    for link in nodeinfo.get("links", []):
        if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
            response = await session.get(link["href"], ssl=False)
            return await response.json()
    raise exceptions.NoNodeInfo()
async def check(conn, session, domain, stdout=sys.stdout):
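    """Run a full check for one domain and persist it, marking it down on errors."""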
await serializers.create_domain(conn, {"name": domain})
check_data = {"up": True, "domain": domain}
try:
nodeinfo = await fetch_nodeinfo(session, domain)
cleaned_nodeinfo = clean_nodeinfo(nodeinfo)
cleaned_check = clean_check(check_data, cleaned_nodeinfo)
except (aiohttp.client_exceptions.ClientError, exceptions.CrawlerError) as e:
stdout.write(f"Error while fetching {domain}: {e}, marking instance as down")
check_data["up"] = False
cleaned_check = check_data
await save_check(conn, cleaned_check)
async def crawl_all(session, *domains, stdout, max_passes):
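    """Crawl the given domains in passes, queueing peers discovered along the way."""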
data = {
"pending_domains": set(domains),
"valid_domains": set(),
"invalid_domains": set(),
"handled_domains": set(),
"results": {},
"pass_number": 0,
}
def print_pass():
stdout(
"[Pass {pass_number}] {pending_domains} new domains to crawl, {handled_domains} checked, {valid_domains} valid".format(
pass_number=data["pass_number"],
pending_domains=len(data["pending_domains"]),
handled_domains=len(data["handled_domains"]),
valid_domains=len(data["valid_domains"]),
)
)
while data["pending_domains"] and data["pass_number"] < max_passes:
data["pass_number"] += 1
print_pass()
tasks = [
crawl_single(session, domain, data) for domain in data["pending_domains"]
]
        # gather() accepts bare coroutines; crawl_single traps its own errors,
        # so a single bad domain cannot abort the whole pass
        await asyncio.gather(*tasks)
if data["pass_number"] < max_passes:
print_pass()
return data
async def crawl_single(session, domain, data):
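    """Check one domain, record the result, and queue any peers it advertises."""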
try:
nodeinfo_data = await fetch_nodeinfo(session, domain)
cleaned_data = clean_check(
{"domain": domain, "up": True}, clean_nodeinfo(nodeinfo_data)
)
    except Exception:
data["invalid_domains"].add(domain)
return
finally:
data["pending_domains"].remove(domain)
data["handled_domains"].add(domain)
nodes_url = recursive_getattr(nodeinfo_data, "metadata.knownNodesListUrl")
if nodes_url:
try:
await gather_known_nodes(session, nodes_url, data)
        except Exception:
            # peer discovery is best-effort; a broken list must not fail the check
            pass
data["valid_domains"].add(domain)
data["results"][domain] = cleaned_data
async def gather_known_nodes(session, url, data):
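    """Walk a paginated peer list, queueing domains that were not handled yet."""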
fetch_url = url
while fetch_url:
response = await session.get(fetch_url, ssl=False)
result = await response.json()
fetch_url = result.get("next")
        known_domains = {d["name"] for d in result["results"]}
data["pending_domains"] |= known_domains - data["handled_domains"]
def clean_nodeinfo(data):
    """Validate raw nodeinfo data against the NodeInfo 2.0 schema."""
    schema = schemas.NodeInfo2Schema()
    result = schema.load(data)
    return result
def recursive_getattr(obj, key, permissive=True):
"""
Given a dictionary such as {'user': {'name': 'Bob'}} and
a dotted string such as user.name, returns 'Bob'.
    If the value is not present, returns None.
"""
v = obj
for k in key.split("."):
try:
v = v.get(k)
except (TypeError, AttributeError):
if not permissive:
raise
return
if v is None:
return
return v
def clean_check(check_data, nodeinfo_data):
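    """Flatten validated nodeinfo into the flat field set stored per check."""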
return {
"domain": check_data["domain"],
"up": check_data["up"],
"node_name": recursive_getattr(nodeinfo_data, "metadata.nodeName"),
"open_registrations": recursive_getattr(nodeinfo_data, "openRegistrations"),
"federation_enabled": recursive_getattr(
nodeinfo_data, "metadata.library.federationEnabled"
),
"anonymous_can_listen": recursive_getattr(
nodeinfo_data, "metadata.library.anonymousCanListen"
),
"private": recursive_getattr(nodeinfo_data, "metadata.private"),
"usage_users_total": recursive_getattr(nodeinfo_data, "usage.users.total"),
"usage_users_active_half_year": recursive_getattr(
nodeinfo_data, "usage.users.activeHalfyear"
),
"usage_users_active_month": recursive_getattr(
nodeinfo_data, "usage.users.activeMonth"
),
"usage_listenings_total": recursive_getattr(
nodeinfo_data, "metadata.usage.listenings.total"
),
"usage_downloads_total": recursive_getattr(
nodeinfo_data, "metadata.usage.downloads.total"
),
"library_tracks_total": recursive_getattr(
nodeinfo_data, "metadata.library.tracks.total"
),
"library_albums_total": recursive_getattr(
nodeinfo_data, "metadata.library.albums.total"
),
"library_artists_total": recursive_getattr(
nodeinfo_data, "metadata.library.artists.total"
),
"library_music_hours": recursive_getattr(
nodeinfo_data, "metadata.library.music.hours"
),
"software_name": recursive_getattr(nodeinfo_data, "software.name"),
"software_version_major": recursive_getattr(
nodeinfo_data, "software.version.major"
),
"software_version_minor": recursive_getattr(
nodeinfo_data, "software.version.minor"
),
"software_version_patch": recursive_getattr(
nodeinfo_data, "software.version.patch"
),
"software_prerelease": recursive_getattr(
nodeinfo_data, "software.version.prerelease"
),
"software_build": recursive_getattr(nodeinfo_data, "software.version.build"),
}
@db.dict_cursor
async def save_check(cursor, data):
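    """Insert a check row, purge all history for private domains, and sync the
    domain's node_name when one was reported."""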
node_name = data.pop("node_name", None)
fields, values = [], []
for field, value in data.items():
fields.append(field)
values.append(value)
sql = "INSERT INTO checks (time, {}) VALUES (NOW(), {}) RETURNING *".format(
", ".join(fields), ", ".join(["%s" for _ in values])
)
await cursor.execute(sql, values)
check = await cursor.fetchone()
if data.get("private") is True:
# let's clean previous checks
sql = "DELETE FROM checks WHERE domain = %s"
await cursor.execute(sql, [data["domain"]])
return
if node_name:
await cursor.execute(
"UPDATE domains SET node_name = %s WHERE name = %s",
[node_name, data["domain"]],
)
return check