Merge branch 'develop'

This commit is contained in:
Dniel97
2025-12-15 00:15:58 +01:00
2 changed files with 912 additions and 568 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -19,20 +19,20 @@ from datetime import datetime, timedelta
from utils.utils import create_requests_session from utils.utils import create_requests_session
technical_names = { technical_names = {
'eac3': 'E-AC-3 JOC (Dolby Digital Plus with Dolby Atmos, with 5.1 bed)', "eac3": "E-AC-3 JOC (Dolby Digital Plus with Dolby Atmos, with 5.1 bed)",
'mha1': 'MPEG-H 3D Audio (Sony 360 Reality Audio)', "mha1": "MPEG-H 3D Audio (Sony 360 Reality Audio)",
'ac4': 'AC-4 IMS (Dolby AC-4 with Dolby Atmos immersive stereo)', "ac4": "AC-4 IMS (Dolby AC-4 with Dolby Atmos immersive stereo)",
'mqa': 'MQA (Master Quality Authenticated) in FLAC container', "mqa": "MQA (Master Quality Authenticated) in FLAC container",
'flac': 'FLAC (Free Lossless Audio Codec)', "flac": "FLAC (Free Lossless Audio Codec)",
'alac': 'ALAC (Apple Lossless Audio Codec)', "alac": "ALAC (Apple Lossless Audio Codec)",
'mp4a.40.2': 'AAC 320 (Advanced Audio Coding) with a bitrate of 320kb/s', "mp4a.40.2": "AAC 320 (Advanced Audio Coding) with a bitrate of 320kb/s",
'mp4a.40.5': 'AAC 96 (Advanced Audio Coding) with a bitrate of 96kb/s' "mp4a.40.5": "AAC 96 (Advanced Audio Coding) with a bitrate of 96kb/s",
} }
class TidalRequestError(Exception): class TidalRequestError(Exception):
def __init__(self, payload): def __init__(self, payload):
sf = '{subStatus}: {userMessage} (HTTP {status})'.format(**payload) sf = "{subStatus}: {userMessage} (HTTP {status})".format(**payload)
self.payload = payload self.payload = payload
super(TidalRequestError, self).__init__(sf) super(TidalRequestError, self).__init__(sf)
@@ -55,13 +55,15 @@ class SessionType(Enum):
class TidalApi(object): class TidalApi(object):
TIDAL_API_BASE = 'https://api.tidal.com/v1/' TIDAL_API_BASE = "https://api.tidal.com/v1/"
TIDAL_VIDEO_BASE = 'https://api.tidalhifi.com/v1/' TIDAL_VIDEO_BASE = "https://api.tidalhifi.com/v1/"
TIDAL_CLIENT_VERSION = '2.26.1' TIDAL_CLIENT_VERSION = "2.26.1"
def __init__(self, sessions: dict): def __init__(self, sessions: dict):
self.sessions = sessions self.sessions = sessions
self.default: SessionType = SessionType.TV # Change to TV or MOBILE depending on AC-4/360RA self.default: SessionType = (
SessionType.TV
) # Change to TV or MOBILE depending on AC-4/360RA
self.s = create_requests_session() self.s = create_requests_session()
@@ -69,14 +71,15 @@ class TidalApi(object):
if params is None: if params is None:
params = {} params = {}
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
params['countryCode'] = self.sessions[self.default.name].country_code params["countryCode"] = self.sessions[self.default.name].country_code
if 'limit' not in params: if "limit" not in params:
params['limit'] = '9999' params["limit"] = "9999"
resp = self.s.get( resp = self.s.get(
self.TIDAL_API_BASE + url, self.TIDAL_API_BASE + url,
headers=self.sessions[self.default.name].auth_headers(), headers=self.sessions[self.default.name].auth_headers(),
params=params) params=params,
)
# if the request 401s or 403s, try refreshing the TV/Mobile session in case that helps # if the request 401s or 403s, try refreshing the TV/Mobile session in case that helps
if not refresh and (resp.status_code == 401 or resp.status_code == 403): if not refresh and (resp.status_code == 401 or resp.status_code == 403):
@@ -93,150 +96,171 @@ class TidalApi(object):
pass pass
if not resp_json: if not resp_json:
raise TidalError('Response was not valid JSON. HTTP status {}. {}'.format(resp.status_code, resp.text)) raise TidalError(
"Response was not valid JSON. HTTP status {}. {}".format(
resp.status_code, resp.text
)
)
if 'status' in resp_json and resp_json['status'] == 404 and \ if (
'subStatus' in resp_json and resp_json['subStatus'] == 2001: "status" in resp_json
raise TidalError('Error: {}. This might be region-locked.'.format(resp_json['userMessage'])) and resp_json["status"] == 404
and "subStatus" in resp_json
and resp_json["subStatus"] == 2001
):
raise TidalError(
"Error: {}. This might be region-locked.".format(
resp_json["userMessage"]
)
)
# Really hacky way, pls don't copy this ever # Really hacky way, pls don't copy this ever
if 'status' in resp_json and resp_json['status'] == 404 and \ if (
'error' in resp_json and resp_json['error'] == 'Not Found': "status" in resp_json
and resp_json["status"] == 404
and "error" in resp_json
and resp_json["error"] == "Not Found"
):
return resp_json return resp_json
if 'status' in resp_json and not resp_json['status'] == 200: if "status" in resp_json and not resp_json["status"] == 200:
raise TidalRequestError(resp_json) raise TidalRequestError(resp_json)
return resp_json return resp_json
def get_stream_url(self, track_id, quality): def get_stream_url(self, track_id, quality):
return self._get('tracks/' + str(track_id) + '/playbackinfopostpaywall/v4', { return self._get(
'playbackmode': 'STREAM', "tracks/" + str(track_id) + "/playbackinfopostpaywall/v4",
'assetpresentation': 'FULL', {
'audioquality': quality, "playbackmode": "STREAM",
'prefetch': 'false' "assetpresentation": "FULL",
}) "audioquality": quality,
"prefetch": "false",
},
)
def get_search_data(self, search_term, limit=20): def get_search_data(self, search_term, limit=20):
return self._get('search', params={ return self._get(
'query': str(search_term), "search",
'offset': 0, params={
'limit': limit, "query": str(search_term),
'includeContributors': 'true' "offset": 0,
}) "limit": limit,
"includeContributors": "true",
},
)
def get_page(self, pageurl, params=None): def get_page(self, pageurl, params=None):
local_params = { local_params = {
'deviceType': 'TV', "deviceType": "TV",
'locale': 'en_US', "locale": "en_US",
'mediaFormats': 'SONY_360' "mediaFormats": "SONY_360",
} }
if params: if params:
local_params.update(params) local_params.update(params)
return self._get('pages/' + pageurl, params=local_params) return self._get("pages/" + pageurl, params=local_params)
def get_playlist_items(self, playlist_id): def get_playlist_items(self, playlist_id):
result = self._get('playlists/' + playlist_id + '/items', { result = self._get(
'offset': 0, "playlists/" + playlist_id + "/items", {"offset": 0, "limit": 100}
'limit': 100 )
})
if result['totalNumberOfItems'] <= 100: if result["totalNumberOfItems"] <= 100:
return result return result
offset = len(result['items']) offset = len(result["items"])
while True: while True:
buf = self._get('playlists/' + playlist_id + '/items', { buf = self._get(
'offset': offset, "playlists/" + playlist_id + "/items", {"offset": offset, "limit": 100}
'limit': 100 )
}) offset += len(buf["items"])
offset += len(buf['items']) result["items"] += buf["items"]
result['items'] += buf['items']
if offset >= result['totalNumberOfItems']: if offset >= result["totalNumberOfItems"]:
break break
return result return result
def get_playlist(self, playlist_id): def get_playlist(self, playlist_id):
return self._get('playlists/' + str(playlist_id)) return self._get("playlists/" + str(playlist_id))
def get_album_tracks(self, album_id): def get_album_tracks(self, album_id):
return self._get('albums/' + str(album_id) + '/tracks') return self._get("albums/" + str(album_id) + "/tracks")
def get_track(self, track_id): def get_track(self, track_id):
return self._get('tracks/' + str(track_id)) return self._get("tracks/" + str(track_id))
def get_album(self, album_id): def get_album(self, album_id):
return self._get('albums/' + str(album_id)) return self._get("albums/" + str(album_id))
def get_video(self, video_id): def get_video(self, video_id):
return self._get('videos/' + str(video_id)) return self._get("videos/" + str(video_id))
def get_tracks_by_isrc(self, isrc): def get_tracks_by_isrc(self, isrc):
return self._get('tracks', params={ return self._get("tracks", params={"isrc": isrc})
'isrc': isrc
})
def get_favorite_tracks(self, user_id): def get_favorite_tracks(self, user_id):
return self._get('users/' + str(user_id) + '/favorites/tracks') return self._get("users/" + str(user_id) + "/favorites/tracks")
def get_track_contributors(self, track_id): def get_track_contributors(self, track_id):
return self._get('tracks/' + str(track_id) + '/contributors') return self._get("tracks/" + str(track_id) + "/contributors")
def get_album_contributors(self, album_id, offset: int = 0, limit: int = 100): def get_album_contributors(self, album_id, offset: int = 0, limit: int = 100):
return self._get('albums/' + album_id + '/items/credits', params={ return self._get(
'replace': True, "albums/" + album_id + "/items/credits",
'offset': offset, params={
'limit': limit, "replace": True,
'includeContributors': True "offset": offset,
}) "limit": limit,
"includeContributors": True,
},
)
def get_lyrics(self, track_id): def get_lyrics(self, track_id):
return self._get('tracks/' + str(track_id) + '/lyrics', params={ return self._get(
'deviceType': 'TV', "tracks/" + str(track_id) + "/lyrics",
'locale': 'en_US' params={"deviceType": "TV", "locale": "en_US"},
}) )
def get_video_contributors(self, video_id): def get_video_contributors(self, video_id):
return self._get('videos/' + video_id + '/contributors', params={ return self._get("videos/" + video_id + "/contributors", params={"limit": 50})
'limit': 50
})
def get_video_stream_url(self, video_id): def get_video_stream_url(self, video_id):
return self._get('videos/' + str(video_id) + '/streamurl') return self._get("videos/" + str(video_id) + "/streamurl")
def get_artist(self, artist_id): def get_artist(self, artist_id):
return self._get('artists/' + str(artist_id)) return self._get("artists/" + str(artist_id))
def get_artist_albums(self, artist_id): def get_artist_albums(self, artist_id):
return self._get('artists/' + str(artist_id) + '/albums') return self._get("artists/" + str(artist_id) + "/albums")
def get_artist_albums_ep_singles(self, artist_id): def get_artist_albums_ep_singles(self, artist_id):
return self._get('artists/' + str(artist_id) + '/albums', params={'filter': 'EPSANDSINGLES'}) return self._get(
"artists/" + str(artist_id) + "/albums", params={"filter": "EPSANDSINGLES"}
)
def get_type_from_id(self, id_): def get_type_from_id(self, id_):
result = None result = None
try: try:
result = self.get_album(id_) result = self.get_album(id_)
return 'a' return "a"
except TidalError: except TidalError:
pass pass
try: try:
result = self.get_artist(id_) result = self.get_artist(id_)
return 'r' return "r"
except TidalError: except TidalError:
pass pass
try: try:
result = self.get_track(id_) result = self.get_track(id_)
return 't' return "t"
except TidalError: except TidalError:
pass pass
try: try:
result = self.get_video(id_) result = self.get_video(id_)
return 'v' return "v"
except TidalError: except TidalError:
pass pass
@@ -256,6 +280,7 @@ class TidalSession(ABC):
""" """
Tidal abstract session object with all (abstract) functions needed: auth_headers(), refresh(), session_type() Tidal abstract session object with all (abstract) functions needed: auth_headers(), refresh(), session_type()
""" """
def __init__(self): def __init__(self):
self.access_token = None self.access_token = None
self.refresh_token = None self.refresh_token = None
@@ -264,30 +289,32 @@ class TidalSession(ABC):
self.country_code = None self.country_code = None
def set_storage(self, storage: dict): def set_storage(self, storage: dict):
self.access_token = storage.get('access_token') self.access_token = storage.get("access_token")
self.refresh_token = storage.get('refresh_token') self.refresh_token = storage.get("refresh_token")
self.expires = storage.get('expires') self.expires = storage.get("expires")
self.user_id = storage.get('user_id') self.user_id = storage.get("user_id")
self.country_code = storage.get('country_code') self.country_code = storage.get("country_code")
def get_storage(self) -> dict: def get_storage(self) -> dict:
return { return {
'access_token': self.access_token, "access_token": self.access_token,
'refresh_token': self.refresh_token, "refresh_token": self.refresh_token,
'expires': self.expires, "expires": self.expires,
'user_id': self.user_id, "user_id": self.user_id,
'country_code': self.country_code "country_code": self.country_code,
} }
def get_subscription(self) -> str: def get_subscription(self) -> str:
if self.access_token: if self.access_token:
r = requests.get(f'https://api.tidal.com/v1/users/{self.user_id}/subscription', r = requests.get(
params={'countryCode': self.country_code}, f"https://api.tidal.com/v1/users/{self.user_id}/subscription",
headers=self.auth_headers()) params={"countryCode": self.country_code},
headers=self.auth_headers(),
)
if r.status_code != 200: if r.status_code != 200:
raise TidalAuthError(r.json()['userMessage']) raise TidalAuthError(r.json()["userMessage"])
return r.json()['subscription']['type'] return r.json()["subscription"]["type"]
@abstractmethod @abstractmethod
def auth_headers(self) -> dict: def auth_headers(self) -> dict:
@@ -301,7 +328,9 @@ class TidalSession(ABC):
if self.access_token is None or datetime.now() > self.expires: if self.access_token is None or datetime.now() > self.expires:
return False return False
r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers()) r = requests.get(
"https://api.tidal.com/v1/sessions", headers=self.auth_headers()
)
return r.status_code == 200 return r.status_code == 200
@abstractmethod @abstractmethod
@@ -320,173 +349,205 @@ class TidalMobileSession(TidalSession):
def __init__(self, client_token: str): def __init__(self, client_token: str):
super().__init__() super().__init__()
self.TIDAL_LOGIN_BASE = 'https://login.tidal.com/api/' self.TIDAL_LOGIN_BASE = "https://login.tidal.com/api/"
self.TIDAL_AUTH_BASE = 'https://auth.tidal.com/v1/' self.TIDAL_AUTH_BASE = "https://auth.tidal.com/v1/"
self.client_id = client_token self.client_id = client_token
self.redirect_uri = 'https://tidal.com/android/login/auth' self.redirect_uri = "https://tidal.com/android/login/auth"
self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=') self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(
self.code_challenge = base64.urlsafe_b64encode(hashlib.sha256(self.code_verifier).digest()).rstrip(b'=') b"="
)
self.code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(self.code_verifier).digest()
).rstrip(b"=")
self.client_unique_key = secrets.token_hex(8) self.client_unique_key = secrets.token_hex(8)
self.user_agent = 'Mozilla/5.0 (Linux; Android 13; Pixel 8 Build/TQ2A.230505.002; wv) AppleWebKit/537.36 ' \ self.user_agent = (
'(KHTML, like Gecko) Version/4.0 Chrome/119.0.6045.163 Mobile Safari/537.36' "Mozilla/5.0 (Linux; Android 13; Pixel 8 Build/TQ2A.230505.002; wv) AppleWebKit/537.36 "
"(KHTML, like Gecko) Version/4.0 Chrome/119.0.6045.163 Mobile Safari/537.36"
)
def auth(self, username: str, password: str): def auth(self, username: str, password: str):
s = requests.Session() s = requests.Session()
params = {
'response_type': 'code',
'redirect_uri': self.redirect_uri,
'lang': 'en_US',
'appMode': 'android',
'client_id': self.client_id,
'client_unique_key': self.client_unique_key,
'code_challenge': self.code_challenge,
'code_challenge_method': 'S256',
'restrict_signup': 'true'
}
# retrieve csrf token for subsequent request
r = s.get('https://login.tidal.com/authorize', params=params, headers={
'user-agent': self.user_agent,
'accept-language': 'en-US',
'x-requested-with': 'com.aspiro.tidal'
})
if r.status_code == 400:
raise TidalAuthError("Authorization failed! Is the clientid/token up to date?")
elif r.status_code == 403:
raise TidalAuthError("TIDAL BOT protection, try again later!")
# try Tidal DataDome cookie request # try Tidal DataDome cookie request
r = s.post('https://dd.tidal.com/js/', data={ r = s.post(
'jsData': f'{{"opts":"endpoint,ajaxListenerPath","ua":"{self.user_agent}"}}', "https://dd.tidal.com/js/",
'ddk': '1F633CDD8EF22541BD6D9B1B8EF13A', # API Key (required) data={
'Referer': quote(r.url), # Referer authorize link (required) "jsData": f'{{"opts":"endpoint,ajaxListenerPath","ua":"{self.user_agent}"}}',
'responsePage': 'origin', # useless? "ddk": "1F633CDD8EF22541BD6D9B1B8EF13A", # API Key (required)
'ddv': '4.17.0' # useless? "Referer": "https%3A%2F%2Ftidal.com%2F", # Referer authorize link (required)
}, headers={ "responsePage": "origin", # useless?
'user-agent': self.user_agent, "ddv": "4.17.0", # useless?
'content-type': 'application/x-www-form-urlencoded' },
}) headers={
"user-agent": self.user_agent,
"content-type": "application/x-www-form-urlencoded",
},
)
if r.status_code != 200 or not r.json().get('cookie'): if r.status_code != 200 or not r.json().get("cookie"):
raise TidalAuthError("TIDAL BOT protection, could not get DataDome cookie!") raise TidalAuthError("TIDAL BOT protection, could not get DataDome cookie!")
# get the cookie from the json request and save it in the session # get the cookie from the json request and save it in the session
dd_cookie = r.json().get('cookie').split(';')[0] dd_cookie = r.json().get("cookie").split(";")[0]
s.cookies[dd_cookie.split('=')[0]] = dd_cookie.split('=')[1] s.cookies[dd_cookie.split("=")[0]] = dd_cookie.split("=")[1]
params = {
"response_type": "code",
"redirect_uri": self.redirect_uri,
"lang": "en_US",
"appMode": "android",
"client_id": self.client_id,
"client_unique_key": self.client_unique_key,
"code_challenge": self.code_challenge,
"code_challenge_method": "S256",
"restrict_signup": "true",
}
# retrieve csrf token for subsequent request
r = s.get(
"https://login.tidal.com/authorize",
params=params,
headers={
"user-agent": self.user_agent,
"accept-language": "en-US",
"x-requested-with": "com.aspiro.tidal",
},
)
if r.status_code == 400:
raise TidalAuthError(
"Authorization failed! Is the clientid/token up to date?"
)
elif r.status_code == 403:
raise TidalAuthError("TIDAL BOT protection, try again later!")
# enter email, verify email is valid # enter email, verify email is valid
r = s.post(self.TIDAL_LOGIN_BASE + 'email', params=params, json={ r = s.post(
'email': username self.TIDAL_LOGIN_BASE + "email",
}, headers={ params=params,
'user-agent': self.user_agent, json={"email": username},
'x-csrf-token': s.cookies['_csrf-token'], headers={
'accept': 'application/json, text/plain, */*', "user-agent": self.user_agent,
'content-type': 'application/json', "x-csrf-token": s.cookies["_csrf-token"],
'accept-language': 'en-US', "accept": "application/json, text/plain, */*",
'x-requested-with': 'com.aspiro.tidal' "content-type": "application/json",
}) "accept-language": "en-US",
"x-requested-with": "com.aspiro.tidal",
},
)
if r.status_code != 200: if r.status_code != 200:
raise TidalAuthError(r.text) raise TidalAuthError(r.text)
if not r.json()['isValidEmail']: if not r.json()["isValidEmail"]:
raise TidalAuthError('Invalid email') raise TidalAuthError("Invalid email")
if r.json()['newUser']: if r.json()["newUser"]:
raise TidalAuthError('User does not exist') raise TidalAuthError("User does not exist")
# login with user credentials # login with user credentials
r = s.post(self.TIDAL_LOGIN_BASE + 'email/user/existing', params=params, json={ r = s.post(
'email': username, self.TIDAL_LOGIN_BASE + "email/user/existing",
'password': password params=params,
}, headers={ json={"email": username, "password": password},
'User-Agent': self.user_agent, headers={
'x-csrf-token': s.cookies['_csrf-token'], "User-Agent": self.user_agent,
'accept': 'application/json, text/plain, */*', "x-csrf-token": s.cookies["_csrf-token"],
'content-type': 'application/json', "accept": "application/json, text/plain, */*",
'accept-language': 'en-US', "content-type": "application/json",
'x-requested-with': 'com.aspiro.tidal' "accept-language": "en-US",
}) "x-requested-with": "com.aspiro.tidal",
},
)
if r.status_code != 200: if r.status_code != 200:
raise TidalAuthError(r.text) raise TidalAuthError(r.text)
# retrieve access code # retrieve access code
r = s.get('https://login.tidal.com/success', allow_redirects=False, headers={ r = s.get(
'user-agent': self.user_agent, "https://login.tidal.com/success",
'accept-language': 'en-US', allow_redirects=False,
'x-requested-with': 'com.aspiro.tidal' headers={
}) "user-agent": self.user_agent,
"accept-language": "en-US",
"x-requested-with": "com.aspiro.tidal",
},
)
if r.status_code == 401: if r.status_code == 401:
raise TidalAuthError('Incorrect password') raise TidalAuthError("Incorrect password")
assert (r.status_code == 302) assert r.status_code == 302
url = urlparse.urlparse(r.headers['location']) url = urlparse.urlparse(r.headers["location"])
oauth_code = parse_qs(url.query)['code'][0] oauth_code = parse_qs(url.query)["code"][0]
# exchange access code for oauth token # exchange access code for oauth token
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={ r = requests.post(
'code': oauth_code, self.TIDAL_AUTH_BASE + "oauth2/token",
'client_id': self.client_id, data={
'grant_type': 'authorization_code', "code": oauth_code,
'redirect_uri': self.redirect_uri, "client_id": self.client_id,
'scope': 'r_usr w_usr w_sub', "grant_type": "authorization_code",
'code_verifier': self.code_verifier, "redirect_uri": self.redirect_uri,
'client_unique_key': self.client_unique_key "scope": "r_usr w_usr w_sub",
}, headers={ "code_verifier": self.code_verifier,
'User-Agent': self.user_agent "client_unique_key": self.client_unique_key,
}) },
headers={"User-Agent": self.user_agent},
)
if r.status_code != 200: if r.status_code != 200:
raise TidalAuthError(r.text) raise TidalAuthError(r.text)
self.access_token = r.json()['access_token'] self.access_token = r.json()["access_token"]
self.refresh_token = r.json()['refresh_token'] self.refresh_token = r.json()["refresh_token"]
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in']) self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers()) r = requests.get(
"https://api.tidal.com/v1/sessions", headers=self.auth_headers()
)
if r.status_code != 200: if r.status_code != 200:
raise TidalAuthError(r.text) raise TidalAuthError(r.text)
self.user_id = r.json()['userId'] self.user_id = r.json()["userId"]
self.country_code = r.json()['countryCode'] self.country_code = r.json()["countryCode"]
def refresh(self): def refresh(self):
assert (self.refresh_token is not None) assert self.refresh_token is not None
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={ r = requests.post(
'refresh_token': self.refresh_token, self.TIDAL_AUTH_BASE + "oauth2/token",
'client_id': self.client_id, data={
'grant_type': 'refresh_token' "refresh_token": self.refresh_token,
}) "client_id": self.client_id,
"grant_type": "refresh_token",
},
)
if r.status_code == 200: if r.status_code == 200:
# print('TIDAL: Refreshing token successful') # print('TIDAL: Refreshing token successful')
self.access_token = r.json()['access_token'] self.access_token = r.json()["access_token"]
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in']) self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
if 'refresh_token' in r.json(): if "refresh_token" in r.json():
self.refresh_token = r.json()['refresh_token'] self.refresh_token = r.json()["refresh_token"]
elif r.status_code == 401: elif r.status_code == 401:
print('\tERROR: ' + r.json()['userMessage']) print("\tERROR: " + r.json()["userMessage"])
return r.status_code == 200 return r.status_code == 200
@staticmethod @staticmethod
def session_type(): def session_type():
return 'Mobile' return "Mobile"
def auth_headers(self): def auth_headers(self):
return { return {
'Host': 'api.tidal.com', "Host": "api.tidal.com",
'X-Tidal-Token': self.client_id, "X-Tidal-Token": self.client_id,
'Authorization': 'Bearer {}'.format(self.access_token), "Authorization": "Bearer {}".format(self.access_token),
'Connection': 'Keep-Alive', "Connection": "Keep-Alive",
'Accept-Encoding': 'gzip', "Accept-Encoding": "gzip",
'User-Agent': 'TIDAL_ANDROID/1039 okhttp/3.14.9' "User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9",
} }
@@ -497,7 +558,7 @@ class TidalTvSession(TidalSession):
def __init__(self, client_token: str, client_secret: str): def __init__(self, client_token: str, client_secret: str):
super().__init__() super().__init__()
self.TIDAL_AUTH_BASE = 'https://auth.tidal.com/v1/' self.TIDAL_AUTH_BASE = "https://auth.tidal.com/v1/"
self.client_id = client_token self.client_id = client_token
self.client_secret = client_secret self.client_secret = client_secret
@@ -512,29 +573,35 @@ class TidalTvSession(TidalSession):
s = requests.Session() s = requests.Session()
# retrieve csrf token for subsequent request # retrieve csrf token for subsequent request
r = s.post(self.TIDAL_AUTH_BASE + 'oauth2/device_authorization', data={ r = s.post(
'client_id': self.client_id, self.TIDAL_AUTH_BASE + "oauth2/device_authorization",
'scope': 'r_usr w_usr' data={"client_id": self.client_id, "scope": "r_usr w_usr"},
}) )
if r.status_code != 200: if r.status_code != 200:
raise TidalAuthError("Authorization failed! Is the clientid/token up to date?") raise TidalAuthError(
"Authorization failed! Is the clientid/token up to date?"
)
else: else:
device_code = r.json()['deviceCode'] device_code = r.json()["deviceCode"]
user_code = r.json()['userCode'] user_code = r.json()["userCode"]
print('Opening https://link.tidal.com/{}, log in or sign up to TIDAL.'.format(user_code)) print(
webbrowser.open('https://link.tidal.com/' + user_code, new=2) "Opening https://link.tidal.com/{}, log in or sign up to TIDAL.".format(
user_code
)
)
webbrowser.open("https://link.tidal.com/" + user_code, new=2)
data = { data = {
'client_id': self.client_id, "client_id": self.client_id,
'device_code': device_code, "device_code": device_code,
'client_secret': self.client_secret, "client_secret": self.client_secret,
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
'scope': 'r_usr w_usr' "scope": "r_usr w_usr",
} }
status_code = 400 status_code = 400
print('Checking link ', end='') print("Checking link ", end="")
while status_code == 400: while status_code == 400:
for index, char in enumerate("." * 5): for index, char in enumerate("." * 5):
@@ -542,7 +609,7 @@ class TidalTvSession(TidalSession):
sys.stdout.flush() sys.stdout.flush()
# exchange access code for oauth token # exchange access code for oauth token
time.sleep(0.2) time.sleep(0.2)
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data=data) r = requests.post(self.TIDAL_AUTH_BASE + "oauth2/token", data=data)
status_code = r.status_code status_code = r.status_code
index += 1 # lists are zero indexed, we need to increase by one for the accurate count index += 1 # lists are zero indexed, we need to increase by one for the accurate count
# backtrack the written characters, overwrite them with space, backtrack again: # backtrack the written characters, overwrite them with space, backtrack again:
@@ -550,52 +617,61 @@ class TidalTvSession(TidalSession):
sys.stdout.flush() sys.stdout.flush()
if r.status_code == 200: if r.status_code == 200:
print('\nSuccessfully linked!') print("\nSuccessfully linked!")
elif r.status_code == 401: elif r.status_code == 401:
raise TidalAuthError('Auth Error: ' + r.json()['error']) raise TidalAuthError("Auth Error: " + r.json()["error"])
self.access_token = r.json()['access_token'] self.access_token = r.json()["access_token"]
self.refresh_token = r.json()['refresh_token'] self.refresh_token = r.json()["refresh_token"]
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in']) self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers()) r = requests.get(
assert (r.status_code == 200) "https://api.tidal.com/v1/sessions", headers=self.auth_headers()
self.user_id = r.json()['userId'] )
self.country_code = r.json()['countryCode'] assert r.status_code == 200
self.user_id = r.json()["userId"]
self.country_code = r.json()["countryCode"]
r = requests.get('https://api.tidal.com/v1/users/{}?countryCode={}'.format(self.user_id, self.country_code), r = requests.get(
headers=self.auth_headers()) "https://api.tidal.com/v1/users/{}?countryCode={}".format(
assert (r.status_code == 200) self.user_id, self.country_code
),
headers=self.auth_headers(),
)
assert r.status_code == 200
# self.username = r.json()['username'] # self.username = r.json()['username']
def refresh(self): def refresh(self):
assert (self.refresh_token is not None) assert self.refresh_token is not None
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={ r = requests.post(
'refresh_token': self.refresh_token, self.TIDAL_AUTH_BASE + "oauth2/token",
'client_id': self.client_id, data={
'client_secret': self.client_secret, "refresh_token": self.refresh_token,
'grant_type': 'refresh_token' "client_id": self.client_id,
}) "client_secret": self.client_secret,
"grant_type": "refresh_token",
},
)
if r.status_code == 200: if r.status_code == 200:
# print('TIDAL: Refreshing token successful') # print('TIDAL: Refreshing token successful')
self.access_token = r.json()['access_token'] self.access_token = r.json()["access_token"]
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in']) self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
if 'refresh_token' in r.json(): if "refresh_token" in r.json():
self.refresh_token = r.json()['refresh_token'] self.refresh_token = r.json()["refresh_token"]
return r.status_code == 200 return r.status_code == 200
@staticmethod @staticmethod
def session_type(): def session_type():
return 'Tv' return "Tv"
def auth_headers(self): def auth_headers(self):
return { return {
'X-Tidal-Token': self.client_id, "X-Tidal-Token": self.client_id,
'Authorization': 'Bearer {}'.format(self.access_token), "Authorization": "Bearer {}".format(self.access_token),
'Connection': 'Keep-Alive', "Connection": "Keep-Alive",
'Accept-Encoding': 'gzip', "Accept-Encoding": "gzip",
'User-Agent': 'TIDAL_ANDROID/1039 okhttp/3.14.9' "User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9",
} }