# cli.py — command-line entry points for funkwhale_network.
  • import aiohttp
    import asyncio
    import click
    
    import logging.config
    import arq.worker
    
    import functools
    
    
    def async_command(f):
        def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            return loop.run_until_complete(f(*args, **kwargs))
    
        return functools.update_wrapper(wrapper, f)
    
    
    def conn_command(f):
        async def wrapper(*args, **kwargs):
            from . import db
            from . import settings
    
            pool = await db.get_pool(settings.DB_DSN)
            try:
                async with pool.acquire() as conn:
                    kwargs["conn"] = conn
                    return await f(*args, **kwargs)
            finally:
                pool.close()
                await pool.wait_closed()
    
        return functools.update_wrapper(wrapper, f)
    
    
# Root command group: db/worker/server/poll commands attach to this.
# No docstring on purpose — click would surface it as help text.
@click.group()
def cli():
    pass
    
    
# Container group only: the real work lives in the subcommands below
# (``migrate``, ``clear``). The docstring becomes the group's --help text.
@cli.group()
def db():
    """
    Database related commands (migrate, clear…)
    """
    pass
    
    
    
    @cli.group()
    def worker():
        pass
    
    
    
@db.command()
@async_command  # outermost: drives the coroutine on an event loop
@conn_command  # injects an acquired database connection as ``conn``
async def migrate(conn):
    """
    Create database tables.
    """
    # Local import: the ``db`` module would otherwise clash with the
    # ``db`` click group defined above.
    from . import db

    await db.create(conn)
    
    
@db.command()
@async_command  # outermost: drives the coroutine on an event loop
@conn_command  # injects an acquired database connection as ``conn``
async def clear(conn):
    """
    Drop database tables.
    """
    # Local import: the ``db`` module would otherwise clash with the
    # ``db`` click group defined above.
    from . import db

    await db.clear(conn)
    
    
    @cli.command()
    def server():
        """
        Start web server.
        """
        from . import server
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        from . import settings
    
    Eliot Berriot's avatar
    Eliot Berriot committed
        server.start(port=settings.PORT)
    
    
    
    async def launch_domain_poll(pool, session, domain):
        from . import crawler
    
        async with pool.acquire() as conn:
            return await crawler.check(conn=conn, session=session, domain=domain)
    
    
    @cli.command()
    
    @click.argument("domain", type=str, nargs=-1)
    
    @async_command
    async def poll(domain):
        """
        Retrieve and store data for the specified domains.
        """
        from . import crawler
        from . import db
        from . import settings
    
        from . import worker
    
    
        pool = await db.get_pool(settings.DB_DSN)
    
        if not domain:
            click.echo("Polling all domains…")
            crawler = worker.Crawler(
                redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
            )
            return await crawler.poll_all()
    
    
        try:
            kwargs = crawler.get_session_kwargs()
            async with aiohttp.ClientSession(**kwargs) as session:
                tasks = [launch_domain_poll(pool, session, d) for d in domain]
                return await asyncio.wait(tasks)
    
        finally:
            pool.close()
            await pool.wait_closed()
    
    
    
    @worker.command()
    @click.option("-v", "--verbose", is_flag=True)
    @click.option("--check", is_flag=True)
    def start(*, check, verbose):
        worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
        logging.config.dictConfig(worker.logging_config(verbose))
    
        if check:
            exit(worker.check_health())
        else:
            arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)