import aiohttp
import asyncio
import click
import functools
import ssl
import sys


# SSL transport protocols whose spurious errors may need silencing; extend
# with uvloop's implementation when uvloop is installed.
SSL_PROTOCOLS = (asyncio.sslproto.SSLProtocol,)
try:
    import uvloop.loop
except ImportError:
    pass
else:
    SSL_PROTOCOLS = (*SSL_PROTOCOLS, uvloop.loop.SSLProtocol)


def ignore_aiohttp_ssl_error(loop):
    """Ignore aiohttp #3535 / cpython #13548 issue with SSL data after close.

    There is an issue in Python 3.7 up to 3.7.3 that over-reports a
    ssl.SSLError fatal error (ssl.SSLError: [SSL: KRB5_S_INIT] application data
    after close notify (_ssl.c:2609)) after we are already done with the
    connection. See GitHub issues aio-libs/aiohttp#3535 and
    python/cpython#13548.

    Given a loop, this sets up an exception handler that ignores this specific
    exception, but passes everything else on to the previous exception handler
    this one replaces.

    Checks for fixed Python versions, disabling itself when running on 3.7.4+
    or 3.8.
    """
    if sys.version_info >= (3, 7, 4):
        return
    orig_handler = loop.get_exception_handler()

    def ignore_ssl_error(loop, context):
        if context.get("message") in {
            "SSL error in data received",
            "Fatal error on transport",
            "SSL handshake failed",
            "[SSL: TLSV1_ALERT_INTERNAL_ERROR] tlsv1 alert internal error",
        }:
            # validate we have the right exception, transport and protocol
            exception = context.get("exception")
            protocol = context.get("protocol")
            if (
                isinstance(exception, ssl.SSLError)
                and exception.reason in ("KRB5_S_INIT", "TLSV1_ALERT_INTERNAL_ERROR")
                and isinstance(protocol, SSL_PROTOCOLS)
            ):
                if loop.get_debug():
                    asyncio.log.logger.debug("Ignoring asyncio SSL KRB5_S_INIT error")
                return
        if orig_handler is not None:
            orig_handler(loop, context)
        else:
            loop.default_exception_handler(context)

    loop.set_exception_handler(ignore_ssl_error)
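
# Usage sketch (assumed wiring; the function is not called anywhere in this
# module): install the filter on the loop before running commands, e.g.
#
#     loop = asyncio.get_event_loop()
#     ignore_aiohttp_ssl_error(loop)
#
# Anything other than the known-bad SSL errors still reaches the previous
# exception handler.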


def async_command(f):
    """Wrap a coroutine so it can be used directly as a click command."""

    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(f(*args, **kwargs))

    return functools.update_wrapper(wrapper, f)


def conn_command(f):
    """Acquire a pooled database connection and pass it to `f` as `conn`."""

    async def wrapper(*args, **kwargs):
        from . import db
        from . import settings

        pool = await db.get_pool(settings.DB_DSN)
        try:
            async with pool.acquire() as conn:
                kwargs["conn"] = conn
                return await f(*args, **kwargs)
        finally:
            pool.close()
            await pool.wait_closed()

    return functools.update_wrapper(wrapper, f)
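
# The two decorators are designed to stack, `async_command` above
# `conn_command`: `conn_command` builds the coroutine that receives `conn`,
# and `async_command` drives it on the event loop, as in the `migrate` and
# `clear` commands below.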


@click.group()
def cli():
    pass


@cli.group()
def db():
    """
    Database related commands (migrate, clear…)
    """
    pass


@cli.group()
def worker():
    pass


@db.command()
@async_command
@conn_command
async def migrate(conn):
    """
    Create database tables.
    """
    # local import: the db module, not the click group defined above
    from . import db

    await db.create(conn)


@db.command()
@async_command
@conn_command
async def clear(conn):
    """
    Drop database tables.
    """
    # local import: the db module, not the click group defined above
    from . import db

    await db.clear(conn)


@cli.command()
def server():
    """
    Start web server.
    """
    from funkwhale_network import server
    from funkwhale_network import settings
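
    # Assumed hand-off: `server.start` and `settings.PORT` are hypothetical
    # names for the project's own entry point and port setting.
    server.start(port=settings.PORT)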


async def launch_domain_poll(pool, session, domain):
    from . import crawler

    async with pool.acquire() as conn:
        return await crawler.check(conn=conn, session=session, domain=domain)


@cli.command()
@click.argument("domain", type=str, nargs=-1)
@async_command
async def poll(domain):
    """
    Retrieve and store data for the specified domains.
    """
    from . import crawler
    from . import db
    from . import settings
    from . import worker

    pool = await db.get_pool(settings.DB_DSN)
    if not domain:
        click.echo("Polling all domains…")
    try:
        kwargs = crawler.get_session_kwargs()
        async with aiohttp.ClientSession(**kwargs) as session:
            tasks = [launch_domain_poll(pool, session, d) for d in domain]
            return await asyncio.wait(tasks)
    finally:
        pool.close()
        await pool.wait_closed()
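
# Example invocation (hypothetical domain names):
#
#     python cli.py poll demo.funkwhale.audio open.audio
#
# Note: `asyncio.wait` requires a non-empty task set, so at least one domain
# must be supplied.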


# Sentinel distinguishing "--detail was not passed" from an empty value.
NOOP = object()


@cli.command()
@click.argument("domain", type=str, nargs=-1)
@click.option("--use-public", is_flag=True)
@click.option("--detail", default=NOOP)
@click.option("--passes", type=click.INT, default=999)
@click.option("--sort", default="Active users (30d)")
@async_command
async def crawl(domain, use_public, detail, passes, sort):
    """
    Crawl the network starting from the given domain(s).
    """
    from . import crawler
    from . import output
    from . import settings

    kwargs = crawler.get_session_kwargs()
    async with aiohttp.ClientSession(**kwargs) as session:
        if use_public:
            url = "https://network.funkwhale.audio/api/domains?up=true"
            click.echo("Retrieving list of public pods from {}…".format(url))
            response = await session.get(url)
            json = await response.json()
            domain = set([d["name"] for d in json["results"]])
        click.echo("Launching crawl with {} seed domains…".format(len(domain)))
        results = await crawler.crawl_all(
            session, *domain, stdout=click.echo, max_passes=passes
        )
        click.echo("Complete after {} passes:".format(results["pass_number"]))
        aggregate = aggregate_crawl_results(results["results"])
        if detail != NOOP:
            click.echo("")
            click.echo("Info per domain")
            click.echo("===============")
            click.echo("")
            if not detail:
                fields = [
                    "Domain",
                    "Active users (30d)",
                    "Users",
                    "Listenings",
"Open registrations",
"Anonymous access",
"Private",
]
else:
fields = detail.split(",")
click.echo(
output.table(
results["results"].values(), type="Domain", fields=fields, sort=sort
)
)
click.echo("")
click.echo("Aggregated data")
click.echo("===============")
click.echo("")
click.echo(
output.obj_table(
aggregate,
type="Summary",
fields=[
"Domains",
"Active users (30d)",
"Active users (180d)",
"Users",
"Listenings",
"Tracks",
"Albums",
"Artists",
"Hours of music",
"Open registrations",
"Federation enabled",
"Anonymous access",
"Private",
],
)
)
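
# Example invocation (hypothetical domain; `--detail` takes a comma-separated
# list of the column names used above):
#
#     python cli.py crawl demo.funkwhale.audio --passes 2 --detail Domain,Users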


def aggregate_crawl_results(domains_info):
    """Reduce per-domain crawl info to a single summary dict."""

    def count_true(values):
        return sum([1 for v in values if v])

    def permissive_sum(values):
        # like sum(), but skips None and other falsy placeholders
        return sum([v for v in values if v])

    fields = {
        "domain": len,
        "usage_users_total": permissive_sum,
        "usage_users_active_half_year": permissive_sum,
        "usage_users_active_month": permissive_sum,
        "usage_listenings_total": permissive_sum,
        "usage_downloads_total": permissive_sum,
        "library_tracks_total": permissive_sum,
        "library_albums_total": permissive_sum,
        "library_artists_total": permissive_sum,
        "library_music_hours": permissive_sum,
        "open_registrations": count_true,
        "federation_enabled": count_true,
        "anonymous_can_listen": count_true,
        "private": count_true,
    }
    aggregate = {}
    for field, handler in fields.items():
        values = [info[field] for info in domains_info.values()]
        aggregate[field] = handler(values)
    return aggregate
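
# Illustrative behaviour (made-up numbers): for two domains where only one
# reports "usage_users_total": 10 and the other has None, the aggregate
# contains {"domain": 2, "usage_users_total": 10, ...}; `permissive_sum`
# simply skips the missing value.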


@worker.command()
@click.option("-v", "--verbose", is_flag=True)
@click.option("--check", is_flag=True)
def start(*, check, verbose):
    # arq-based startup, currently disabled; `pass` keeps the command a no-op.
    # worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
    # logging.config.dictConfig(worker.logging_config(verbose))
    # arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)
    pass


def main():
    cli()


if __name__ == "__main__":
    main()