Compare commits
6 Commits
d0d11f9939
...
1dc7b42068
| Author | SHA1 | Date | |
|---|---|---|---|
| 1dc7b42068 | |||
|
|
0d805ff5bf | ||
|
|
65f22735fe | ||
|
|
0466a40291 | ||
|
|
24fa01d260 | ||
|
|
c68fdd029b |
889
interface.py
889
interface.py
File diff suppressed because it is too large
Load Diff
563
tidal_api.py
563
tidal_api.py
@@ -19,20 +19,20 @@ from datetime import datetime, timedelta
|
|||||||
from utils.utils import create_requests_session
|
from utils.utils import create_requests_session
|
||||||
|
|
||||||
technical_names = {
|
technical_names = {
|
||||||
'eac3': 'E-AC-3 JOC (Dolby Digital Plus with Dolby Atmos, with 5.1 bed)',
|
"eac3": "E-AC-3 JOC (Dolby Digital Plus with Dolby Atmos, with 5.1 bed)",
|
||||||
'mha1': 'MPEG-H 3D Audio (Sony 360 Reality Audio)',
|
"mha1": "MPEG-H 3D Audio (Sony 360 Reality Audio)",
|
||||||
'ac4': 'AC-4 IMS (Dolby AC-4 with Dolby Atmos immersive stereo)',
|
"ac4": "AC-4 IMS (Dolby AC-4 with Dolby Atmos immersive stereo)",
|
||||||
'mqa': 'MQA (Master Quality Authenticated) in FLAC container',
|
"mqa": "MQA (Master Quality Authenticated) in FLAC container",
|
||||||
'flac': 'FLAC (Free Lossless Audio Codec)',
|
"flac": "FLAC (Free Lossless Audio Codec)",
|
||||||
'alac': 'ALAC (Apple Lossless Audio Codec)',
|
"alac": "ALAC (Apple Lossless Audio Codec)",
|
||||||
'mp4a.40.2': 'AAC 320 (Advanced Audio Coding) with a bitrate of 320kb/s',
|
"mp4a.40.2": "AAC 320 (Advanced Audio Coding) with a bitrate of 320kb/s",
|
||||||
'mp4a.40.5': 'AAC 96 (Advanced Audio Coding) with a bitrate of 96kb/s'
|
"mp4a.40.5": "AAC 96 (Advanced Audio Coding) with a bitrate of 96kb/s",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class TidalRequestError(Exception):
|
class TidalRequestError(Exception):
|
||||||
def __init__(self, payload):
|
def __init__(self, payload):
|
||||||
sf = '{subStatus}: {userMessage} (HTTP {status})'.format(**payload)
|
sf = "{subStatus}: {userMessage} (HTTP {status})".format(**payload)
|
||||||
self.payload = payload
|
self.payload = payload
|
||||||
super(TidalRequestError, self).__init__(sf)
|
super(TidalRequestError, self).__init__(sf)
|
||||||
|
|
||||||
@@ -55,13 +55,15 @@ class SessionType(Enum):
|
|||||||
|
|
||||||
|
|
||||||
class TidalApi(object):
|
class TidalApi(object):
|
||||||
TIDAL_API_BASE = 'https://api.tidal.com/v1/'
|
TIDAL_API_BASE = "https://api.tidal.com/v1/"
|
||||||
TIDAL_VIDEO_BASE = 'https://api.tidalhifi.com/v1/'
|
TIDAL_VIDEO_BASE = "https://api.tidalhifi.com/v1/"
|
||||||
TIDAL_CLIENT_VERSION = '2.26.1'
|
TIDAL_CLIENT_VERSION = "2.26.1"
|
||||||
|
|
||||||
def __init__(self, sessions: dict):
|
def __init__(self, sessions: dict):
|
||||||
self.sessions = sessions
|
self.sessions = sessions
|
||||||
self.default: SessionType = SessionType.TV # Change to TV or MOBILE depending on AC-4/360RA
|
self.default: SessionType = (
|
||||||
|
SessionType.TV
|
||||||
|
) # Change to TV or MOBILE depending on AC-4/360RA
|
||||||
|
|
||||||
self.s = create_requests_session()
|
self.s = create_requests_session()
|
||||||
|
|
||||||
@@ -69,14 +71,15 @@ class TidalApi(object):
|
|||||||
if params is None:
|
if params is None:
|
||||||
params = {}
|
params = {}
|
||||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||||
params['countryCode'] = self.sessions[self.default.name].country_code
|
params["countryCode"] = self.sessions[self.default.name].country_code
|
||||||
if 'limit' not in params:
|
if "limit" not in params:
|
||||||
params['limit'] = '9999'
|
params["limit"] = "9999"
|
||||||
|
|
||||||
resp = self.s.get(
|
resp = self.s.get(
|
||||||
self.TIDAL_API_BASE + url,
|
self.TIDAL_API_BASE + url,
|
||||||
headers=self.sessions[self.default.name].auth_headers(),
|
headers=self.sessions[self.default.name].auth_headers(),
|
||||||
params=params)
|
params=params,
|
||||||
|
)
|
||||||
|
|
||||||
# if the request 401s or 403s, try refreshing the TV/Mobile session in case that helps
|
# if the request 401s or 403s, try refreshing the TV/Mobile session in case that helps
|
||||||
if not refresh and (resp.status_code == 401 or resp.status_code == 403):
|
if not refresh and (resp.status_code == 401 or resp.status_code == 403):
|
||||||
@@ -93,150 +96,171 @@ class TidalApi(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
if not resp_json:
|
if not resp_json:
|
||||||
raise TidalError('Response was not valid JSON. HTTP status {}. {}'.format(resp.status_code, resp.text))
|
raise TidalError(
|
||||||
|
"Response was not valid JSON. HTTP status {}. {}".format(
|
||||||
|
resp.status_code, resp.text
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if 'status' in resp_json and resp_json['status'] == 404 and \
|
if (
|
||||||
'subStatus' in resp_json and resp_json['subStatus'] == 2001:
|
"status" in resp_json
|
||||||
raise TidalError('Error: {}. This might be region-locked.'.format(resp_json['userMessage']))
|
and resp_json["status"] == 404
|
||||||
|
and "subStatus" in resp_json
|
||||||
|
and resp_json["subStatus"] == 2001
|
||||||
|
):
|
||||||
|
raise TidalError(
|
||||||
|
"Error: {}. This might be region-locked.".format(
|
||||||
|
resp_json["userMessage"]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# Really hacky way, pls don't copy this ever
|
# Really hacky way, pls don't copy this ever
|
||||||
if 'status' in resp_json and resp_json['status'] == 404 and \
|
if (
|
||||||
'error' in resp_json and resp_json['error'] == 'Not Found':
|
"status" in resp_json
|
||||||
|
and resp_json["status"] == 404
|
||||||
|
and "error" in resp_json
|
||||||
|
and resp_json["error"] == "Not Found"
|
||||||
|
):
|
||||||
return resp_json
|
return resp_json
|
||||||
|
|
||||||
if 'status' in resp_json and not resp_json['status'] == 200:
|
if "status" in resp_json and not resp_json["status"] == 200:
|
||||||
raise TidalRequestError(resp_json)
|
raise TidalRequestError(resp_json)
|
||||||
|
|
||||||
return resp_json
|
return resp_json
|
||||||
|
|
||||||
def get_stream_url(self, track_id, quality):
|
def get_stream_url(self, track_id, quality):
|
||||||
return self._get('tracks/' + str(track_id) + '/playbackinfopostpaywall/v4', {
|
return self._get(
|
||||||
'playbackmode': 'STREAM',
|
"tracks/" + str(track_id) + "/playbackinfopostpaywall/v4",
|
||||||
'assetpresentation': 'FULL',
|
{
|
||||||
'audioquality': quality,
|
"playbackmode": "STREAM",
|
||||||
'prefetch': 'false'
|
"assetpresentation": "FULL",
|
||||||
})
|
"audioquality": quality,
|
||||||
|
"prefetch": "false",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def get_search_data(self, search_term, limit=20):
|
def get_search_data(self, search_term, limit=20):
|
||||||
return self._get('search', params={
|
return self._get(
|
||||||
'query': str(search_term),
|
"search",
|
||||||
'offset': 0,
|
params={
|
||||||
'limit': limit,
|
"query": str(search_term),
|
||||||
'includeContributors': 'true'
|
"offset": 0,
|
||||||
})
|
"limit": limit,
|
||||||
|
"includeContributors": "true",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def get_page(self, pageurl, params=None):
|
def get_page(self, pageurl, params=None):
|
||||||
local_params = {
|
local_params = {
|
||||||
'deviceType': 'TV',
|
"deviceType": "TV",
|
||||||
'locale': 'en_US',
|
"locale": "en_US",
|
||||||
'mediaFormats': 'SONY_360'
|
"mediaFormats": "SONY_360",
|
||||||
}
|
}
|
||||||
|
|
||||||
if params:
|
if params:
|
||||||
local_params.update(params)
|
local_params.update(params)
|
||||||
|
|
||||||
return self._get('pages/' + pageurl, params=local_params)
|
return self._get("pages/" + pageurl, params=local_params)
|
||||||
|
|
||||||
def get_playlist_items(self, playlist_id):
|
def get_playlist_items(self, playlist_id):
|
||||||
result = self._get('playlists/' + playlist_id + '/items', {
|
result = self._get(
|
||||||
'offset': 0,
|
"playlists/" + playlist_id + "/items", {"offset": 0, "limit": 100}
|
||||||
'limit': 100
|
)
|
||||||
})
|
|
||||||
|
|
||||||
if result['totalNumberOfItems'] <= 100:
|
if result["totalNumberOfItems"] <= 100:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
offset = len(result['items'])
|
offset = len(result["items"])
|
||||||
while True:
|
while True:
|
||||||
buf = self._get('playlists/' + playlist_id + '/items', {
|
buf = self._get(
|
||||||
'offset': offset,
|
"playlists/" + playlist_id + "/items", {"offset": offset, "limit": 100}
|
||||||
'limit': 100
|
)
|
||||||
})
|
offset += len(buf["items"])
|
||||||
offset += len(buf['items'])
|
result["items"] += buf["items"]
|
||||||
result['items'] += buf['items']
|
|
||||||
|
|
||||||
if offset >= result['totalNumberOfItems']:
|
if offset >= result["totalNumberOfItems"]:
|
||||||
break
|
break
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def get_playlist(self, playlist_id):
|
def get_playlist(self, playlist_id):
|
||||||
return self._get('playlists/' + str(playlist_id))
|
return self._get("playlists/" + str(playlist_id))
|
||||||
|
|
||||||
def get_album_tracks(self, album_id):
|
def get_album_tracks(self, album_id):
|
||||||
return self._get('albums/' + str(album_id) + '/tracks')
|
return self._get("albums/" + str(album_id) + "/tracks")
|
||||||
|
|
||||||
def get_track(self, track_id):
|
def get_track(self, track_id):
|
||||||
return self._get('tracks/' + str(track_id))
|
return self._get("tracks/" + str(track_id))
|
||||||
|
|
||||||
def get_album(self, album_id):
|
def get_album(self, album_id):
|
||||||
return self._get('albums/' + str(album_id))
|
return self._get("albums/" + str(album_id))
|
||||||
|
|
||||||
def get_video(self, video_id):
|
def get_video(self, video_id):
|
||||||
return self._get('videos/' + str(video_id))
|
return self._get("videos/" + str(video_id))
|
||||||
|
|
||||||
def get_tracks_by_isrc(self, isrc):
|
def get_tracks_by_isrc(self, isrc):
|
||||||
return self._get('tracks', params={
|
return self._get("tracks", params={"isrc": isrc})
|
||||||
'isrc': isrc
|
|
||||||
})
|
|
||||||
|
|
||||||
def get_favorite_tracks(self, user_id):
|
def get_favorite_tracks(self, user_id):
|
||||||
return self._get('users/' + str(user_id) + '/favorites/tracks')
|
return self._get("users/" + str(user_id) + "/favorites/tracks")
|
||||||
|
|
||||||
def get_track_contributors(self, track_id):
|
def get_track_contributors(self, track_id):
|
||||||
return self._get('tracks/' + str(track_id) + '/contributors')
|
return self._get("tracks/" + str(track_id) + "/contributors")
|
||||||
|
|
||||||
def get_album_contributors(self, album_id, offset: int = 0, limit: int = 100):
|
def get_album_contributors(self, album_id, offset: int = 0, limit: int = 100):
|
||||||
return self._get('albums/' + album_id + '/items/credits', params={
|
return self._get(
|
||||||
'replace': True,
|
"albums/" + album_id + "/items/credits",
|
||||||
'offset': offset,
|
params={
|
||||||
'limit': limit,
|
"replace": True,
|
||||||
'includeContributors': True
|
"offset": offset,
|
||||||
})
|
"limit": limit,
|
||||||
|
"includeContributors": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def get_lyrics(self, track_id):
|
def get_lyrics(self, track_id):
|
||||||
return self._get('tracks/' + str(track_id) + '/lyrics', params={
|
return self._get(
|
||||||
'deviceType': 'TV',
|
"tracks/" + str(track_id) + "/lyrics",
|
||||||
'locale': 'en_US'
|
params={"deviceType": "TV", "locale": "en_US"},
|
||||||
})
|
)
|
||||||
|
|
||||||
def get_video_contributors(self, video_id):
|
def get_video_contributors(self, video_id):
|
||||||
return self._get('videos/' + video_id + '/contributors', params={
|
return self._get("videos/" + video_id + "/contributors", params={"limit": 50})
|
||||||
'limit': 50
|
|
||||||
})
|
|
||||||
|
|
||||||
def get_video_stream_url(self, video_id):
|
def get_video_stream_url(self, video_id):
|
||||||
return self._get('videos/' + str(video_id) + '/streamurl')
|
return self._get("videos/" + str(video_id) + "/streamurl")
|
||||||
|
|
||||||
def get_artist(self, artist_id):
|
def get_artist(self, artist_id):
|
||||||
return self._get('artists/' + str(artist_id))
|
return self._get("artists/" + str(artist_id))
|
||||||
|
|
||||||
def get_artist_albums(self, artist_id):
|
def get_artist_albums(self, artist_id):
|
||||||
return self._get('artists/' + str(artist_id) + '/albums')
|
return self._get("artists/" + str(artist_id) + "/albums")
|
||||||
|
|
||||||
def get_artist_albums_ep_singles(self, artist_id):
|
def get_artist_albums_ep_singles(self, artist_id):
|
||||||
return self._get('artists/' + str(artist_id) + '/albums', params={'filter': 'EPSANDSINGLES'})
|
return self._get(
|
||||||
|
"artists/" + str(artist_id) + "/albums", params={"filter": "EPSANDSINGLES"}
|
||||||
|
)
|
||||||
|
|
||||||
def get_type_from_id(self, id_):
|
def get_type_from_id(self, id_):
|
||||||
result = None
|
result = None
|
||||||
try:
|
try:
|
||||||
result = self.get_album(id_)
|
result = self.get_album(id_)
|
||||||
return 'a'
|
return "a"
|
||||||
except TidalError:
|
except TidalError:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
result = self.get_artist(id_)
|
result = self.get_artist(id_)
|
||||||
return 'r'
|
return "r"
|
||||||
except TidalError:
|
except TidalError:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
result = self.get_track(id_)
|
result = self.get_track(id_)
|
||||||
return 't'
|
return "t"
|
||||||
except TidalError:
|
except TidalError:
|
||||||
pass
|
pass
|
||||||
try:
|
try:
|
||||||
result = self.get_video(id_)
|
result = self.get_video(id_)
|
||||||
return 'v'
|
return "v"
|
||||||
except TidalError:
|
except TidalError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -256,6 +280,7 @@ class TidalSession(ABC):
|
|||||||
"""
|
"""
|
||||||
Tidal abstract session object with all (abstract) functions needed: auth_headers(), refresh(), session_type()
|
Tidal abstract session object with all (abstract) functions needed: auth_headers(), refresh(), session_type()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.access_token = None
|
self.access_token = None
|
||||||
self.refresh_token = None
|
self.refresh_token = None
|
||||||
@@ -264,30 +289,32 @@ class TidalSession(ABC):
|
|||||||
self.country_code = None
|
self.country_code = None
|
||||||
|
|
||||||
def set_storage(self, storage: dict):
|
def set_storage(self, storage: dict):
|
||||||
self.access_token = storage.get('access_token')
|
self.access_token = storage.get("access_token")
|
||||||
self.refresh_token = storage.get('refresh_token')
|
self.refresh_token = storage.get("refresh_token")
|
||||||
self.expires = storage.get('expires')
|
self.expires = storage.get("expires")
|
||||||
self.user_id = storage.get('user_id')
|
self.user_id = storage.get("user_id")
|
||||||
self.country_code = storage.get('country_code')
|
self.country_code = storage.get("country_code")
|
||||||
|
|
||||||
def get_storage(self) -> dict:
|
def get_storage(self) -> dict:
|
||||||
return {
|
return {
|
||||||
'access_token': self.access_token,
|
"access_token": self.access_token,
|
||||||
'refresh_token': self.refresh_token,
|
"refresh_token": self.refresh_token,
|
||||||
'expires': self.expires,
|
"expires": self.expires,
|
||||||
'user_id': self.user_id,
|
"user_id": self.user_id,
|
||||||
'country_code': self.country_code
|
"country_code": self.country_code,
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_subscription(self) -> str:
|
def get_subscription(self) -> str:
|
||||||
if self.access_token:
|
if self.access_token:
|
||||||
r = requests.get(f'https://api.tidal.com/v1/users/{self.user_id}/subscription',
|
r = requests.get(
|
||||||
params={'countryCode': self.country_code},
|
f"https://api.tidal.com/v1/users/{self.user_id}/subscription",
|
||||||
headers=self.auth_headers())
|
params={"countryCode": self.country_code},
|
||||||
|
headers=self.auth_headers(),
|
||||||
|
)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise TidalAuthError(r.json()['userMessage'])
|
raise TidalAuthError(r.json()["userMessage"])
|
||||||
|
|
||||||
return r.json()['subscription']['type']
|
return r.json()["subscription"]["type"]
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def auth_headers(self) -> dict:
|
def auth_headers(self) -> dict:
|
||||||
@@ -301,7 +328,9 @@ class TidalSession(ABC):
|
|||||||
if self.access_token is None or datetime.now() > self.expires:
|
if self.access_token is None or datetime.now() > self.expires:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers())
|
r = requests.get(
|
||||||
|
"https://api.tidal.com/v1/sessions", headers=self.auth_headers()
|
||||||
|
)
|
||||||
return r.status_code == 200
|
return r.status_code == 200
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
@@ -320,30 +349,59 @@ class TidalMobileSession(TidalSession):
|
|||||||
|
|
||||||
def __init__(self, client_token: str):
|
def __init__(self, client_token: str):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.TIDAL_LOGIN_BASE = 'https://login.tidal.com/api/'
|
self.TIDAL_LOGIN_BASE = "https://login.tidal.com/api/"
|
||||||
self.TIDAL_AUTH_BASE = 'https://auth.tidal.com/v1/'
|
self.TIDAL_AUTH_BASE = "https://auth.tidal.com/v1/"
|
||||||
|
|
||||||
self.client_id = client_token
|
self.client_id = client_token
|
||||||
self.redirect_uri = 'https://tidal.com/android/login/auth'
|
self.redirect_uri = "https://tidal.com/android/login/auth"
|
||||||
self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b'=')
|
self.code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(
|
||||||
self.code_challenge = base64.urlsafe_b64encode(hashlib.sha256(self.code_verifier).digest()).rstrip(b'=')
|
b"="
|
||||||
|
)
|
||||||
|
self.code_challenge = base64.urlsafe_b64encode(
|
||||||
|
hashlib.sha256(self.code_verifier).digest()
|
||||||
|
).rstrip(b"=")
|
||||||
self.client_unique_key = secrets.token_hex(8)
|
self.client_unique_key = secrets.token_hex(8)
|
||||||
self.user_agent = 'Mozilla/5.0 (Linux; Android 13; Pixel 8 Build/TQ2A.230505.002; wv) AppleWebKit/537.36 ' \
|
self.user_agent = (
|
||||||
'(KHTML, like Gecko) Version/4.0 Chrome/119.0.6045.163 Mobile Safari/537.36'
|
"Mozilla/5.0 (Linux; Android 13; Pixel 8 Build/TQ2A.230505.002; wv) AppleWebKit/537.36 "
|
||||||
|
"(KHTML, like Gecko) Version/4.0 Chrome/119.0.6045.163 Mobile Safari/537.36"
|
||||||
|
)
|
||||||
|
|
||||||
def auth(self, username: str, password: str):
|
def auth(self, username: str, password: str):
|
||||||
s = requests.Session()
|
s = requests.Session()
|
||||||
|
|
||||||
|
# try Tidal DataDome cookie request
|
||||||
|
r = s.post(
|
||||||
|
"https://dd.tidal.com/js/",
|
||||||
|
data={
|
||||||
|
"jsData": f'{{"opts":"endpoint,ajaxListenerPath","ua":"{self.user_agent}"}}',
|
||||||
|
"ddk": "1F633CDD8EF22541BD6D9B1B8EF13A", # API Key (required)
|
||||||
|
"Referer": "https%3A%2F%2Ftidal.com%2F", # Referer authorize link (required)
|
||||||
|
"responsePage": "origin", # useless?
|
||||||
|
"ddv": "4.17.0", # useless?
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"user-agent": self.user_agent,
|
||||||
|
"content-type": "application/x-www-form-urlencoded",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
if r.status_code != 200 or not r.json().get("cookie"):
|
||||||
|
raise TidalAuthError("TIDAL BOT protection, could not get DataDome cookie!")
|
||||||
|
|
||||||
|
# get the cookie from the json request and save it in the session
|
||||||
|
dd_cookie = r.json().get("cookie").split(";")[0]
|
||||||
|
s.cookies[dd_cookie.split("=")[0]] = dd_cookie.split("=")[1]
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
'response_type': 'code',
|
"response_type": "code",
|
||||||
'redirect_uri': self.redirect_uri,
|
"redirect_uri": self.redirect_uri,
|
||||||
'lang': 'en_US',
|
"lang": "en_US",
|
||||||
'appMode': 'android',
|
"appMode": "android",
|
||||||
'client_id': self.client_id,
|
"client_id": self.client_id,
|
||||||
'client_unique_key': self.client_unique_key,
|
"client_unique_key": self.client_unique_key,
|
||||||
'code_challenge': self.code_challenge,
|
"code_challenge": self.code_challenge,
|
||||||
'code_challenge_method': 'S256',
|
"code_challenge_method": "S256",
|
||||||
'restrict_signup': 'true'
|
"restrict_signup": "true",
|
||||||
}
|
}
|
||||||
|
|
||||||
full_authorize_url = f"https://login.tidal.com/authorize?{urlencode(params)}"
|
full_authorize_url = f"https://login.tidal.com/authorize?{urlencode(params)}"
|
||||||
@@ -380,121 +438,137 @@ class TidalMobileSession(TidalSession):
|
|||||||
raise TidalAuthError("TIDAL BOT protection, try again later!")
|
raise TidalAuthError("TIDAL BOT protection, try again later!")
|
||||||
|
|
||||||
# enter email, verify email is valid
|
# enter email, verify email is valid
|
||||||
r = s.post(self.TIDAL_LOGIN_BASE + 'email', params=params, json={
|
r = s.post(
|
||||||
'email': username
|
self.TIDAL_LOGIN_BASE + "email",
|
||||||
}, headers={
|
params=params,
|
||||||
'user-agent': self.user_agent,
|
json={"email": username},
|
||||||
'x-csrf-token': s.cookies['_csrf-token'],
|
headers={
|
||||||
'accept': 'application/json, text/plain, */*',
|
"user-agent": self.user_agent,
|
||||||
'content-type': 'application/json',
|
"x-csrf-token": s.cookies["_csrf-token"],
|
||||||
'accept-language': 'en-US',
|
"accept": "application/json, text/plain, */*",
|
||||||
'x-requested-with': 'com.aspiro.tidal'
|
"content-type": "application/json",
|
||||||
})
|
"accept-language": "en-US",
|
||||||
|
"x-requested-with": "com.aspiro.tidal",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise TidalAuthError(r.text)
|
raise TidalAuthError(r.text)
|
||||||
|
|
||||||
if not r.json()['isValidEmail']:
|
if not r.json()["isValidEmail"]:
|
||||||
raise TidalAuthError('Invalid email')
|
raise TidalAuthError("Invalid email")
|
||||||
if r.json()['isNewUser']:
|
if r.json()["isNewUser"]:
|
||||||
raise TidalAuthError('User does not exist')
|
raise TidalAuthError("User does not exist")
|
||||||
|
|
||||||
# login with user credentials
|
# login with user credentials
|
||||||
r = s.post(self.TIDAL_LOGIN_BASE + 'email/user/existing', params=params, json={
|
r = s.post(
|
||||||
'email': username,
|
self.TIDAL_LOGIN_BASE + "email/user/existing",
|
||||||
'password': password
|
params=params,
|
||||||
}, headers={
|
json={"email": username, "password": password},
|
||||||
'User-Agent': self.user_agent,
|
headers={
|
||||||
'x-csrf-token': s.cookies['_csrf-token'],
|
"User-Agent": self.user_agent,
|
||||||
'accept': 'application/json, text/plain, */*',
|
"x-csrf-token": s.cookies["_csrf-token"],
|
||||||
'content-type': 'application/json',
|
"accept": "application/json, text/plain, */*",
|
||||||
'accept-language': 'en-US',
|
"content-type": "application/json",
|
||||||
'x-requested-with': 'com.aspiro.tidal'
|
"accept-language": "en-US",
|
||||||
})
|
"x-requested-with": "com.aspiro.tidal",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise TidalAuthError(r.text)
|
raise TidalAuthError(r.text)
|
||||||
|
|
||||||
# retrieve access code
|
# retrieve access code
|
||||||
r = s.get('https://login.tidal.com/success', allow_redirects=False, headers={
|
r = s.get(
|
||||||
'user-agent': self.user_agent,
|
"https://login.tidal.com/success",
|
||||||
'accept-language': 'en-US',
|
allow_redirects=False,
|
||||||
'x-requested-with': 'com.aspiro.tidal'
|
headers={
|
||||||
})
|
"user-agent": self.user_agent,
|
||||||
|
"accept-language": "en-US",
|
||||||
|
"x-requested-with": "com.aspiro.tidal",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code == 401:
|
if r.status_code == 401:
|
||||||
raise TidalAuthError('Incorrect password')
|
raise TidalAuthError("Incorrect password")
|
||||||
assert (r.status_code == 302)
|
assert r.status_code == 302
|
||||||
url = urlparse.urlparse(r.headers['location'])
|
url = urlparse.urlparse(r.headers["location"])
|
||||||
oauth_code = parse_qs(url.query)['code'][0]
|
oauth_code = parse_qs(url.query)["code"][0]
|
||||||
|
|
||||||
# exchange access code for oauth token
|
# exchange access code for oauth token
|
||||||
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={
|
r = requests.post(
|
||||||
'code': oauth_code,
|
self.TIDAL_AUTH_BASE + "oauth2/token",
|
||||||
'client_id': self.client_id,
|
data={
|
||||||
'grant_type': 'authorization_code',
|
"code": oauth_code,
|
||||||
'redirect_uri': self.redirect_uri,
|
"client_id": self.client_id,
|
||||||
'scope': 'r_usr w_usr w_sub',
|
"grant_type": "authorization_code",
|
||||||
'code_verifier': self.code_verifier,
|
"redirect_uri": self.redirect_uri,
|
||||||
'client_unique_key': self.client_unique_key
|
"scope": "r_usr w_usr w_sub",
|
||||||
}, headers={
|
"code_verifier": self.code_verifier,
|
||||||
'User-Agent': self.user_agent
|
"client_unique_key": self.client_unique_key,
|
||||||
})
|
},
|
||||||
|
headers={"User-Agent": self.user_agent},
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise TidalAuthError(r.text)
|
raise TidalAuthError(r.text)
|
||||||
|
|
||||||
self.access_token = r.json()['access_token']
|
self.access_token = r.json()["access_token"]
|
||||||
self.refresh_token = r.json()['refresh_token']
|
self.refresh_token = r.json()["refresh_token"]
|
||||||
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in'])
|
self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
|
||||||
|
|
||||||
r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers())
|
r = requests.get(
|
||||||
|
"https://api.tidal.com/v1/sessions", headers=self.auth_headers()
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise TidalAuthError(r.text)
|
raise TidalAuthError(r.text)
|
||||||
|
|
||||||
self.user_id = r.json()['userId']
|
self.user_id = r.json()["userId"]
|
||||||
self.country_code = r.json()['countryCode']
|
self.country_code = r.json()["countryCode"]
|
||||||
|
|
||||||
def refresh(self):
|
def refresh(self):
|
||||||
assert (self.refresh_token is not None)
|
assert self.refresh_token is not None
|
||||||
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={
|
r = requests.post(
|
||||||
'refresh_token': self.refresh_token,
|
self.TIDAL_AUTH_BASE + "oauth2/token",
|
||||||
'client_id': self.client_id,
|
data={
|
||||||
'grant_type': 'refresh_token'
|
"refresh_token": self.refresh_token,
|
||||||
})
|
"client_id": self.client_id,
|
||||||
|
"grant_type": "refresh_token",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code == 200:
|
if r.status_code == 200:
|
||||||
# print('TIDAL: Refreshing token successful')
|
# print('TIDAL: Refreshing token successful')
|
||||||
self.access_token = r.json()['access_token']
|
self.access_token = r.json()["access_token"]
|
||||||
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in'])
|
self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
|
||||||
|
|
||||||
# they're already stored if logging in with email/pass
|
# they're already stored if logging in with email/pass
|
||||||
if not self.user_id:
|
if not self.user_id:
|
||||||
self.user_id = r.json().get('user', {}).get('userId')
|
self.user_id = r.json().get("user", {}).get("userId")
|
||||||
if not self.country_code:
|
if not self.country_code:
|
||||||
self.country_code = r.json().get('user', {}).get('countryCode')
|
self.country_code = r.json().get("user", {}).get("countryCode")
|
||||||
|
|
||||||
if 'refresh_token' in r.json():
|
if "refresh_token" in r.json():
|
||||||
self.refresh_token = r.json()['refresh_token']
|
self.refresh_token = r.json()["refresh_token"]
|
||||||
|
|
||||||
elif r.status_code == 401:
|
elif r.status_code == 401:
|
||||||
print('\tERROR: ' + r.json()['userMessage'])
|
print("\tERROR: " + r.json()["userMessage"])
|
||||||
|
|
||||||
return r.status_code == 200
|
return r.status_code == 200
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def session_type():
|
def session_type():
|
||||||
return 'Mobile'
|
return "Mobile"
|
||||||
|
|
||||||
def auth_headers(self):
|
def auth_headers(self):
|
||||||
return {
|
return {
|
||||||
'Host': 'api.tidal.com',
|
"Host": "api.tidal.com",
|
||||||
'X-Tidal-Token': self.client_id,
|
"X-Tidal-Token": self.client_id,
|
||||||
'Authorization': 'Bearer {}'.format(self.access_token),
|
"Authorization": "Bearer {}".format(self.access_token),
|
||||||
'Connection': 'Keep-Alive',
|
"Connection": "Keep-Alive",
|
||||||
'Accept-Encoding': 'gzip',
|
"Accept-Encoding": "gzip",
|
||||||
'User-Agent': 'TIDAL_ANDROID/1039 okhttp/3.14.9'
|
"User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -505,7 +579,7 @@ class TidalTvSession(TidalSession):
|
|||||||
|
|
||||||
def __init__(self, client_token: str, client_secret: str):
|
def __init__(self, client_token: str, client_secret: str):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.TIDAL_AUTH_BASE = 'https://auth.tidal.com/v1/'
|
self.TIDAL_AUTH_BASE = "https://auth.tidal.com/v1/"
|
||||||
|
|
||||||
self.client_id = client_token
|
self.client_id = client_token
|
||||||
self.client_secret = client_secret
|
self.client_secret = client_secret
|
||||||
@@ -520,29 +594,35 @@ class TidalTvSession(TidalSession):
|
|||||||
s = requests.Session()
|
s = requests.Session()
|
||||||
|
|
||||||
# retrieve csrf token for subsequent request
|
# retrieve csrf token for subsequent request
|
||||||
r = s.post(self.TIDAL_AUTH_BASE + 'oauth2/device_authorization', data={
|
r = s.post(
|
||||||
'client_id': self.client_id,
|
self.TIDAL_AUTH_BASE + "oauth2/device_authorization",
|
||||||
'scope': 'r_usr w_usr'
|
data={"client_id": self.client_id, "scope": "r_usr w_usr"},
|
||||||
})
|
)
|
||||||
|
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise TidalAuthError("Authorization failed! Is the clientid/token up to date?")
|
raise TidalAuthError(
|
||||||
|
"Authorization failed! Is the clientid/token up to date?"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
device_code = r.json()['deviceCode']
|
device_code = r.json()["deviceCode"]
|
||||||
user_code = r.json()['userCode']
|
user_code = r.json()["userCode"]
|
||||||
print('Opening https://link.tidal.com/{}, log in or sign up to TIDAL.'.format(user_code))
|
print(
|
||||||
webbrowser.open('https://link.tidal.com/' + user_code, new=2)
|
"Opening https://link.tidal.com/{}, log in or sign up to TIDAL.".format(
|
||||||
|
user_code
|
||||||
|
)
|
||||||
|
)
|
||||||
|
webbrowser.open("https://link.tidal.com/" + user_code, new=2)
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'client_id': self.client_id,
|
"client_id": self.client_id,
|
||||||
'device_code': device_code,
|
"device_code": device_code,
|
||||||
'client_secret': self.client_secret,
|
"client_secret": self.client_secret,
|
||||||
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
|
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
|
||||||
'scope': 'r_usr w_usr'
|
"scope": "r_usr w_usr",
|
||||||
}
|
}
|
||||||
|
|
||||||
status_code = 400
|
status_code = 400
|
||||||
print('Checking link ', end='')
|
print("Checking link ", end="")
|
||||||
|
|
||||||
while status_code == 400:
|
while status_code == 400:
|
||||||
for index, char in enumerate("." * 5):
|
for index, char in enumerate("." * 5):
|
||||||
@@ -550,7 +630,7 @@ class TidalTvSession(TidalSession):
|
|||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
# exchange access code for oauth token
|
# exchange access code for oauth token
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data=data)
|
r = requests.post(self.TIDAL_AUTH_BASE + "oauth2/token", data=data)
|
||||||
status_code = r.status_code
|
status_code = r.status_code
|
||||||
index += 1 # lists are zero indexed, we need to increase by one for the accurate count
|
index += 1 # lists are zero indexed, we need to increase by one for the accurate count
|
||||||
# backtrack the written characters, overwrite them with space, backtrack again:
|
# backtrack the written characters, overwrite them with space, backtrack again:
|
||||||
@@ -558,61 +638,70 @@ class TidalTvSession(TidalSession):
|
|||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
if r.status_code == 200:
|
if r.status_code == 200:
|
||||||
print('\nSuccessfully linked!')
|
print("\nSuccessfully linked!")
|
||||||
elif r.status_code == 401:
|
elif r.status_code == 401:
|
||||||
raise TidalAuthError('Auth Error: ' + r.json()['error'])
|
raise TidalAuthError("Auth Error: " + r.json()["error"])
|
||||||
|
|
||||||
self.access_token = r.json()['access_token']
|
self.access_token = r.json()["access_token"]
|
||||||
self.refresh_token = r.json()['refresh_token']
|
self.refresh_token = r.json()["refresh_token"]
|
||||||
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in'])
|
self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
|
||||||
|
|
||||||
r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers())
|
r = requests.get(
|
||||||
assert (r.status_code == 200)
|
"https://api.tidal.com/v1/sessions", headers=self.auth_headers()
|
||||||
self.user_id = r.json()['userId']
|
)
|
||||||
self.country_code = r.json()['countryCode']
|
assert r.status_code == 200
|
||||||
|
self.user_id = r.json()["userId"]
|
||||||
|
self.country_code = r.json()["countryCode"]
|
||||||
|
|
||||||
r = requests.get('https://api.tidal.com/v1/users/{}?countryCode={}'.format(self.user_id, self.country_code),
|
r = requests.get(
|
||||||
headers=self.auth_headers())
|
"https://api.tidal.com/v1/users/{}?countryCode={}".format(
|
||||||
assert (r.status_code == 200)
|
self.user_id, self.country_code
|
||||||
|
),
|
||||||
|
headers=self.auth_headers(),
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
# self.username = r.json()['username']
|
# self.username = r.json()['username']
|
||||||
|
|
||||||
def refresh(self):
|
def refresh(self):
|
||||||
assert (self.refresh_token is not None)
|
assert self.refresh_token is not None
|
||||||
r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={
|
r = requests.post(
|
||||||
'refresh_token': self.refresh_token,
|
self.TIDAL_AUTH_BASE + "oauth2/token",
|
||||||
'client_id': self.client_id,
|
data={
|
||||||
'client_secret': self.client_secret,
|
"refresh_token": self.refresh_token,
|
||||||
'grant_type': 'refresh_token'
|
"client_id": self.client_id,
|
||||||
})
|
"client_secret": self.client_secret,
|
||||||
|
"grant_type": "refresh_token",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if r.status_code == 200:
|
if r.status_code == 200:
|
||||||
# print('TIDAL: Refreshing token successful')
|
# print('TIDAL: Refreshing token successful')
|
||||||
self.access_token = r.json()['access_token']
|
self.access_token = r.json()["access_token"]
|
||||||
self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in'])
|
self.expires = datetime.now() + timedelta(seconds=r.json()["expires_in"])
|
||||||
|
|
||||||
# they're already stored if logging in with email/pass
|
# they're already stored if logging in with email/pass
|
||||||
if not self.user_id:
|
if not self.user_id:
|
||||||
self.user_id = r.json().get('user', {}).get('userId')
|
self.user_id = r.json().get("user", {}).get("userId")
|
||||||
if not self.country_code:
|
if not self.country_code:
|
||||||
self.country_code = r.json().get('user', {}).get('countryCode')
|
self.country_code = r.json().get("user", {}).get("countryCode")
|
||||||
|
|
||||||
if 'refresh_token' in r.json():
|
if "refresh_token" in r.json():
|
||||||
self.refresh_token = r.json()['refresh_token']
|
self.refresh_token = r.json()["refresh_token"]
|
||||||
|
|
||||||
elif r.status_code == 401:
|
elif r.status_code == 401:
|
||||||
print('\tERROR: ' + r.json()['userMessage'])
|
print("\tERROR: " + r.json()["userMessage"])
|
||||||
|
|
||||||
return r.status_code == 200
|
return r.status_code == 200
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def session_type():
|
def session_type():
|
||||||
return 'Tv'
|
return "Tv"
|
||||||
|
|
||||||
def auth_headers(self):
|
def auth_headers(self):
|
||||||
return {
|
return {
|
||||||
'X-Tidal-Token': self.client_id,
|
"X-Tidal-Token": self.client_id,
|
||||||
'Authorization': 'Bearer {}'.format(self.access_token),
|
"Authorization": "Bearer {}".format(self.access_token),
|
||||||
'Connection': 'Keep-Alive',
|
"Connection": "Keep-Alive",
|
||||||
'Accept-Encoding': 'gzip',
|
"Accept-Encoding": "gzip",
|
||||||
'User-Agent': 'TIDAL_ANDROID/1039 okhttp/3.14.9'
|
"User-Agent": "TIDAL_ANDROID/1039 okhttp/3.14.9",
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user