Skip to content
Snippets Groups Projects
db.py 4.78 KiB
Newer Older
  • Learn to ignore specific revisions
  • import aiopg
    
    
    
    async def get_pool(db_dsn):
        """Create an aiopg connection pool for ``db_dsn`` and return it."""
        pool = await aiopg.create_pool(db_dsn)
        return pool
    
    
    
    async def create_domains_table(cursor):
    
        await cursor.execute(
            """CREATE TABLE domains (
    
                name VARCHAR(255) PRIMARY KEY,
                node_name VARCHAR(255) NULL,
                blocked    BOOLEAN              DEFAULT false,
    
                first_seen    TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    
    async def create_checks_table(cursor):
        """Create the ``checks`` table, then call ``create_hypertable`` on it.

        Both statements are sent in a single ``cursor.execute`` call. The
        ``domain`` column references ``domains(name)``, so the ``domains``
        table has to exist before this runs.

        NOTE(review): ``create_hypertable`` is not defined in this file; it is
        presumably provided by the TimescaleDB extension -- confirm it is
        installed on the target database.
        """
        sql = """
        CREATE TABLE checks (
            time        TIMESTAMPTZ       NOT NULL,
            domain      VARCHAR(255) REFERENCES domains(name),
            up    BOOLEAN              NOT NULL,
            open_registrations    BOOLEAN              NULL,
            private    BOOLEAN              NULL,
            federation_enabled BOOLEAN              NULL,
            anonymous_can_listen BOOLEAN              NULL,
            usage_users_total INTEGER NULL,
            usage_users_active_half_year INTEGER NULL,
            usage_users_active_month INTEGER NULL,
            usage_listenings_total INTEGER NULL,
            library_tracks_total INTEGER NULL,
            library_albums_total INTEGER NULL,
            library_artists_total INTEGER NULL,
            library_music_hours INTEGER NULL,
            software_name VARCHAR(255) NULL,
            software_version_major SMALLINT NULL,
            software_version_minor SMALLINT NULL,
            software_version_patch SMALLINT NULL,
            software_prerelease VARCHAR(255) NULL,
            software_build VARCHAR(255) NULL
        );
        SELECT create_hypertable('checks', 'time');
        """
        await cursor.execute(sql)
    
    
    
    async def create(conn):
        """Create every table listed in ``TABLES``, in order, on one cursor."""
        async with conn.cursor() as cur:
            for _name, build_table in TABLES:
                await build_table(cur)
    
    
    async def clear(conn):
        """Drop every table listed in ``TABLES`` (``IF EXISTS ... CASCADE``)."""
        async with conn.cursor() as cur:
            for name, _handler in TABLES:
                # Table names come from the module-level TABLES constant only.
                statement = "DROP TABLE IF EXISTS {} CASCADE".format(name)
                await cur.execute(statement)
    
    # Ordered (table name, creation coroutine) pairs. create() awaits each
    # handler in this order and clear() drops each name. "domains" comes before
    # "checks" because checks.domain REFERENCES domains(name).
    TABLES = [("domains", create_domains_table), ("checks", create_checks_table)]
    
    
    
    def dict_cursor(func):
        """Decorate a coroutine so it receives a ``RealDictCursor`` instead of a connection.

        The decorated coroutine is called as ``wrapped(conn, *args, **kwargs)``.
        A cursor is opened on ``conn`` with
        ``cursor_factory=psycopg2.extras.RealDictCursor``, passed to ``func`` as
        its first argument, and released when ``func`` returns or raises.

        Args:
            func: coroutine function with signature ``func(cursor, *args, **kwargs)``.

        Returns:
            A coroutine function taking ``conn`` in place of ``cursor`` and
            returning whatever ``func`` returns.
        """
        # Local import keeps this block self-contained regardless of the
        # module's top-level imports.
        import functools

        # wraps() preserves __name__/__doc__ of the decorated coroutine, which
        # would otherwise all show up as "inner" in tracebacks and logs.
        @functools.wraps(func)
        async def inner(conn, *args, **kwargs):
            async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                return await func(cursor, *args, **kwargs)

        return inner
    
    
    @dict_cursor
    async def get_latest_check_by_domain(cursor):
        """Return the most recent check row for each domain whose check is not private.

        ``DISTINCT ON (domain)`` combined with ``ORDER BY domain, time DESC``
        keeps only the newest row per domain. Rows are filtered with
        ``private = false``, so rows where ``private`` is NULL are excluded too.

        Returns:
            A list with one row per domain, as produced by the cursor.
        """
        sql = """
        SELECT DISTINCT on (domain) domain, * FROM checks WHERE private = %s ORDER BY domain, time DESC
        """
        await cursor.execute(sql, [False])
        return list(await cursor.fetchall())
    
    Eliot Berriot's avatar
    Eliot Berriot committed
    
    
    def increment_stat(data, key, value):
        """Add ``value`` to ``data[key]`` in place; falsy values (None, 0, False) are ignored."""
        if value:
            data[key] += value
    
    
    async def get_stats(conn):
        """Aggregate the latest non-private check of every domain into global totals.

        Args:
            conn: database connection handed to ``get_latest_check_by_domain``.

        Returns:
            A dict with the sections ``users``, ``instances``, ``artists``,
            ``albums``, ``tracks`` and ``listenings``, each mapping counter
            names to integer totals.
        """
        checks = await get_latest_check_by_domain(conn)
        data = {
            "users": {"total": 0, "activeMonth": 0, "activeHalfyear": 0},
            "instances": {"total": 0, "anonymousCanListen": 0, "openRegistrations": 0},
            "artists": {"total": 0},
            "albums": {"total": 0},
            "tracks": {"total": 0},
            "listenings": {"total": 0},
        }
        # (section, counter, column of the check row) for every summed value.
        counters = (
            ("users", "total", "usage_users_total"),
            ("users", "activeMonth", "usage_users_active_month"),
            ("users", "activeHalfyear", "usage_users_active_half_year"),
            ("instances", "openRegistrations", "open_registrations"),
            ("instances", "anonymousCanListen", "anonymous_can_listen"),
            ("artists", "total", "library_artists_total"),
            ("tracks", "total", "library_tracks_total"),
            ("albums", "total", "library_albums_total"),
            ("listenings", "total", "usage_listenings_total"),
        )
        for check in checks:
            increment_stat(data["instances"], "total", 1)
            for section, counter, column in counters:
                value = check[column]
                # These columns are declared NULL-able in the checks table, so a
                # row may carry None here; int(None) raises TypeError, so skip
                # missing values instead of crashing the whole aggregation.
                if value is None:
                    continue
                increment_stat(data[section], counter, int(value))
        return data
    
    
    
    def get_domain_query(**kwargs):
        """Build the "latest check per domain" query with optional equality filters.

        Only keyword arguments naming a supported column are used; anything
        else is silently dropped. Filters are applied in alphabetical order of
        column name and combined with ``AND``.

        Returns:
            A ``(query, params)`` tuple: the SQL text with ``%s`` placeholders
            and the list of values to bind, in matching order.
        """
        filterable = (
            "up",
            "open_registrations",
            "federation_enabled",
            "anonymous_can_listen",
            "private",
        )
        template = "SELECT DISTINCT on (domain) domain, * FROM checks{where_clause} ORDER BY domain, time DESC"

        # Column names come from the whitelist above, so interpolating them is safe;
        # the values always travel as bound parameters.
        selected = sorted((name, kwargs[name]) for name in kwargs if name in filterable)

        if selected:
            conditions = " AND ".join(f"{name} = %s" for name, _ in selected)
            where_clause = " WHERE {}".format(conditions)
        else:
            where_clause = ""

        return template.format(where_clause=where_clause), [value for _, value in selected]
    
    
    @dict_cursor
    async def get_domains(cursor, **kwargs):
        """Return the latest check row per domain, filtered by the supported keyword arguments."""
        sql, bound_values = get_domain_query(**kwargs)
        await cursor.execute(sql, bound_values)
        rows = await cursor.fetchall()
        return list(rows)