    import asyncio
    import sys

    import aiohttp

    from . import db
    from . import exceptions
    from . import schemas
    from . import serializers
    from . import settings
    
    
    def get_session_kwargs():
        headers = {"User-Agent": settings.CRAWLER_USER_AGENT}
        return {
            "timeout": aiohttp.ClientTimeout(total=settings.CRAWLER_TIMEOUT),
            "headers": headers,
        }
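
    # Usage sketch (illustration only): these kwargs are presumably meant to be
    # splatted into an aiohttp client session, e.g.
    #
    #   async with aiohttp.ClientSession(**get_session_kwargs()) as session:
    #       ...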
    
    
    async def fetch_nodeinfo(session, domain):
        nodeinfo = await get_well_known_data(session, domain)
        data = await get_nodeinfo(session, nodeinfo)
        return data
    
    
    async def get_well_known_data(session, domain, protocol="https"):
        url = f"{protocol}://{domain}/.well-known/nodeinfo"
        response = await session.get(url, ssl=False)
        return await response.json()
    
    
    async def get_nodeinfo(session, nodeinfo):
        for link in nodeinfo.get("links", []):
            if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
    
    Eliot Berriot's avatar
    Eliot Berriot committed
                response = await session.get(link["href"], ssl=False)
    
                return await response.json()
    
        raise exceptions.NoNodeInfo()
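
    # For reference, the well-known document parsed above typically looks like
    # (illustration only):
    #
    #   {"links": [{"rel": "http://nodeinfo.diaspora.software/ns/schema/2.0",
    #               "href": "https://example.org/nodeinfo/2.0/"}]}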
    
    
    async def check(conn, session, domain, stdout=sys.stdout):
        await serializers.create_domain(conn, {"name": domain})
        check_data = {"up": True, "domain": domain}
        try:
            nodeinfo = await fetch_nodeinfo(session, domain)
            cleaned_nodeinfo = clean_nodeinfo(nodeinfo)
            cleaned_check = clean_check(check_data, cleaned_nodeinfo)
        except (aiohttp.client_exceptions.ClientError, exceptions.CrawlerError) as e:
            stdout.write(f"Error while fetching {domain}: {e}, marking instance as down")
            check_data["up"] = False
            cleaned_check = check_data
    
        await save_check(conn, cleaned_check)
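
    # Illustration only: checking a single instance (the conn object is a database
    # connection obtained elsewhere, e.g. from the db module):
    #
    #   async with aiohttp.ClientSession(**get_session_kwargs()) as session:
    #       await check(conn, session, "example.org")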
    

    async def crawl_all(session, *domains, stdout, max_passes):
        data = {
            "pending_domains": set(domains),
            "valid_domains": set(),
            "invalid_domains": set(),
            "handled_domains": set(),
            "results": {},
            "pass_number": 0,
        }
    
        def print_pass():
            stdout(
                "[Pass {pass_number}] {pending_domains} new domains to crawl, {handled_domains} checked, {valid_domains} valid".format(
                    pass_number=data["pass_number"],
                    pending_domains=len(data["pending_domains"]),
                    handled_domains=len(data["handled_domains"]),
                    valid_domains=len(data["valid_domains"]),
                )
            )
    
        while data["pending_domains"] and data["pass_number"] < max_passes:
            data["pass_number"] += 1
            print_pass()
            tasks = [
                asyncio.ensure_future(crawl_single(session, domain, data))
                for domain in data["pending_domains"]
            ]
            await asyncio.wait(tasks)
        if data["pass_number"] < max_passes:
            print_pass()
    
        return data
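
    # Illustration only: a full crawl could be kicked off roughly like this (the list of
    # seed domains and the maximum number of passes come from the caller, not this module):
    #
    #   async def run(domains, max_passes=5):
    #       async with aiohttp.ClientSession(**get_session_kwargs()) as session:
    #           return await crawl_all(session, *domains, stdout=print, max_passes=max_passes)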
    
    
    async def crawl_single(session, domain, data):
        try:
            nodeinfo_data = await fetch_nodeinfo(session, domain)
            cleaned_data = clean_check(
                {"domain": domain, "up": True}, clean_nodeinfo(nodeinfo_data)
            )
        except Exception:
            data["invalid_domains"].add(domain)
            return
        finally:
            data["pending_domains"].remove(domain)
            data["handled_domains"].add(domain)
    
        nodes_url = recursive_getattr(nodeinfo_data, "metadata.knownNodesListUrl")
        if nodes_url:
            # known-nodes discovery is best effort: never let it fail the check itself
            try:
                await gather_known_nodes(session, nodes_url, data)
            except Exception:
                pass
    
        data["valid_domains"].add(domain)
        data["results"][domain] = cleaned_data
    
    
    async def gather_known_nodes(session, url, data):
        fetch_url = url
        while fetch_url:
            response = await session.get(fetch_url, ssl=False)
            result = await response.json()
            fetch_url = result.get("next")
            known_domains = set([d["name"] for d in result["results"]])
            data["pending_domains"] |= known_domains - data["handled_domains"]
    
    
    
    def clean_nodeinfo(data):
        schema = schemas.NodeInfo2Schema()
        # assumption: NodeInfo2Schema is a marshmallow-style schema whose load()
        # returns the validated nodeinfo payload as a plain dict
        result = schema.load(data)
        return result
    
    
    
    def recursive_getattr(obj, key, permissive=True):
        """
        Given a dictionary such as {'user': {'name': 'Bob'}} and
        a dotted string such as user.name, returns 'Bob'.
    
        If the value is not present, returns None
        """
        v = obj
        for k in key.split("."):
            try:
                v = v.get(k)
            except (TypeError, AttributeError):
                if not permissive:
                    raise
                return
            if v is None:
                return
    
        return v
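
    # e.g. recursive_getattr({"usage": {"users": {"total": 42}}}, "usage.users.total") == 42
    # while a missing path such as "usage.users.activeMonth" simply yields None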
    
    
    def clean_check(check_data, nodeinfo_data):
        return {
            "domain": check_data["domain"],
            "up": check_data["up"],
    
            "node_name": recursive_getattr(nodeinfo_data, "metadata.nodeName"),
    
            "open_registrations": recursive_getattr(nodeinfo_data, "openRegistrations"),
            "federation_enabled": recursive_getattr(
                nodeinfo_data, "metadata.library.federationEnabled"
            ),
            "anonymous_can_listen": recursive_getattr(
                nodeinfo_data, "metadata.library.anonymousCanListen"
            ),
            "private": recursive_getattr(nodeinfo_data, "metadata.private"),
            "usage_users_total": recursive_getattr(nodeinfo_data, "usage.users.total"),
            "usage_users_active_half_year": recursive_getattr(
                nodeinfo_data, "usage.users.activeHalfyear"
            ),
            "usage_users_active_month": recursive_getattr(
                nodeinfo_data, "usage.users.activeMonth"
            ),
            "usage_listenings_total": recursive_getattr(
                nodeinfo_data, "metadata.usage.listenings.total"
            ),
    
            "usage_downloads_total": recursive_getattr(
                nodeinfo_data, "metadata.usage.downloads.total"
            ),
    
            "library_tracks_total": recursive_getattr(
                nodeinfo_data, "metadata.library.tracks.total"
            ),
            "library_albums_total": recursive_getattr(
                nodeinfo_data, "metadata.library.albums.total"
            ),
            "library_artists_total": recursive_getattr(
                nodeinfo_data, "metadata.library.artists.total"
            ),
            "library_music_hours": recursive_getattr(
                nodeinfo_data, "metadata.library.music.hours"
            ),
            "software_name": recursive_getattr(nodeinfo_data, "software.name"),
            "software_version_major": recursive_getattr(
                nodeinfo_data, "software.version.major"
            ),
            "software_version_minor": recursive_getattr(
                nodeinfo_data, "software.version.minor"
            ),
            "software_version_patch": recursive_getattr(
                nodeinfo_data, "software.version.patch"
            ),
            "software_prerelease": recursive_getattr(
                nodeinfo_data, "software.version.prerelease"
            ),
            "software_build": recursive_getattr(nodeinfo_data, "software.version.build"),
        }
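
    # Illustration only: an already cleaned nodeinfo payload such as
    #
    #   {"software": {"name": "funkwhale", "version": {"major": 1, "minor": 4, "patch": 0}},
    #    "usage": {"users": {"total": 42}}}
    #
    # flattens to entries like software_name="funkwhale", software_version_major=1 and
    # usage_users_total=42; any path that is absent in the payload becomes None.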
    
    
    
    @db.dict_cursor
    async def save_check(cursor, data):
        node_name = data.pop("node_name", None)
    
        fields, values = [], []
        for field, value in data.items():
            fields.append(field)
            values.append(value)
    
        sql = "INSERT INTO checks (time, {}) VALUES (NOW(), {}) RETURNING *".format(
            ", ".join(fields), ", ".join(["%s" for _ in values])
        )
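        # With e.g. data = {"domain": "example.org", "up": True}, the statement built above
        # reads: INSERT INTO checks (time, domain, up) VALUES (NOW(), %s, %s) RETURNING *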
    
        await cursor.execute(sql, values)
        check = await cursor.fetchone()
    
    
        if data.get("private") is True:
            # let's clean previous checks
            sql = "DELETE FROM checks WHERE domain = %s"
            await cursor.execute(sql, [data["domain"]])
            return
    
        if node_name:
            await cursor.execute(
                "UPDATE domains SET node_name = %s WHERE name = %s",
                [node_name, data["domain"]],
            )
    
        return check