From 4a70013eb44b09fb34e6df6d738727a73baf5f03 Mon Sep 17 00:00:00 2001
From: Georg Krause <mail@georg-krause.net>
Date: Thu, 10 Nov 2022 09:35:16 +0100
Subject: [PATCH] Fix usage of arq

---
 funkwhale_network/cli.py    | 15 +++++++--------
 funkwhale_network/server.py | 12 ++++--------
 funkwhale_network/worker.py | 35 +++++++++++++++++++++++------------
 3 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/funkwhale_network/cli.py b/funkwhale_network/cli.py
index be72b99..d179f7b 100644
--- a/funkwhale_network/cli.py
+++ b/funkwhale_network/cli.py
@@ -2,7 +2,6 @@ import aiohttp
 import asyncio
 import click
 import logging.config
-import arq.worker
 import functools
 import ssl
 import sys
@@ -167,9 +166,7 @@ async def poll(domain):
     pool = await db.get_pool(settings.DB_DSN)
     if not domain:
         click.echo("Polling all domains…")
-        crawler = worker.Crawler(
-            redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
-        )
+        crawler = worker.Crawler()
         return await crawler.poll_all()
 
     try:
@@ -308,13 +305,15 @@ def aggregate_crawl_results(domains_info):
 @click.option("-v", "--verbose", is_flag=True)
 @click.option("--check", is_flag=True)
 def start(*, check, verbose):
-    worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
-    logging.config.dictConfig(worker.logging_config(verbose))
+    #worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
+    #logging.config.dictConfig(worker.logging_config(verbose))
 
     if check:
-        exit(worker.check_health())
+        pass
+        #exit(worker.check_health())
     else:
-        arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)
+        pass
+        #arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)
 
 
 def main():
diff --git a/funkwhale_network/server.py b/funkwhale_network/server.py
index 9724599..289200b 100644
--- a/funkwhale_network/server.py
+++ b/funkwhale_network/server.py
@@ -1,8 +1,6 @@
-import arq
-
-from funkwhale_network import routes
-from funkwhale_network import worker
-from funkwhale_network import settings
+from . import routes
+from . import worker
+from . import settings
 
 from aiohttp import web
 
@@ -13,9 +11,7 @@ def prepare_app(app, pool):
     app.router.add_get("/api/domains/stats", routes.stats)
     app.add_routes([web.static("/static", routes.STATIC_DIR)])
     app["pool"] = pool
-    app["crawler"] = worker.Crawler(
-        redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
-    )
+    app["crawler"] = worker.Crawler()
 
 
 async def on_shutdown(app):
diff --git a/funkwhale_network/worker.py b/funkwhale_network/worker.py
index 54cf19b..9d3575d 100644
--- a/funkwhale_network/worker.py
+++ b/funkwhale_network/worker.py
@@ -1,23 +1,26 @@
 import aiohttp
-import arq
 
 from funkwhale_network import crawler
 from funkwhale_network import db
 from funkwhale_network import settings
+from aiohttp import ClientSession
+from arq import cron
 
 
-class Crawler(arq.Actor):
+
+#class Crawler(arq.Actor):
+class Crawler():
     async def startup(self):
         kwargs = crawler.get_session_kwargs()
-        self.session = aiohttp.ClientSession(loop=self.loop, **kwargs)
+        self.session = aiohttp.ClientSession()#loop=self.loop, **kwargs)
         self.pool = await db.get_pool(settings.DB_DSN)
 
-    @arq.concurrent
+    #@arq.concurrent
     async def poll(self, domain):
         async with self.pool.acquire() as conn:
             return await crawler.check(conn=conn, session=self.session, domain=domain)
 
-    @arq.cron(minute=0, hour={0, 6, 12, 18})
+    #@arq.cron(minute=0, hour={0, 6, 12, 18})
     async def poll_all(self):
         async with self.pool.acquire() as conn:
             for check in await db.get_latest_check_by_domain(conn):
@@ -29,13 +32,21 @@ class Crawler(arq.Actor):
         await self.pool.wait_closed()
 
-
-class Worker(arq.BaseWorker):
-    shadows = [Crawler]
+async def poll(ctx, domain):
+    session: ClientSession = ctx['session']
+    pool = await db.get_pool(settings.DB_DSN)
+    async with pool as conn:
+        return await crawler.check(conn=conn, session=session, domain=domain)
+
+async def update_all(ctx):
+    pool = await db.get_pool(settings.DB_DSN)
+    for check in await db.get_latest_check_by_domain(pool):
+        await poll(ctx, check["domain"])
+
+class WorkerSettings:
+    cron_jobs = [
+        cron(update_all, minute=None)
+    ]
 
     max_concurrent_tasks = 20
     shutdown_delay = 5
     timeout_seconds = 15
-
-    def __init__(self, *args, **kwargs):
-        kwargs.setdefault("redis_settings", arq.RedisSettings(**settings.REDIS_CONFIG))
-        super().__init__(*args, **kwargs)
-- 
GitLab