Skip to content
Snippets Groups Projects
scrobbler.py 5.48 KiB
Newer Older
Agate's avatar
Agate committed
import hashlib
import time


# https://github.com/jlieth/legacy-scrobbler
from .funkwhale_startup import PLUGIN


class ScrobblerException(Exception):
    pass


def handshake_v1(session, url, username, password):
    timestamp = str(int(time.time())).encode("utf-8")
    password_hash = hashlib.md5(password.encode("utf-8")).hexdigest()
    auth = hashlib.md5(password_hash.encode("utf-8") + timestamp).hexdigest()
    params = {
        "hs": "true",
        "p": "1.2",
        "c": PLUGIN["name"],
        "v": PLUGIN["version"],
        "u": username,
        "t": timestamp,
        "a": auth,
    }

    PLUGIN["logger"].debug(
        "Performing scrobbler handshake for username %s at %s", username, url
    )
    handshake_response = session.get(url, params=params)
    # process response
    result = handshake_response.text.split("\n")
    if len(result) >= 4 and result[0] == "OK":
        session_key = result[1]
        nowplaying_url = result[2]
        scrobble_url = result[3]
    elif result[0] == "BANNED":
        raise ScrobblerException("BANNED")
    elif result[0] == "BADAUTH":
        raise ScrobblerException("BADAUTH")
    elif result[0] == "BADTIME":
        raise ScrobblerException("BADTIME")
    else:
        raise ScrobblerException(handshake_response.text)

    PLUGIN["logger"].debug("Handshake successful, scrobble url: %s", scrobble_url)
    return session_key, nowplaying_url, scrobble_url


def submit_scrobble_v1(session, scrobble_time, track, session_key, scrobble_url):
    payload = get_scrobble_payload(track, scrobble_time)
    PLUGIN["logger"].debug("Sending scrobble with payload %s", payload)
    payload["s"] = session_key
    response = session.post(scrobble_url, payload)
    response.raise_for_status()
    if response.text.startswith("OK"):
        return
    elif response.text.startswith("BADSESSION"):
        raise ScrobblerException("Remote server says the session is invalid")
    else:
        raise ScrobblerException(response.text)

    PLUGIN["logger"].debug("Scrobble successfull!")


def submit_now_playing_v1(session, track, session_key, now_playing_url):
    payload = get_scrobble_payload(track, date=None, suffix="")
    PLUGIN["logger"].debug("Sending now playing with payload %s", payload)
    payload["s"] = session_key
    response = session.post(now_playing_url, payload)
    response.raise_for_status()
    if response.text.startswith("OK"):
        return
    elif response.text.startswith("BADSESSION"):
        raise ScrobblerException("Remote server says the session is invalid")
    else:
        raise ScrobblerException(response.text)

    PLUGIN["logger"].debug("Now playing successfull!")


def get_scrobble_payload(track, date, suffix="[0]"):
    """
    Documentation available at https://web.archive.org/web/20190531021725/https://www.last.fm/api/submissions
    """
    upload = track.uploads.filter(duration__gte=0).first()
    data = {
        "a{}".format(suffix): track.artist.name,
        "t{}".format(suffix): track.title,
        "l{}".format(suffix): upload.duration if upload else 0,
        "b{}".format(suffix): (track.album.title if track.album else "") or "",
        "n{}".format(suffix): track.position or "",
        "m{}".format(suffix): str(track.mbid) or "",
        "o{}".format(suffix): "P",  # Source: P = chosen by user
    }
    if date:
        data["i{}".format(suffix)] = int(date.timestamp())
    return data


def get_scrobble2_payload(track, date, suffix="[0]"):
    """
    Documentation available at https://web.archive.org/web/20190531021725/https://www.last.fm/api/submissions
    """
    upload = track.uploads.filter(duration__gte=0).first()
    data = {
        "artist{}".format(suffix): track.artist.name,
        "track{}".format(suffix): track.title,
        "duration{}".format(suffix): upload.duration if upload else 0,
        "album{}".format(suffix): (track.album.title if track.album else "") or "",
        "trackNumber{}".format(suffix): track.position or "",
        "mbid{}".format(suffix): str(track.mbid) or "",
        "chosenByUser{}".format(suffix): "P",  # Source: P = chosen by user
    }
    if date:
        offset = upload.duration / 2 if upload.duration else 0
        data["timestamp{}".format(suffix)] = int(date.timestamp()) - offset
    return data


def handshake_v2(username, password, session, api_key, api_secret, scrobble_url):
    params = {
        "method": "auth.getMobileSession",
        "username": username,
        "password": password,
        "api_key": api_key,
    }
    params["api_sig"] = hash_request(params, api_secret)
    response = session.post(scrobble_url, params)
    if 'status="ok"' not in response.text:
        raise ScrobblerException(response.text)

    session_key = response.text.split("<key>")[1].split("</key>")[0]
    return session_key


def submit_scrobble_v2(
    session, track, scrobble_time, session_key, scrobble_url, api_key, api_secret,
):
    params = {
        "method": "track.scrobble",
        "api_key": api_key,
        "sk": session_key,
    }
    params.update(get_scrobble2_payload(track, scrobble_time))
    params["api_sig"] = hash_request(params, api_secret)
    response = session.post(scrobble_url, params)
    if 'status="ok"' not in response.text:
        raise ScrobblerException(response.text)


def hash_request(data, secret_key):
    string = ""
    items = data.keys()
    items = sorted(items)
    for i in items:
        string += str(i)
        string += str(data[i])
    string += secret_key
    string_to_hash = string.encode("utf8")
    return hashlib.md5(string_to_hash).hexdigest()