import aiohttp
import asyncio
import click
import logging.config
import arq.worker
import functools
import ssl
import sys

from . import output

SSL_PROTOCOLS = (asyncio.sslproto.SSLProtocol,)
try:
    import uvloop.loop
except ImportError:
    pass
else:
    SSL_PROTOCOLS = (*SSL_PROTOCOLS, uvloop.loop.SSLProtocol)


def ignore_aiohttp_ssl_eror(loop):
    """Ignore aiohttp #3535 / cpython #13548 issue with SSL data after close

    There is an issue in Python 3.7 up to 3.7.3 that over-reports a
    ssl.SSLError fatal error (ssl.SSLError: [SSL: KRB5_S_INIT] application data
    after close notify (_ssl.c:2609)) after we are already done with the
    connection. See GitHub issues aio-libs/aiohttp#3535 and
    python/cpython#13548.

    Given a loop, this sets up an exception handler that ignores this specific
    exception, but passes everything else on to the previous exception handler
    this one replaces.

    Checks for fixed Python versions, disabling itself when running on 3.7.4+
    or 3.8.
    """
    if sys.version_info >= (3, 7, 4):
        return

    orig_handler = loop.get_exception_handler()

    def ignore_ssl_error(loop, context):
        if context.get("message") in {
            "SSL error in data received",
            "Fatal error on transport",
            "SSL handshake failed",
            "[SSL: TLSV1_ALERT_INTERNAL_ERROR] tlsv1 alert internal error",
        }:
            # validate we have the right exception, transport and protocol
            exception = context.get("exception")
            protocol = context.get("protocol")
            if (
                isinstance(exception, ssl.SSLError)
                and exception.reason in ("KRB5_S_INIT", "TLSV1_ALERT_INTERNAL_ERROR")
                and isinstance(protocol, SSL_PROTOCOLS)
            ):
                if loop.get_debug():
                    asyncio.log.logger.debug("Ignoring asyncio SSL KRB5_S_INIT error")
                return
        if orig_handler is not None:
            orig_handler(loop, context)
        else:
            loop.default_exception_handler(context)

    loop.set_exception_handler(ignore_ssl_error)
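# Usage sketch (an assumption about intended use, not shown in this module):
# attach the handler to the running loop before issuing aiohttp requests:
#
#     loop = asyncio.get_event_loop()
#     ignore_aiohttp_ssl_eror(loop)
#
# On Python 3.7.4 and later the function returns immediately, so calling it
# unconditionally is safe.
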
def async_command(f):
    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(f(*args, **kwargs))

    return functools.update_wrapper(wrapper, f)


def conn_command(f):
    async def wrapper(*args, **kwargs):
        from . import db
        from . import settings

        pool = await db.get_pool(settings.DB_DSN)
        try:
            async with pool.acquire() as conn:
                kwargs["conn"] = conn
                return await f(*args, **kwargs)
        finally:
            pool.close()
            await pool.wait_closed()

    return functools.update_wrapper(wrapper, f)
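# These two decorators are meant to be stacked under a click command, e.g.:
#
#     @db.command()
#     @async_command   # drives the coroutine to completion on the event loop
#     @conn_command    # acquires a pooled connection and injects it as ``conn``
#     async def some_command(conn):   # ``some_command`` is a placeholder name
#         ...
#
# ``conn_command`` sits closest to the function so the connection is acquired
# inside the coroutine that ``async_command`` runs; this is the order used by
# ``migrate`` and ``clear`` below.
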
@click.group()
def cli():
    pass


@cli.group()
def db():
    """
    Database related commands (migrate, clear…)
    """
    pass


@cli.group()
def worker():
    pass
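# Example invocations, assuming this module is exposed as the project's
# console entry point (the exact script name depends on the packaging setup;
# ``<cli>`` is a placeholder):
#
#     <cli> db migrate           # create database tables
#     <cli> poll some.domain     # poll one or more domains
#     <cli> crawl --use-public   # crawl starting from the public pod list
#     <cli> worker start -v      # run the background worker
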
@db.command()
@async_command
@conn_command
async def migrate(conn):
    """
    Create database tables.
    """
    from . import db

    await db.create(conn)


@db.command()
@async_command
@conn_command
async def clear(conn):
    """
    Drop database tables.
    """
    from . import db

    await db.clear(conn)


@cli.command()
def server():
    """
    Start web server.
    """
    from . import server
async def launch_domain_poll(pool, session, domain):
    from . import crawler

    async with pool.acquire() as conn:
        return await crawler.check(conn=conn, session=session, domain=domain)


@cli.command()
@click.argument("domain", type=str, nargs=-1)
@async_command
async def poll(domain):
    """
    Retrieve and store data for the specified domains.
    """
    from . import crawler
    from . import db
    from . import settings
    from . import worker  # the task module, not the click group defined above

    pool = await db.get_pool(settings.DB_DSN)
    if not domain:
        click.echo("Polling all domains…")
        crawler = worker.Crawler(
            redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
        )
        return await crawler.poll_all()
    try:
        kwargs = crawler.get_session_kwargs()
        async with aiohttp.ClientSession(**kwargs) as session:
            tasks = [launch_domain_poll(pool, session, d) for d in domain]
            return await asyncio.wait(tasks)
    finally:
        pool.close()
        await pool.wait_closed()
NOOP = object()
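# Sentinel default for ``--detail``: it lets ``crawl`` tell "--detail was not
# passed at all" (detail is NOOP) apart from "--detail was passed with an
# empty value" (detail is falsy but not NOOP).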
@cli.command()
@click.argument("domain", type=str, nargs=-1)
@click.option("--use-public", is_flag=True)
@click.option("--detail", default=NOOP)
@click.option("--passes", type=click.INT, default=999)
@click.option("--sort", default="Active users (30d)")
@async_command
async def crawl(domain, use_public, detail, passes, sort):
    """
    Crawl the network starting from the given domain(s).
    """
    from . import crawler
    from . import settings

    kwargs = crawler.get_session_kwargs()
    async with aiohttp.ClientSession(**kwargs) as session:
        if use_public:
            url = "https://network.funkwhale.audio/api/domains?up=true"
            click.echo("Retrieving list of public pods from {}…".format(url))
            response = await session.get(url)
            json = await response.json()
            domain = set([d["name"] for d in json["results"]])
        click.echo("Launching crawl with {} seed domains…".format(len(domain)))
        results = await crawler.crawl_all(
            session, *domain, stdout=click.echo, max_passes=passes
        )
        click.echo("Complete after {} passes:".format(results["pass_number"]))
        aggregate = aggregate_crawl_results(results["results"])
        if detail != NOOP:
            click.echo("")
            click.echo("Info per domain")
            click.echo("===============")
            click.echo("")
            if not detail:
                fields = [
                    "Domain",
                    "Active users (30d)",
                    "Users",
                    "Listenings",
"Open registrations",
"Anonymous access",
"Private",
]
else:
fields = detail.split(",")
click.echo(
output.table(
results["results"].values(), type="Domain", fields=fields, sort=sort
)
)
click.echo("")
click.echo("Aggregated data")
click.echo("===============")
click.echo("")
click.echo(
output.obj_table(
aggregate,
type="Summary",
fields=[
"Domains",
"Active users (30d)",
"Active users (180d)",
"Users",
"Listenings",
"Tracks",
"Albums",
"Artists",
"Hours of music",
"Open registrations",
"Federation enabled",
"Anonymous access",
"Private",
],
)
)
def aggregate_crawl_results(domains_info):
def count_true(values):
return sum([1 for v in values if v])
def permissive_sum(values):
return sum([v for v in values if v])
fields = {
"domain": len,
"usage_users_total": permissive_sum,
"usage_users_active_half_year": permissive_sum,
"usage_users_active_month": permissive_sum,
"usage_listenings_total": permissive_sum,
"usage_downloads_total": permissive_sum,
"library_tracks_total": permissive_sum,
"library_albums_total": permissive_sum,
"library_artists_total": permissive_sum,
"library_music_hours": permissive_sum,
"open_registrations": count_true,
"federation_enabled": count_true,
"anonymous_can_listen": count_true,
"private": count_true,
}
aggregate = {}
for field, handler in fields.items():
values = []
for info in domains_info.values():
values.append(info[field])
aggregate[field] = handler(values)
return aggregate
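# Illustrative input/output sketch (domain name and numbers are made up;
# each per-domain dict must carry every key listed in ``fields`` above):
#
#     info = {"domain": "pod.example", "usage_users_total": 42, ..., "private": False}
#     aggregate_crawl_results({"pod.example": info})
#     # -> {"domain": 1, "usage_users_total": 42, ..., "private": 0}
#
# ``len`` counts the domains, ``permissive_sum`` skips missing (None) values,
# and ``count_true`` counts how many domains have the flag set.
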
@worker.command()
@click.option("-v", "--verbose", is_flag=True)
@click.option("--check", is_flag=True)
def start(*, check, verbose):
    worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
    logging.config.dictConfig(worker.logging_config(verbose))
    if check:
        exit(worker.check_health())
    else:
        arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)


def main():
    cli()


if __name__ == "__main__":
    main()