Skip to content
Snippets Groups Projects
Verified Commit ea0f781e authored by Georg Krause's avatar Georg Krause
Browse files

Fix all worker issues

parent 2f0b9707
No related branches found
No related tags found
No related merge requests found
Pipeline #26403 failed with stages
in 51 seconds
import aiohttp
import asyncio
import sys
import aiopg
import psycopg2
from funkwhale_network import db
from funkwhale_network import exceptions
from funkwhale_network import settings
from funkwhale_network import serializers
......@@ -37,8 +38,8 @@ async def get_nodeinfo(session, nodeinfo):
raise exceptions.NoNodeInfo()
async def check(conn, session, domain, stdout=sys.stdout):
await serializers.create_domain(conn, {"name": domain})
async def check(session, domain, stdout=sys.stdout):
await serializers.create_domain({"name": domain})
check_data = {"up": True, "domain": domain}
try:
nodeinfo = await fetch_nodeinfo(session, domain)
......@@ -49,7 +50,7 @@ async def check(conn, session, domain, stdout=sys.stdout):
check_data["up"] = False
cleaned_check = check_data
await save_check(conn, cleaned_check)
await save_check(cleaned_check)
async def crawl_all(session, *domains, stdout, max_passes):
......@@ -201,8 +202,9 @@ def clean_check(check_data, nodeinfo_data):
}
@db.dict_cursor
async def save_check(cursor, data):
async def save_check(data):
conn = await aiopg.connect(settings.DB_DSN)
cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
node_name = data.pop("node_name", None)
fields, values = [], []
for field, value in data.items():
......
......@@ -67,16 +67,7 @@ async def clear(conn):
TABLES = [("domains", create_domains_table), ("checks", create_checks_table)]
def dict_cursor(func):
async def inner(conn, *args, **kwargs):
cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return await func(cursor, *args, **kwargs)
return inner
@dict_cursor
async def get_latest_check_by_domain(cursor):
async def get_latest_check_by_domain():
sql = """
SELECT DISTINCT on (domain) domain, * FROM checks INNER JOIN domains ON checks.domain = domains.name WHERE private = %s AND domains.blocked = false ORDER BY domain, time DESC
"""
......@@ -133,8 +124,9 @@ def get_domain_query(**kwargs):
return base_query.format(where_clause=""), []
@dict_cursor
async def get_domains(cursor, **kwargs):
async def get_domains(**kwargs):
conn = aiopg.connect(settings.DB_DSN)
cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
filters = kwargs.copy()
filters.setdefault("private", False)
filters.setdefault("up", True)
......@@ -169,7 +161,8 @@ def should_keep(domain, filters):
return True
@dict_cursor
async def get_domain(cursor, name):
conn = aiopg.connect(settings.DB_DSN)
cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
await cursor.execute("SELECT * FROM domains WHERE name = %s", (name,))
return list(await cursor.fetchall())[0]
import semver
import aiopg
import psycopg2
from funkwhale_network import db
from funkwhale_network import settings
@db.dict_cursor
async def create_domain(cursor, data):
async def create_domain(data):
conn = await aiopg.connect(settings.DB_DSN)
cursor = await conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
sql = "INSERT INTO domains (name) VALUES (%s) ON CONFLICT DO NOTHING RETURNING *"
await cursor.execute(sql, [data["name"]])
domain = await cursor.fetchone()
......
......@@ -7,17 +7,18 @@ from funkwhale_network import settings
from aiohttp import ClientSession
from arq import cron
import sys
async def poll(ctx, domain):
sys.stdout.write("poll")
session: ClientSession = ctx["session"]
pool = await db.get_pool(settings.DB_DSN)
async with pool as conn:
return await crawler.check(conn=conn, session=session, domain=domain)
return await crawler.check(session=session, domain=domain)
async def update_all(ctx):
pool = await db.get_pool(settings.DB_DSN)
for check in await db.get_latest_check_by_domain(pool):
sys.stdout.write("update all")
for check in await db.get_latest_check_by_domain():
await poll(ctx, check["domain"])
async def startup(ctx):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment