From de2c1ca791c6f2ad25a7aabe534ae538d0846d47 Mon Sep 17 00:00:00 2001 From: Dniel97 <6324072+Dniel97@users.noreply.github.com> Date: Sat, 28 Aug 2021 15:31:36 +0200 Subject: [PATCH] Tidal module + README added --- .gitignore | 6 + README.md | 111 ++++++++++ modules/tidal/__init__.py | 0 modules/tidal/interface.py | 291 ++++++++++++++++++++++++++ modules/tidal/tidal_api.py | 407 +++++++++++++++++++++++++++++++++++++ 5 files changed, 815 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 modules/tidal/__init__.py create mode 100644 modules/tidal/interface.py create mode 100644 modules/tidal/tidal_api.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..78e916c --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +venv +.vscode +.DS_Store +.idea/ +.python-version diff --git a/README.md b/README.md new file mode 100644 index 0000000..08f655e --- /dev/null +++ b/README.md @@ -0,0 +1,111 @@ + + +OrpheusDL - Tidal +================= + +A Tidal module for the OrpheusDL modular archival music program + +[Report Bug](https://github.com/yarrm80s/orpheusdl/issues) +ยท +[Request Feature](https://github.com/yarrm80s/orpheusdl/issues) + + +## Table of content + +- [About OrpheusDL - Tidal](#about-orpheusdl-tidal) +- [Getting Started](#getting-started) + - [Prerequisites](#prerequisites) + - [Installation](#installation) +- [Usage](#usage) +- [Configuration](#configuration) + - [Global](#global) + - [Tidal](#tidal) +- [Contact](#contact) +- [Acknowledgements](#acknowledgements) + + + + +## About OrpheusDL - Tidal + +OrpheusDL - Tidal is a module written in Python which allows archiving from **Tidal** for the modular music archival program. + + + +## Getting Started + +Follow these steps to get a local copy of Orpheus up and running: + +### Prerequisites + +* Already have [OrpheusDL](https://github.com/yarrm80s/orpheusdl) installed + +### Installation + +1. Clone the repo inside the folder `OrpheusDL` + ```sh + git clone https://github.com/Dniel97/orpheusdl-tidal.git + ``` +2. Execute: + ```sh + python orpheus.py search tidal track darkside + ``` +3. Now the `config/settings.json` file should be updated with the Tidal settings + + +## Usage + +Just call `orpheus.py` with any link you want to archive: + +```sh +python orpheus.py https://tidal.com/browse/album/92265334 +``` + + +## Configuration + +You can customize every module from Orpheus individually and also set general/global settings which are active in every +loaded module. You'll find the configuration file here: `config/settings.json` + +### Global + +```json +"global": { + "general": { + "album_search_return_only_albums": false, + "download_path": "./downloads/", + "download_quality": "lossless" + }, +``` + +`download_quality`: Choose one of the following settings: +* "hifi": FLAC with MQA up to 48/24 +* "lossless": FLAC with 44.1/16 +* "high": AAC 320 kbit/s +* "low": AAC 96 kbit/s + +### Tidal +```json + "tidal": { + "client_token": "", + "client_secret": "", +} +``` +`client_token`: Enter a valid TV client token + +`client_secret`: Enter a valid TV client secret for the `client_token` + + +## Contact + +Yarrm80s (pronounced 'Yeargh mateys!') - [@yarrm80s](https://github.com/yarrm80s) + +Dniel97 - [@Dniel97](https://github.com/Dniel97) + +Project Link: [OrpheusDL Tidal Public GitHub Repository](https://github.com/Dniel97/orpheusdl-tidal) + + + +## Acknowledgements +* [RedSea](https://github.com/redsudo/RedSea) +* [My RedSea fork](https://github.com/Dniel97/RedSea) diff --git a/modules/tidal/__init__.py b/modules/tidal/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/tidal/interface.py b/modules/tidal/interface.py new file mode 100644 index 0000000..4e0c491 --- /dev/null +++ b/modules/tidal/interface.py @@ -0,0 +1,291 @@ +import base64 +import json +import logging +import re + +from urllib.parse import urlparse + +from utils.models import * +from utils.utils import sanitise_name +from .tidal_api import TidalTvSession, TidalApi, TidalAuthError + +module_information = ModuleInformation( + service_name='Tidal', + module_supported_modes=ModuleModes.download | ModuleModes.credits | ModuleModes.lyrics | ModuleModes.covers, + flags=ModuleFlags.custom_url_parsing, + global_settings={'client_token': 'aR7gUaTK1ihpXOEP', 'client_secret': 'eVWBEkuL2FCjxgjOkR3yK0RYZEbcrMXRc2l8fU3ZCdE='}, + temporary_settings=['session'], + netlocation_constant='tidal', + test_url='https://tidal.com/browse/track/92265335' +) + +# 5 = 320 kbps MP3, 6 = 16-bit FLAC, 7 = 24-bit / =< 96kHz FLAC, 27 =< 192 kHz FLAC +QUALITY_PARSER = { + QualityEnum.LOW: 'LOW', + QualityEnum.MEDIUM: 'HIGH', + QualityEnum.HIGH: 'HIGH', + QualityEnum.LOSSLESS: 'LOSSLESS', + QualityEnum.HIFI: 'HI_RES' +} + + +class ModuleInterface: + def __init__(self, module_controller: ModuleController): + self.module_controller = module_controller + settings = module_controller.module_settings + + session: TidalTvSession = module_controller.temporary_settings_controller.read('session') + + if not session: + logging.debug('Tidal: No session found, creating new one') + session = TidalTvSession(settings['client_token'], settings['client_secret']) + + module_controller.temporary_settings_controller.set('session', session) + + # Always try to refresh session + if not session.valid(): + session.refresh() + # Save the refreshed session in the temporary settings + module_controller.temporary_settings_controller.set('session', session) + + while True: + # Check for HiFi subscription + try: + session.check_subscription() + break + except TidalAuthError as e: + print(f'{e}') + confirm = input('Do you want to create a new session? [Y/n]: ') + + if confirm.upper() == 'N': + print('Exiting...') + exit() + + # Create a new session finally + session = TidalTvSession(settings['client_token'], settings['client_secret']) + module_controller.temporary_settings_controller.set('session', session) + + self.session = TidalApi(session) + # Track cache for credits + self.track_cache = {} + + @staticmethod + def generate_artwork_url(album_id, size=1280): + return 'https://resources.tidal.com/images/{0}/{1}x{1}.jpg'.format(album_id.replace('-', '/'), size) + + @staticmethod + def custom_url_parse(link: str): + if link.startswith('http'): + link = re.sub(r'tidal.com\/.{2}\/store\/', 'tidal.com/', link) + link = re.sub(r'tidal.com\/store\/', 'tidal.com/', link) + link = re.sub(r'tidal.com\/browse\/', 'tidal.com/', link) + url = urlparse(link) + components = url.path.split('/') + + if not components or len(components) <= 2: + print('Invalid URL: ' + link) + exit() + if len(components) == 5: + type_ = components[3] + id_ = components[4] + else: + type_ = components[1] + id_ = components[2] + return DownloadTypeEnum[type_], id_ + + def search(self, query_type: DownloadTypeEnum, query: str, tags: Tags = None, limit: int = 10): + results = self.session.get_search_data(query) + + items = [] + for i in results[query_type.name + 's']['items']: + if query_type is DownloadTypeEnum.artist: + name = i['name'] + artists = None + elif query_type is DownloadTypeEnum.playlist: + name = i['title'] + artists = [i['creator']['name']] + elif query_type is DownloadTypeEnum.track: + name = i['title'] + artists = [j['name'] for j in i['artists']] + elif query_type is DownloadTypeEnum.album: + name = i['title'] + artists = [j['name'] for j in i['artists']] + else: + raise Exception('Query type is invalid') + + additional = '' + if query_type != DownloadTypeEnum.artist: + if i['audioModes'] == ['DOLBY_ATMOS']: + additional = "Dolby Atmos" + elif i['audioModes'] == ['SONY_360RA']: + additional = "360 Reality Audio" + elif i['audioQuality'] == 'HI_RES': + additional = "MQA" + else: + additional = 'HiFi' + + item = SearchResult( + name=name, + artists=artists, + result_id=str(i['id']), + explicit=bool(i['explicit']) if 'explicit' in i else None, + additional=[additional] + ) + + items.append(item) + + return items + + def get_track_info(self, track_id: str) -> TrackInfo: + track_data = self.session.get_track(track_id) + + album_id = str(track_data['album']['id']) + album_data = self.session.get_album(album_id) + + cover_url = self.generate_artwork_url(track_data['album']['cover']) + + stream_data = self.session.get_stream_url(track_id, + QUALITY_PARSER[self.module_controller.orpheus_options.quality_tier]) + manifest = json.loads(base64.b64decode(stream_data['manifest'])) + + track_info = TrackInfo( + track_name=track_data['title'], + # track_id=track_id, + album_id=album_id, + album_name=album_data['title'], + artist_name=track_data['artist']['name'], + artist_id=track_data['artist']['id'], + # TODO: Get correct bit_depth and sample_rate + bit_depth=24 if manifest['codecs'] == 'mqa' else 16, + sample_rate=44.1, + download_type=DownloadEnum.URL, + cover_url=cover_url, + file_url=manifest['urls'][0], + tags=self.convert_tags(track_data, album_data), + codec=CodecEnum['AAC' if 'mp4a' in manifest['codecs'] else manifest['codecs'].upper()] + ) + + return track_info + + def get_lyrics_info(self, track_id: str) -> LyricsInfo: + embedded, synced = None, None + + lyrics_data = self.session.get_lyrics(track_id) + + if 'lyrics' in lyrics_data: + embedded = lyrics_data['lyrics'] + + if 'subtitles' in lyrics_data: + synced = lyrics_data['subtitles'] + + return LyricsInfo( + embedded=embedded, + synced=synced + ) + + def get_playlist_info(self, playlist_id: str) -> PlaylistInfo: + playlist_data = self.session.get_playlist(playlist_id) + playlist_tracks = self.session.get_playlist_items(playlist_id) + + tracks = [track['item']['id'] for track in playlist_tracks['items'] if track['type'] == 'track'] + + if 'name' in playlist_data['creator']: + creator_name = playlist_data['creator']['name'] + elif playlist_data['creator']['id'] == 0: + creator_name = 'TIDAL' + else: + creator_name = 'Unknown' + + cover_url = self.generate_artwork_url(playlist_data['squareImage'], size=1080) + + playlist_info = PlaylistInfo( + playlist_name=playlist_data['title'], + playlist_creator_name=creator_name, + playlist_creator_id=playlist_data['creator']['id'], + tracks=tracks, + cover_url=cover_url + ) + + return playlist_info + + def get_album_info(self, album_id): + album_data = self.session.get_album(album_id) + # Get all album tracks with corresponding credits + tracks_data = self.session.get_album_contributors(album_id) + + tracks = [str(track['item']['id']) for track in tracks_data['items']] + + # Cache all track (+credits) in track_cache + self.track_cache.update({str(track['item']['id']): track for track in tracks_data['items']}) + + album_info = AlbumInfo( + album_name=album_data['title'], + artist_name=album_data['artist']['name'], + artist_id=album_data['artist']['id'], + tracks=tracks, + ) + + return album_info + + def get_artist_info(self, artist_id: str) -> ArtistInfo: + artist_data = self.session.get_artist(artist_id) + + artist_albums = self.session.get_artist_albums(artist_id)['items'] + artist_singles = self.session.get_artist_albums_ep_singles(artist_id)['items'] + + albums = [str(album['id']) for album in artist_albums + artist_singles] + + artist_info = ArtistInfo( + artist_name=artist_data['name'], + albums=albums + ) + + return artist_info + + def get_track_credits(self, track_id: str) -> Optional[list]: + credits_dict = {} + + # Fetch credits from cache if not fetch those credits + if track_id in self.track_cache: + track_contributors = self.track_cache[track_id]['credits'] + + for contributor in track_contributors: + credits_dict[contributor['type']] = [c['name'] for c in contributor['contributors']] + else: + track_contributors = self.session.get_track_contributors(track_id)['items'] + + if len(track_contributors) > 0: + for contributor in track_contributors: + # Check if the dict contains no list, create one + if contributor['role'] not in credits_dict: + credits_dict[contributor['role']] = [] + + credits_dict[contributor['role']].append(contributor['name']) + + if len(credits_dict) > 0: + # Convert the dictionary back to a list of CreditsInfo + return [CreditsInfo(sanitise_name(k), v) for k, v in credits_dict.items()] + return None + + @staticmethod + def convert_tags(track_data: dict, album_data: dict) -> Tags: + release_year = track_data['streamStartDate'][:4] + + tags = Tags( + title=track_data['title'], + album=album_data['title'], + album_artist=album_data['artist']['name'], + artist=track_data['artist']['name'], + track_number=track_data['trackNumber'], + total_tracks=album_data['numberOfTracks'], + disc_number=track_data['volumeNumber'], + total_discs=album_data['numberOfVolumes'], + date=release_year, + explicit=track_data['explicit'], + isrc=track_data['isrc'], + copyright=track_data['copyright'], + replay_gain=track_data['replayGain'], + replay_peak=track_data['peak'] + ) + + return tags diff --git a/modules/tidal/tidal_api.py b/modules/tidal/tidal_api.py new file mode 100644 index 0000000..7aa8bd9 --- /dev/null +++ b/modules/tidal/tidal_api.py @@ -0,0 +1,407 @@ +import base64 +import json +import sys +import time +import webbrowser +from datetime import datetime, timedelta + +import requests +import urllib3 + +from utils.utils import create_requests_session + +technical_names = { + 'eac3': 'E-AC-3 JOC (Dolby Digital Plus with Dolby Atmos, with 5.1 bed)', + 'mha1': 'MPEG-H 3D Audio (Sony 360 Reality Audio)', + 'ac4': 'AC-4 IMS (Dolby AC-4 with Dolby Atmos immersive stereo)', + 'mqa': 'MQA (Master Quality Authenticated) in FLAC container', + 'flac': 'FLAC (Free Lossless Audio Codec)', + 'alac': 'ALAC (Apple Lossless Audio Codec)', + 'mp4a.40.2': 'AAC 320 (Advanced Audio Coding) with a bitrate of 320kb/s', + 'mp4a.40.5': 'AAC 96 (Advanced Audio Coding) with a bitrate of 96kb/s' +} + + +class TidalRequestError(Exception): + def __init__(self, payload): + sf = '{subStatus}: {userMessage} (HTTP {status})'.format(**payload) + self.payload = payload + super(TidalRequestError, self).__init__(sf) + + +class TidalAuthError(Exception): + def __init__(self, message): + super(TidalAuthError, self).__init__(message) + + +class TidalError(Exception): + def __init__(self, message): + self.message = message + super(TidalError, self).__init__(message) + + +class TidalApi(object): + TIDAL_API_BASE = 'https://api.tidal.com/v1/' + TIDAL_VIDEO_BASE = 'https://api.tidalhifi.com/v1/' + TIDAL_CLIENT_VERSION = '2.26.1' + + def __init__(self, session): + self.session = session + self.s = create_requests_session() + + def _get(self, url, params=None, refresh=False): + if params is None: + params = {} + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + params['countryCode'] = self.session.country_code + if 'limit' not in params: + params['limit'] = '9999' + + # Catch video for different base + if url[:5] == 'video': + resp = self.s.get( + self.TIDAL_VIDEO_BASE + url, + headers=self.session.auth_headers(), + params=params, + verify=False) + else: + resp = self.s.get( + self.TIDAL_API_BASE + url, + headers=self.session.auth_headers(), + params=params, + verify=False) + + # if the request 401s or 403s, try refreshing the TV/Mobile session in case that helps + if not refresh and (resp.status_code == 401 or resp.status_code == 403): + if isinstance(self.session, TidalTvSession): + self.session.refresh() + return self._get(url, params, True) + + resp_json = None + try: + resp_json = resp.json() + except: # some tracks seem to return a JSON with leading whitespace + try: + resp_json = json.loads(resp.text.strip()) + except: # if this doesn't work, the HTTP status probably isn't 200. Are we rate limited? + pass + + if not resp_json: + raise TidalError('Response was not valid JSON. HTTP status {}. {}'.format(resp.status_code, resp.text)) + + if 'status' in resp_json and resp_json['status'] == 404 and \ + 'subStatus' in resp_json and resp_json['subStatus'] == 2001: + raise TidalError('Error: {}. This might be region-locked.'.format(resp_json['userMessage'])) + + # Really hacky way, pls don't copy this ever + if 'status' in resp_json and resp_json['status'] == 404 and \ + 'error' in resp_json and resp_json['error'] == 'Not Found': + return resp_json + + if 'status' in resp_json and not resp_json['status'] == 200: + raise TidalRequestError(resp_json) + + return resp_json + + def get_stream_url(self, track_id, quality): + + return self._get('tracks/' + str(track_id) + '/playbackinfopostpaywall', { + 'playbackmode': 'STREAM', + 'assetpresentation': 'FULL', + 'audioquality': quality, + 'prefetch': 'false' + }) + + def get_search_data(self, searchterm): + return self._get('search', + params={'query': str(searchterm), 'offset': 0, 'limit': 20, 'includeContributors': 'true'}) + + def get_page(self, pageurl): + return self._get('pages/' + pageurl, params={'deviceType': 'TV', 'locale': 'en_US', 'mediaFormats': 'SONY_360'}) + + def get_playlist_items(self, playlist_id): + result = self._get('playlists/' + playlist_id + '/items', { + 'offset': 0, + 'limit': 100 + }) + + if result['totalNumberOfItems'] <= 100: + return result + + offset = len(result['items']) + while True: + buf = self._get('playlists/' + playlist_id + '/items', { + 'offset': offset, + 'limit': 100 + }) + offset += len(buf['items']) + result['items'] += buf['items'] + + if offset >= result['totalNumberOfItems']: + break + + return result + + def get_playlist(self, playlist_id): + return self._get('playlists/' + str(playlist_id)) + + def get_album_tracks(self, album_id): + return self._get('albums/' + str(album_id) + '/tracks') + + def get_track(self, track_id): + return self._get('tracks/' + str(track_id)) + + def get_album(self, album_id): + return self._get('albums/' + str(album_id)) + + def get_video(self, video_id): + return self._get('videos/' + str(video_id)) + + def get_favorite_tracks(self, user_id): + return self._get('users/' + str(user_id) + '/favorites/tracks') + + def get_track_contributors(self, track_id): + return self._get('tracks/' + str(track_id) + '/contributors') + + def get_album_contributors(self, album_id): + return self._get('albums/' + album_id + '/items/credits', params={ + 'replace': True, + 'offset': 0, + 'limit': 100, + 'includeContributors': True + }) + + def get_lyrics(self, track_id): + return self._get('tracks/' + str(track_id) + '/lyrics', params={ + 'deviceType': 'TV', + 'locale': 'en_US' + }) + + def get_video_contributors(self, video_id): + return self._get('videos/' + video_id + '/contributors', params={ + 'limit': 50 + }) + + def get_video_stream_url(self, video_id): + return self._get('videos/' + str(video_id) + '/streamurl') + + def get_artist(self, artist_id): + return self._get('artists/' + str(artist_id)) + + def get_artist_albums(self, artist_id): + return self._get('artists/' + str(artist_id) + '/albums') + + def get_artist_albums_ep_singles(self, artist_id): + return self._get('artists/' + str(artist_id) + '/albums', params={'filter': 'EPSANDSINGLES'}) + + def get_type_from_id(self, id): + result = None + try: + result = self.get_album(id) + return 'a' + except TidalError: + pass + try: + result = self.get_artist(id) + return 'r' + except TidalError: + pass + try: + result = self.get_track(id) + return 't' + except TidalError: + pass + try: + result = self.get_video(id) + return 'v' + except TidalError: + pass + + return result + + +class SessionFormats: + def __init__(self, session): + self.mqa_trackid = '91950969' + self.dolby_trackid = '131069353' + self.sony_trackid = '142292058' + + self.quality = ['HI_RES', 'LOSSLESS', 'HIGH', 'LOW'] + + self.formats = { + 'eac3': False, + 'mha1': False, + 'ac4': False, + 'mqa': False, + 'flac': False, + 'alac': False, + 'mp4a.40.2': False, + 'mp4a.40.5': False + } + + try: + self.check_formats(session) + except TidalRequestError: + print('\tERROR: No (HiFi) subscription found!') + + def check_formats(self, session): + api = TidalApi(session) + + for id in [self.dolby_trackid, self.sony_trackid]: + playback_info = api.get_stream_url(id, ['LOW']) + if playback_info['manifestMimeType'] == 'application/dash+xml': + continue + manifest_unparsed = base64.b64decode(playback_info['manifest']).decode('UTF-8') + if 'ContentProtection' not in manifest_unparsed: + self.formats[json.loads(manifest_unparsed)['codecs']] = True + + for i in range(len(self.quality)): + playback_info = api.get_stream_url(self.mqa_trackid, [self.quality[i]]) + if playback_info['manifestMimeType'] == 'application/dash+xml': + continue + + manifest_unparsed = base64.b64decode(playback_info['manifest']).decode('UTF-8') + if 'ContentProtection' not in manifest_unparsed: + self.formats[json.loads(manifest_unparsed)['codecs']] = True + + def print_fomats(self): + table = prettytable.PrettyTable() + table.field_names = ['Codec', 'Technical name', 'Supported'] + table.align = 'l' + for format in self.formats: + table.add_row([format, technical_names[format], self.formats[format]]) + + string_table = '\t' + table.__str__().replace('\n', '\n\t') + print(string_table) + print('') + + +class TidalTvSession: + ''' + Tidal session object based on the mobile Android oauth flow + ''' + + def __init__(self, client_token, client_secret): + self.TIDAL_AUTH_BASE = 'https://auth.tidal.com/v1/' + + self.username = None + self.client_id = client_token + self.client_secret = client_secret + + self.device_code = None + self.user_code = None + + self.access_token = None + self.refresh_token = None + self.expires = None + self.user_id = None + self.country_code = None + + self.auth() + + def auth(self): + s = requests.Session() + + # retrieve csrf token for subsequent request + r = s.post(self.TIDAL_AUTH_BASE + 'oauth2/device_authorization', data={ + 'client_id': self.client_id, + 'scope': 'r_usr w_usr' + }, verify=False) + + if r.status_code == 400: + raise TidalAuthError("Authorization failed! Is the clientid/token up to date?") + elif r.status_code == 200: + self.device_code = r.json()['deviceCode'] + self.user_code = r.json()['userCode'] + print('Opening https://link.tidal.com/{}, log in or sign up to TIDAL.'.format(self.user_code)) + webbrowser.open('https://link.tidal.com/' + self.user_code, new=2) + + data = { + 'client_id': self.client_id, + 'device_code': self.device_code, + 'client_secret': self.client_secret, + 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', + 'scope': 'r_usr w_usr' + } + + status_code = 400 + print('Checking link ', end='') + + while status_code == 400: + for index, char in enumerate("." * 5): + sys.stdout.write(char) + sys.stdout.flush() + # exchange access code for oauth token + time.sleep(0.2) + r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data=data, verify=False) + status_code = r.status_code + index += 1 # lists are zero indexed, we need to increase by one for the accurate count + # backtrack the written characters, overwrite them with space, backtrack again: + sys.stdout.write("\b" * index + " " * index + "\b" * index) + sys.stdout.flush() + + if r.status_code == 200: + print('\nSuccessfully linked!') + elif r.status_code == 401: + raise TidalAuthError('Auth Error: ' + r.json()['error']) + + self.access_token = r.json()['access_token'] + self.refresh_token = r.json()['refresh_token'] + self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in']) + + r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers(), verify=False) + assert (r.status_code == 200) + self.user_id = r.json()['userId'] + self.country_code = r.json()['countryCode'] + + r = requests.get('https://api.tidal.com/v1/users/{}?countryCode={}'.format(self.user_id, self.country_code), + headers=self.auth_headers(), verify=False) + assert (r.status_code == 200) + self.username = r.json()['username'] + + def check_subscription(self): + if self.access_token is not None: + r = requests.get('https://api.tidal.com/v1/users/' + str(self.user_id) + '/subscription', + headers=self.auth_headers(), verify=False) + assert (r.status_code == 200) + if r.json()['subscription']['type'] not in ['HIFI', 'PREMIUM_PLUS']: + raise TidalAuthError('You need a HiFi subscription!') + + def valid(self): + if self.access_token is None or datetime.now() > self.expires: + return False + r = requests.get('https://api.tidal.com/v1/sessions', headers=self.auth_headers(), verify=False) + return r.status_code == 200 + + def refresh(self): + assert (self.refresh_token is not None) + r = requests.post(self.TIDAL_AUTH_BASE + 'oauth2/token', data={ + 'refresh_token': self.refresh_token, + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'grant_type': 'refresh_token' + }, verify=False) + + if r.status_code == 200: + print('Tidal: Refreshing token successful') + self.access_token = r.json()['access_token'] + self.expires = datetime.now() + timedelta(seconds=r.json()['expires_in']) + + if 'refresh_token' in r.json(): + self.refresh_token = r.json()['refresh_token'] + + return r.status_code == 200 + + @staticmethod + def session_type(): + return 'Tv' + + def auth_headers(self): + return { + 'Host': 'api.tidal.com', + 'X-Tidal-Token': self.client_id, + 'Authorization': 'Bearer {}'.format(self.access_token), + 'Connection': 'Keep-Alive', + 'Accept-Encoding': 'gzip', + 'User-Agent': 'TIDAL_ANDROID/1039 okhttp/3.14.9' + }