Newer
Older
import aiohttp
import asyncio
import click
import logging.config
import arq.worker
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import functools
def async_command(f):
    """
    Turn the coroutine function ``f`` into a regular blocking callable.

    The returned wrapper forwards its positional and keyword arguments to
    ``f``, runs the resulting coroutine to completion on the current event
    loop and returns the coroutine's result. Exceptions raised by ``f``
    propagate to the caller. ``f``'s metadata (name, docstring, ...) is copied
    onto the wrapper.
    """

    def wrapper(*args, **kwargs):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current loop exists (non-main thread, or Python versions that
            # no longer create one implicitly): create one and install it so
            # later calls in this thread reuse it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(f(*args, **kwargs))

    return functools.update_wrapper(wrapper, f)
def conn_command(f):
    """
    Wrap the coroutine function ``f`` so that it is handed a database
    connection.

    The wrapper opens a pool with ``db.get_pool(settings.DB_DSN)``, acquires
    one connection from it and awaits ``f`` with that connection as the
    ``conn`` keyword argument (overriding any ``conn`` supplied by the
    caller). Whether ``f`` returns or raises, the pool is closed and its
    shutdown is awaited before the wrapper finishes.
    """

    async def wrapper(*args, **kwargs):
        # Imported locally under aliases: at module level the name `db`
        # refers to the click group, not the package's db module.
        from . import db as database
        from . import settings as config

        pool = await database.get_pool(config.DB_DSN)
        try:
            async with pool.acquire() as connection:
                call_kwargs = {**kwargs, "conn": connection}
                return await f(*args, **call_kwargs)
        finally:
            pool.close()
            await pool.wait_closed()

    return functools.update_wrapper(wrapper, f)
@click.group()
def cli():
    # Root command group. The `db` and `worker` sub-groups and the `server`
    # and `poll` commands are registered on it; the function itself does
    # nothing.
    pass
# Sub-group of `cli` holding the `migrate` and `clear` commands. The docstring
# is the only body the function needs.
@cli.group()
def db():
    """
    Database related commands (migrate, clear…)
    """
@cli.group()
def worker():
    # Sub-group of `cli` for worker commands; `start` is registered on it.
    pass
# `conn_command` supplies the `conn` argument and `async_command` runs the
# coroutine to completion, so click can invoke this as a plain function.
@db.command()
@async_command
@conn_command
async def migrate(conn):
    """
    Create database tables.
    """
    # The module-level name `db` is the click group, hence the local alias
    # for the package's db module.
    from . import db as database

    await database.create(conn)
# `conn_command` supplies the `conn` argument and `async_command` runs the
# coroutine to completion, so click can invoke this as a plain function.
@db.command()
@async_command
@conn_command
async def clear(conn):
    """
    Drop database tables.
    """
    # The module-level name `db` is the click group, hence the local alias
    # for the package's db module.
    from . import db as database

    await database.clear(conn)
@cli.command()
def server():
    """
    Start web server.
    """
    # NOTE(review): the command only imports the package's server module and
    # calls nothing on it. Presumably the import itself starts the server, or
    # a call is missing here -- confirm against the server module.
    from . import server as _server_module  # noqa: F401
async def launch_domain_poll(pool, session, domain):
    """
    Check one domain using a connection borrowed from ``pool``.

    A connection is acquired from ``pool`` for the duration of the call and
    passed, together with ``session`` and ``domain``, to ``crawler.check``.
    Whatever ``crawler.check`` returns is returned to the caller.
    """
    from . import crawler as crawler_module

    async with pool.acquire() as connection:
        outcome = await crawler_module.check(
            conn=connection, session=session, domain=domain
        )
    return outcome
@cli.command()
@click.argument("domain", type=str, nargs=-1)
@async_command
async def poll(domain):
    """
    Retrieve and store data for the specified domains.
    """
    # `domain` is the tuple of DOMAIN arguments collected by click (nargs=-1).
    # With no arguments, polling of all domains is delegated to the worker
    # module's Crawler; otherwise each given domain is checked concurrently
    # and the (done, pending) pair from asyncio.wait() is returned.
    from . import crawler
    from . import db
    from . import settings

    # At module level `worker` is the click group, which has no `Crawler`
    # attribute, so the package's worker module is imported under its own
    # local name.
    from . import worker as worker_module

    if not domain:
        click.echo("Polling all domains…")
        all_domains_crawler = worker_module.Crawler(
            redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
        )
        return await all_domains_crawler.poll_all()

    # The pool is opened only on the path that uses it, so the early return
    # above cannot leave an unclosed pool behind.
    pool = await db.get_pool(settings.DB_DSN)
    try:
        kwargs = crawler.get_session_kwargs()
        async with aiohttp.ClientSession(**kwargs) as session:
            # asyncio.wait() rejects bare coroutine objects on Python 3.11+,
            # so each coroutine is wrapped in a task first.
            tasks = [
                asyncio.ensure_future(launch_domain_poll(pool, session, d))
                for d in domain
            ]
            return await asyncio.wait(tasks)
    finally:
        pool.close()
        await pool.wait_closed()
@worker.command()
@click.option("-v", "--verbose", is_flag=True)
@click.option("--check", is_flag=True)
def start(*, check, verbose):
    # Configures logging from the imported worker's `logging_config(verbose)`,
    # then either exits with the result of its `check_health()` (--check) or
    # launches the worker process via arq with burst=False.
    imported_worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
    logging.config.dictConfig(imported_worker.logging_config(verbose))

    if check:
        # `exit()` is a helper injected by the `site` module for interactive
        # sessions and is missing under `python -S` or in frozen builds.
        # Raising SystemExit directly has the same effect on the exit status.
        raise SystemExit(imported_worker.check_health())

    arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)
def main():
    """Entry point: invoke the root ``cli`` command group."""
    cli()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()