diff --git a/funkwhale_network/crawler.py b/funkwhale_network/crawler.py index 301613cdc2c7b37e8aee8dab9bfadb68d92fbb6f..c4ed76d642be6226b5305e8825d71ad9e0c92b79 100644 --- a/funkwhale_network/crawler.py +++ b/funkwhale_network/crawler.py @@ -1,8 +1,9 @@ import aiohttp import asyncio import sys +import aiopg +import psycopg2 -from funkwhale_network import db from funkwhale_network import exceptions from funkwhale_network import settings from funkwhale_network import serializers @@ -37,8 +38,8 @@ async def get_nodeinfo(session, nodeinfo): raise exceptions.NoNodeInfo() -async def check(conn, session, domain, stdout=sys.stdout): - await serializers.create_domain(conn, {"name": domain}) +async def check(session, domain, stdout=sys.stdout): + await serializers.create_domain({"name": domain}) check_data = {"up": True, "domain": domain} try: nodeinfo = await fetch_nodeinfo(session, domain) @@ -49,7 +50,7 @@ async def check(conn, session, domain, stdout=sys.stdout): check_data["up"] = False cleaned_check = check_data - await save_check(conn, cleaned_check) + await save_check(cleaned_check) async def crawl_all(session, *domains, stdout, max_passes): @@ -201,8 +202,9 @@ def clean_check(check_data, nodeinfo_data): } -@db.dict_cursor -async def save_check(cursor, data): +async def save_check(data): + conn = await aiopg.connect(settings.DB_DSN) + cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) node_name = data.pop("node_name", None) fields, values = [], [] for field, value in data.items(): diff --git a/funkwhale_network/db.py b/funkwhale_network/db.py index 67ea145699006c42c9e4b66c23793033efed2d47..674c3d1ad22b565c561431a678b234d1c3d49897 100644 --- a/funkwhale_network/db.py +++ b/funkwhale_network/db.py @@ -67,16 +67,7 @@ async def clear(conn): TABLES = [("domains", create_domains_table), ("checks", create_checks_table)] -def dict_cursor(func): - async def inner(conn, *args, **kwargs): - cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) - return await func(cursor, *args, **kwargs) - - return inner - - -@dict_cursor -async def get_latest_check_by_domain(cursor): +async def get_latest_check_by_domain(): sql = """ SELECT DISTINCT on (domain) domain, * FROM checks INNER JOIN domains ON checks.domain = domains.name WHERE private = %s AND domains.blocked = false ORDER BY domain, time DESC """ @@ -133,8 +124,9 @@ def get_domain_query(**kwargs): return base_query.format(where_clause=""), [] -@dict_cursor -async def get_domains(cursor, **kwargs): +async def get_domains(**kwargs): + conn = aiopg.connect(settings.DB_DSN) + cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) filters = kwargs.copy() filters.setdefault("private", False) filters.setdefault("up", True) @@ -169,7 +161,8 @@ def should_keep(domain, filters): return True -@dict_cursor async def get_domain(cursor, name): + conn = aiopg.connect(settings.DB_DSN) + cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) await cursor.execute("SELECT * FROM domains WHERE name = %s", (name,)) return list(await cursor.fetchall())[0] diff --git a/funkwhale_network/serializers.py b/funkwhale_network/serializers.py index ac3d46367161c95f39547e586c6ac9a229b91928..a6e75db07184b7cc316ca09cb6ad563d5e278032 100644 --- a/funkwhale_network/serializers.py +++ b/funkwhale_network/serializers.py @@ -1,10 +1,13 @@ import semver +import aiopg +import psycopg2 -from funkwhale_network import db +from funkwhale_network import settings -@db.dict_cursor -async def create_domain(cursor, data): +async def create_domain(data): + conn = await aiopg.connect(settings.DB_DSN) + cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) sql = "INSERT INTO domains (name) VALUES (%s) ON CONFLICT DO NOTHING RETURNING *" await cursor.execute(sql, [data["name"]]) domain = await cursor.fetchone() diff --git a/funkwhale_network/worker.py b/funkwhale_network/worker.py index f1408eedc5259dd3964ac4f3b5e12ea08d311739..5430bfc60c8850f5425f43f9b2f4c5e4b9ade235 100644 --- a/funkwhale_network/worker.py +++ b/funkwhale_network/worker.py @@ -7,17 +7,18 @@ from funkwhale_network import settings from aiohttp import ClientSession from arq import cron +import sys + async def poll(ctx, domain): + sys.stdout.write("poll") session: ClientSession = ctx["session"] - pool = await db.get_pool(settings.DB_DSN) - async with pool as conn: - return await crawler.check(conn=conn, session=session, domain=domain) + return await crawler.check(session=session, domain=domain) async def update_all(ctx): - pool = await db.get_pool(settings.DB_DSN) - for check in await db.get_latest_check_by_domain(pool): + sys.stdout.write("update all") + for check in await db.get_latest_check_by_domain(): await poll(ctx, check["domain"]) async def startup(ctx):