diff --git a/funkwhale_network/cli.py b/funkwhale_network/cli.py index be72b99d05d7919db3ca238bf6d8afa6b138c042..d179f7b6f74b41155b51313165e14a2220b64cca 100644 --- a/funkwhale_network/cli.py +++ b/funkwhale_network/cli.py @@ -2,7 +2,6 @@ import aiohttp import asyncio import click import logging.config -import arq.worker import functools import ssl import sys @@ -167,9 +166,7 @@ async def poll(domain): pool = await db.get_pool(settings.DB_DSN) if not domain: click.echo("Polling all domains…") - crawler = worker.Crawler( - redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG) - ) + crawler = worker.Crawler() return await crawler.poll_all() try: @@ -308,13 +305,15 @@ def aggregate_crawl_results(domains_info): @click.option("-v", "--verbose", is_flag=True) @click.option("--check", is_flag=True) def start(*, check, verbose): - worker = arq.worker.import_string("funkwhale_network.worker", "Worker") - logging.config.dictConfig(worker.logging_config(verbose)) + #worker = arq.worker.import_string("funkwhale_network.worker", "Worker") + #logging.config.dictConfig(worker.logging_config(verbose)) if check: - exit(worker.check_health()) + pass + #exit(worker.check_health()) else: - arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False) + pass + #arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False) def main(): diff --git a/funkwhale_network/server.py b/funkwhale_network/server.py index 97245992706f17ca233b948bf0a2059c78074e5b..289200b53c1d752cf81623eafdfd22a94cff9edf 100644 --- a/funkwhale_network/server.py +++ b/funkwhale_network/server.py @@ -1,8 +1,6 @@ -import arq - -from funkwhale_network import routes -from funkwhale_network import worker -from funkwhale_network import settings +from . import routes +from . import worker +from . import settings from aiohttp import web @@ -13,9 +11,7 @@ def prepare_app(app, pool): app.router.add_get("/api/domains/stats", routes.stats) app.add_routes([web.static("/static", routes.STATIC_DIR)]) app["pool"] = pool - app["crawler"] = worker.Crawler( - redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG) - ) + app["crawler"] = worker.Crawler() async def on_shutdown(app): diff --git a/funkwhale_network/worker.py b/funkwhale_network/worker.py index 54cf19be56e3ecf861fbe7763f32d9ef3a3592c7..9d3575d76f79958c784106f4a056c099edf902ee 100644 --- a/funkwhale_network/worker.py +++ b/funkwhale_network/worker.py @@ -1,23 +1,26 @@ import aiohttp -import arq from funkwhale_network import crawler from funkwhale_network import db from funkwhale_network import settings +from aiohttp import ClientSession +from arq import cron -class Crawler(arq.Actor): + +#class Crawler(arq.Actor): +class Crawler(): async def startup(self): kwargs = crawler.get_session_kwargs() - self.session = aiohttp.ClientSession(loop=self.loop, **kwargs) + self.session = aiohttp.ClientSession()#loop=self.loop, **kwargs) self.pool = await db.get_pool(settings.DB_DSN) - @arq.concurrent + #@arq.concurrent async def poll(self, domain): async with self.pool.acquire() as conn: return await crawler.check(conn=conn, session=self.session, domain=domain) - @arq.cron(minute=0, hour={0, 6, 12, 18}) + #@arq.cron(minute=0, hour={0, 6, 12, 18}) async def poll_all(self): async with self.pool.acquire() as conn: for check in await db.get_latest_check_by_domain(conn): @@ -29,13 +32,21 @@ class Crawler(arq.Actor): await self.pool.wait_closed() - -class Worker(arq.BaseWorker): - shadows = [Crawler] +async def poll(ctx, domain): + session: ClientSession = ctx['session'] + pool = await db.get_pool(settings.DB_DSN) + async with pool as conn: + return await crawler.check(conn=conn, session=session, domain=domain) + +async def update_all(ctx): + pool = await db.get_pool(settings.DB_DSN) + for check in await db.get_latest_check_by_domain(pool): + await poll(ctx, check["domain"]) + +class WorkerSettings: + cron_jobs = [ + cron(update_all, minute=None) + ] max_concurrent_tasks = 20 shutdown_delay = 5 timeout_seconds = 15 - - def __init__(self, *args, **kwargs): - kwargs.setdefault("redis_settings", arq.RedisSettings(**settings.REDIS_CONFIG)) - super().__init__(*args, **kwargs)