Skip to content
Snippets Groups Projects
worker.py 1.53 KiB
Newer Older
  • Learn to ignore specific revisions
  • import aiohttp
    
    
    from funkwhale_network import crawler
    from funkwhale_network import db
    from funkwhale_network import settings
    
    Georg Krause's avatar
    Georg Krause committed
    from aiohttp import ClientSession
    from arq import cron
    
    Georg Krause's avatar
    Georg Krause committed
    
    class Crawler:
        """Legacy class-based crawler worker.

        Kept from the ``arq.Actor`` era (the class was originally declared as
        ``class Crawler(arq.Actor)``). Under that API ``poll`` was decorated with
        ``@arq.concurrent`` and ``poll_all`` with
        ``@arq.cron(minute=0, hour={0, 6, 12, 18})``; those decorators are
        disabled here. NOTE(review): the module-level ``WorkerSettings`` does not
        reference this class — confirm whether anything still uses it.

        Call ``startup()`` before any other method and ``shutdown()`` when done.
        """

        async def startup(self):
            """Open the HTTP session and database pool used by the other methods.

            Sets ``self.session`` to an ``aiohttp.ClientSession`` and ``self.pool``
            to the pool returned by ``db.get_pool(settings.DB_DSN)``.
            """
            # Build the session with the crawler-provided options so every
            # request uses the same configuration as the rest of the crawler.
            self.session = aiohttp.ClientSession(**crawler.get_session_kwargs())
            self.pool = await db.get_pool(settings.DB_DSN)

        async def poll(self, domain):
            """Run ``crawler.check`` for *domain* on a connection borrowed from the pool.

            Returns whatever ``crawler.check`` returns.
            """
            async with self.pool.acquire() as conn:
                return await crawler.check(conn=conn, session=self.session, domain=domain)

        async def poll_all(self):
            """Re-check every domain returned by ``db.get_latest_check_by_domain``.

            Domains are polled one after another, in the order the query returns them.
            """
            async with self.pool.acquire() as conn:
                checks = await db.get_latest_check_by_domain(conn)
            # Release the listing connection before polling: each ``poll`` acquires
            # its own connection, so holding this one for the whole crawl would tie
            # up a second pool slot for no reason.
            for check in checks:
                await self.poll(check["domain"])

        async def shutdown(self):
            """Close the HTTP session, then close the pool and wait for it to finish closing."""
            await self.session.close()
            self.pool.close()
            await self.pool.wait_closed()
    
    
    Georg Krause's avatar
    Georg Krause committed
    async def poll(ctx, domain):
        """arq job: check a single *domain* and return the result of ``crawler.check``.

        Reads the shared HTTP session from ``ctx['session']``; the worker's startup
        hook must have stored it there. A database pool is opened for this call,
        one connection is borrowed from it for the check, and the pool is closed
        before returning (also on error).

        NOTE(review): opening a pool per call is expensive; sharing one pool via
        ``ctx`` would be cheaper — confirm before changing.
        """
        session: ClientSession = ctx['session']
        pool = await db.get_pool(settings.DB_DSN)
        try:
            # Borrow an actual connection (matching ``Crawler.poll``) rather than
            # handing the pool object itself to ``crawler.check``.
            async with pool.acquire() as conn:
                return await crawler.check(conn=conn, session=session, domain=domain)
        finally:
            pool.close()
            await pool.wait_closed()
    
    async def update_all(ctx):
        """arq cron job: re-check every domain returned by ``db.get_latest_check_by_domain``.

        The domain list is read on a connection from a short-lived pool, which is
        closed (also on error) before polling starts. Each domain is then checked
        sequentially via ``poll(ctx, domain)``.

        NOTE(review): domains are polled one at a time inside this single job, so
        worker-level concurrency settings do not parallelise the crawl.
        """
        pool = await db.get_pool(settings.DB_DSN)
        try:
            async with pool.acquire() as conn:
                checks = await db.get_latest_check_by_domain(conn)
        finally:
            # Without this, every cron tick leaked a freshly created pool.
            pool.close()
            await pool.wait_closed()
        for check in checks:
            await poll(ctx, check["domain"])
        
    async def _startup(ctx):
        """arq startup hook: create the HTTP session that ``poll`` reads from ``ctx['session']``."""
        ctx['session'] = ClientSession(**crawler.get_session_kwargs())


    async def _shutdown(ctx):
        """arq shutdown hook: close the HTTP session created by ``_startup``, if any."""
        session = ctx.get('session')
        if session is not None:
            await session.close()


    class WorkerSettings:
        """arq worker configuration for the crawler.

        Runs ``update_all`` as a cron job (``minute=None``). The startup hook
        provides ``ctx['session']``; without it every ``poll`` would fail with
        ``KeyError``.
        """

        cron_jobs = [
            cron(update_all, minute=None)
        ]

        on_startup = _startup
        on_shutdown = _shutdown

        # NOTE(review): these names come from the pre-0.16 arq API; newer arq
        # versions use ``max_jobs`` / ``job_timeout`` and may silently ignore
        # unknown settings — confirm against the installed arq version.
        max_concurrent_tasks = 20
        shutdown_delay = 5
        timeout_seconds = 15