Verified Commit 4a70013e authored by Georg Krause

Fix usage of arq

parent 90deec39
1 merge request: !43 Apply patches from the server
Pipeline #25234 failed in 38 seconds
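
This commit drops the pre-0.16 arq API, where jobs were methods on an arq.Actor subclass (decorated with @arq.concurrent or @arq.cron) and were executed by an arq.BaseWorker subclass listing its shadows, in favour of the current API: jobs are plain module-level coroutines that receive a ctx dict as their first argument, and the worker is configured through a plain WorkerSettings class. Three files are touched below: the CLI, the aiohttp server setup, and the worker module itself. As a point of reference, a minimal sketch of the new-style API this commit moves to (the say_hello job and the enqueue() helper are illustrative, not from the diff):

from arq import create_pool, cron
from arq.connections import RedisSettings


async def say_hello(ctx, name):
    # Jobs are plain coroutines; arq passes a ctx dict as the first argument.
    print(f"hello {name}")


class WorkerSettings:
    functions = [say_hello]
    redis_settings = RedisSettings()  # defaults to redis://localhost:6379


async def enqueue():
    # Producers enqueue jobs by name through a Redis pool.
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job("say_hello", "world")

The worker process itself is started with arq's bundled CLI, e.g. arq funkwhale_network.worker.WorkerSettings.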
@@ -2,7 +2,6 @@ import aiohttp
 import asyncio
 import click
 import logging.config
-import arq.worker
 import functools
 import ssl
 import sys
@@ -167,9 +166,7 @@ async def poll(domain):
     pool = await db.get_pool(settings.DB_DSN)
     if not domain:
         click.echo("Polling all domains…")
-        crawler = worker.Crawler(
-            redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
-        )
+        crawler = worker.Crawler()
         return await crawler.poll_all()
     try:
@@ -308,13 +305,15 @@ def aggregate_crawl_results(domains_info):
 @click.option("-v", "--verbose", is_flag=True)
 @click.option("--check", is_flag=True)
 def start(*, check, verbose):
-    worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
-    logging.config.dictConfig(worker.logging_config(verbose))
+    #worker = arq.worker.import_string("funkwhale_network.worker", "Worker")
+    #logging.config.dictConfig(worker.logging_config(verbose))
     if check:
-        exit(worker.check_health())
+        pass
+        #exit(worker.check_health())
     else:
-        arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)
+        pass
+        #arq.RunWorkerProcess("funkwhale_network.worker", "Worker", burst=False)


 def main():
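With the old API gone, the start command is reduced to a stub for now: both the --check branch and the run branch just pass, with the previous arq 0.x calls kept as comments. Assuming arq >= 0.16, one way the command could be wired up again is through the run_worker and check_health helpers that arq's own CLI is built on (this wiring is mine, not part of the commit):

import sys

from arq.worker import check_health, run_worker

from funkwhale_network.worker import WorkerSettings


def start(*, check, verbose):
    # (verbose logging setup omitted in this sketch)
    if check:
        # check_health pings Redis and inspects the worker's health-check key.
        sys.exit(check_health(WorkerSettings))
    else:
        # run_worker blocks, executing jobs and cron schedules until shutdown.
        run_worker(WorkerSettings)

The next hunks touch the aiohttp application setup, where the Crawler instance is stored on the app: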
-import arq
-from funkwhale_network import routes
-from funkwhale_network import worker
-from funkwhale_network import settings
+from . import routes
+from . import worker
+from . import settings
 from aiohttp import web
@@ -13,9 +11,7 @@ def prepare_app(app, pool):
     app.router.add_get("/api/domains/stats", routes.stats)
     app.add_routes([web.static("/static", routes.STATIC_DIR)])
     app["pool"] = pool
-    app["crawler"] = worker.Crawler(
-        redis_settings=arq.RedisSettings(**settings.REDIS_CONFIG)
-    )
+    app["crawler"] = worker.Crawler()


 async def on_shutdown(app):
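Here the only behavioural change is that worker.Crawler() is constructed without Redis settings, since it no longer subclasses arq.Actor. Crawler.startup() is a coroutine, though, and still has to be awaited once the event loop runs; the hunks don't show where that happens, but with aiohttp the usual place is an on_startup signal (a sketch, not shown in this commit):

async def start_crawler(app):
    # Crawler.startup() creates the aiohttp ClientSession and the DB pool.
    await app["crawler"].startup()

# e.g. at the end of prepare_app(app, pool):
app.on_startup.append(start_crawler)

The last file is the worker module, where the bulk of the migration happens: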
 import aiohttp
-import arq

 from funkwhale_network import crawler
 from funkwhale_network import db
 from funkwhale_network import settings
+from aiohttp import ClientSession
+from arq import cron


-class Crawler(arq.Actor):
+#class Crawler(arq.Actor):
+class Crawler():
     async def startup(self):
         kwargs = crawler.get_session_kwargs()
-        self.session = aiohttp.ClientSession(loop=self.loop, **kwargs)
+        self.session = aiohttp.ClientSession()#loop=self.loop, **kwargs)
         self.pool = await db.get_pool(settings.DB_DSN)

-    @arq.concurrent
+    #@arq.concurrent
     async def poll(self, domain):
         async with self.pool.acquire() as conn:
             return await crawler.check(conn=conn, session=self.session, domain=domain)

-    @arq.cron(minute=0, hour={0, 6, 12, 18})
+    #@arq.cron(minute=0, hour={0, 6, 12, 18})
     async def poll_all(self):
         async with self.pool.acquire() as conn:
             for check in await db.get_latest_check_by_domain(conn):
@@ -29,13 +32,21 @@ class Crawler(arq.Actor):
         await self.pool.wait_closed()


-class Worker(arq.BaseWorker):
-    shadows = [Crawler]
+async def poll(ctx, domain):
+    session: ClientSession = ctx['session']
+    pool = await db.get_pool(settings.DB_DSN)
+    async with pool as conn:
+        return await crawler.check(conn=conn, session=session, domain=domain)
+
+async def update_all(ctx):
+    pool = await db.get_pool(settings.DB_DSN)
+    for check in await db.get_latest_check_by_domain(pool):
+        await poll(ctx, check["domain"])
+
+class WorkerSettings:
+    cron_jobs = [
+        cron(update_all, minute=None)
+    ]
     max_concurrent_tasks = 20
     shutdown_delay = 5
     timeout_seconds = 15
-
-    def __init__(self, *args, **kwargs):
-        kwargs.setdefault("redis_settings", arq.RedisSettings(**settings.REDIS_CONFIG))
-        super().__init__(*args, **kwargs)
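
The worker module now has the arq >= 0.16 shape: poll and update_all are module-level coroutines taking a ctx dict, and WorkerSettings replaces the BaseWorker subclass. Two details are worth flagging. First, cron(update_all, minute=None) leaves every time field unset, which under arq's cron semantics appears to fire the job every minute (second defaults to 0), a much tighter schedule than the old @arq.cron(minute=0, hour={0, 6, 12, 18}); the max_concurrent_tasks, shutdown_delay and timeout_seconds attributes kept as context are old BaseWorker options whose rough arq >= 0.16 equivalents appear to be max_jobs and job_timeout. Second, the new poll() reads ctx['session'], but nothing in the visible hunks puts a session into ctx; with this API that is normally done through on_startup/on_shutdown hooks on WorkerSettings (a sketch of the usual wiring, not part of this commit):

from aiohttp import ClientSession
from arq import cron


async def startup(ctx):
    # Per-worker shared resources live in ctx and are visible to every job.
    ctx["session"] = ClientSession()


async def shutdown(ctx):
    await ctx["session"].close()


class WorkerSettings:
    # update_all here is the coroutine defined in the diff above.
    cron_jobs = [cron(update_all, minute=None)]
    on_startup = startup
    on_shutdown = shutdown

Finally, async with pool as conn in the new poll() probably does not do what the old async with self.pool.acquire() as conn did: if db.get_pool returns an aiopg-style pool, entering the pool yields the pool itself rather than a connection, so the acquire() pattern is likely still needed.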