Newer
Older
import functools

import psycopg2
import psycopg2.extras
async def get_pool(db_dsn):
    """Create an aiopg pool for ``db_dsn`` and return it.

    ``db_dsn`` is forwarded unchanged to ``aiopg.create_pool``.
    """
    pool = await aiopg.create_pool(db_dsn)
    return pool
# DDL for the ``domains`` table. ``name`` is the primary key.
_DOMAINS_TABLE_DDL = """CREATE TABLE domains (
name VARCHAR(255) PRIMARY KEY,
node_name VARCHAR(255) NULL,
blocked BOOLEAN DEFAULT false,
first_seen TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""


async def create_domains_table(cursor):
    """Create the ``domains`` table.

    Args:
        cursor: an object exposing an awaitable ``execute(sql)`` method.

    The statement is a plain ``CREATE TABLE`` (no ``IF NOT EXISTS``), so the
    database will reject it if the table already exists.
    """
    # The original statement was left unterminated (no closing parenthesis,
    # quotes or call paren); the full DDL is now sent as one complete statement.
    await cursor.execute(_DOMAINS_TABLE_DDL)
# DDL for the ``checks`` table, followed by the ``create_hypertable`` call on
# its ``time`` column. Both statements are sent in a single ``execute``.
_CHECKS_TABLE_DDL = """
CREATE TABLE checks (
time TIMESTAMPTZ NOT NULL,
domain VARCHAR(255) REFERENCES domains(name),
up BOOLEAN NOT NULL,
open_registrations BOOLEAN NULL,
private BOOLEAN NULL,
federation_enabled BOOLEAN NULL,
anonymous_can_listen BOOLEAN NULL,
usage_users_total INTEGER NULL,
usage_users_active_half_year INTEGER NULL,
usage_users_active_month INTEGER NULL,
usage_listenings_total INTEGER NULL,
library_tracks_total INTEGER NULL,
library_albums_total INTEGER NULL,
library_artists_total INTEGER NULL,
library_music_hours INTEGER NULL,
software_name VARCHAR(255) NULL,
software_version_major SMALLINT NULL,
software_version_minor SMALLINT NULL,
software_version_patch SMALLINT NULL,
software_prerelease VARCHAR(255) NULL,
software_build VARCHAR(255) NULL
);
SELECT create_hypertable('checks', 'time');
"""


async def create_checks_table(cursor):
    """Create the ``checks`` table and register it via ``create_hypertable``.

    Args:
        cursor: an object exposing an awaitable ``execute(sql)`` method.

    ``checks.domain`` references ``domains(name)``, so the ``domains`` table
    must already exist when this runs.
    NOTE(review): ``create_hypertable`` is presumably the TimescaleDB
    function - confirm the extension is installed on the target database.
    """
    await cursor.execute(_CHECKS_TABLE_DDL)
async def create(conn):
    """Run every creation handler registered in ``TABLES``, in order.

    A single cursor obtained from ``conn`` is shared by all handlers.
    """
    async with conn.cursor() as cursor:
        for _table_name, build_table in TABLES:
            await build_table(cursor)
async def clear(conn):
    """Drop every table named in ``TABLES`` (``IF EXISTS``, with ``CASCADE``).

    A single cursor obtained from ``conn`` is used for all statements.
    """
    async with conn.cursor() as cursor:
        for table_name, _handler in TABLES:
            # Table names come from the module-level TABLES constant, not
            # from user input, so interpolating them into the SQL is safe.
            statement = f"DROP TABLE IF EXISTS {table_name} CASCADE"
            await cursor.execute(statement)
# Registry of (table name, creation coroutine) pairs, iterated in order by
# create() and clear(). "domains" comes first because checks.domain
# references domains(name).
TABLES = [
    ("domains", create_domains_table),
    ("checks", create_checks_table),
]
def dict_cursor(func):
    """Decorate a coroutine so it is called with a ``RealDictCursor``.

    The decorated coroutine must accept a cursor as its first argument.
    The returned wrapper instead takes a connection as its first argument,
    opens a cursor on it with ``cursor_factory=psycopg2.extras.RealDictCursor``,
    and forwards that cursor plus any remaining positional and keyword
    arguments to ``func``. The cursor context is exited when ``func`` returns
    or raises.

    Args:
        func: coroutine function with signature ``func(cursor, *args, **kwargs)``.

    Returns:
        A coroutine function with signature ``inner(conn, *args, **kwargs)``
        that returns whatever ``func`` returns.
    """

    # functools.wraps keeps the wrapped coroutine's __name__/__doc__ so
    # tracebacks, logs and introspection show the real function.
    @functools.wraps(func)
    async def inner(conn, *args, **kwargs):
        async with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            return await func(cursor, *args, **kwargs)

    return inner
@dict_cursor
async def get_latest_check_by_domain(cursor):
    """Return the most recent check row for each domain.

    ``DISTINCT ON (domain)`` combined with ``ORDER BY domain, time DESC``
    keeps one row per domain: the one with the greatest ``time``. Only rows
    whose ``private`` column equals the bound parameter are considered.

    Because of the ``dict_cursor`` decorator, callers pass a connection, not
    a cursor.

    Returns:
        A list with every row returned by ``cursor.fetchall()``.
    """
    sql = (
        "SELECT DISTINCT on (domain) domain, * FROM checks "
        "WHERE private = %s ORDER BY domain, time DESC"
    )
    # The query was previously never executed (the string literal was left
    # unterminated and no execute() call preceded fetchall()).
    # NOTE(review): the value bound to ``private = %s`` was not present in
    # the reviewed source; ``False`` (non-private instances only) is assumed
    # - confirm against the intended behavior.
    await cursor.execute(sql, [False])
    return list(await cursor.fetchall())
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def increment_stat(data, key, value):
    """Add ``value`` to ``data[key]`` in place.

    Falsy values (``None``, ``0``, ``False``) are ignored, and in that case
    ``data`` is not touched at all - not even looked up by ``key``.
    Always returns ``None``.
    """
    if value:
        data[key] += value
async def get_stats(conn):
    """Aggregate the latest check of every domain into network-wide totals.

    Args:
        conn: connection passed through to ``get_latest_check_by_domain``.

    Returns:
        A dict of counters grouped by section (``users``, ``instances``,
        ``artists``, ``albums``, ``tracks``, ``listenings``). Every counter
        starts at 0; ``instances.total`` is incremented once per check row.
    """
    checks = await get_latest_check_by_domain(conn)
    data = {
        "users": {"total": 0, "activeMonth": 0, "activeHalfyear": 0},
        "instances": {"total": 0, "anonymousCanListen": 0, "openRegistrations": 0},
        "artists": {"total": 0},
        "albums": {"total": 0},
        "tracks": {"total": 0},
        "listenings": {"total": 0},
    }
    # (section, counter, source column) for every value summed across checks.
    sources = (
        ("users", "total", "usage_users_total"),
        ("users", "activeMonth", "usage_users_active_month"),
        ("users", "activeHalfyear", "usage_users_active_half_year"),
        ("instances", "openRegistrations", "open_registrations"),
        ("instances", "anonymousCanListen", "anonymous_can_listen"),
        ("artists", "total", "library_artists_total"),
        ("tracks", "total", "library_tracks_total"),
        ("albums", "total", "library_albums_total"),
        ("listenings", "total", "usage_listenings_total"),
    )
    for check in checks:
        increment_stat(data["instances"], "total", 1)
        for section, counter, column in sources:
            value = check[column]
            # These columns are declared NULL-able in the checks table, so a
            # row may carry None here; int(None) would raise TypeError.
            if value is None:
                continue
            # int() turns the boolean flags into 0/1 before they are summed.
            increment_stat(data[section], counter, int(value))
    return data
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def get_domain_query(**kwargs):
    """Build the "latest check per domain" query with optional filters.

    Only keyword arguments whose name is one of the supported check columns
    are turned into ``column = %s`` conditions; any other keyword is ignored.
    Conditions are joined with ``AND`` in alphabetical column order.

    Returns:
        A ``(sql, params)`` tuple, where ``params`` is the list of filter
        values in the same order as their placeholders (empty when no
        supported filter was given).
    """
    allowed_columns = (
        "up",
        "open_registrations",
        "federation_enabled",
        "anonymous_can_listen",
        "private",
    )
    template = "SELECT DISTINCT on (domain) domain, * FROM checks{where_clause} ORDER BY domain, time DESC"

    # Sorting makes the generated SQL independent of keyword order.
    columns = sorted(name for name in kwargs if name in allowed_columns)
    if not columns:
        return template.format(where_clause=""), []

    # Column names come from the fixed allow-list above; values are always
    # passed separately as bound parameters.
    conditions = " AND ".join(f"{column} = %s" for column in columns)
    values = [kwargs[column] for column in columns]
    return template.format(where_clause=f" WHERE {conditions}"), values
@dict_cursor
async def get_domains(cursor, **kwargs):
    """Fetch the latest check row per domain, filtered by ``kwargs``.

    The keyword arguments are handed to ``get_domain_query``, which builds
    the SQL and its bound parameters. Because of the ``dict_cursor``
    decorator, callers pass a connection, not a cursor.

    Returns:
        A list with every row returned by ``cursor.fetchall()``.
    """
    sql, bound_values = get_domain_query(**kwargs)
    await cursor.execute(sql, bound_values)
    rows = await cursor.fetchall()
    return list(rows)