# orpheusdl-tidal/interface.py
import base64
import json
import logging
import os
import re
import ffmpeg
from datetime import datetime
from getpass import getpass
from dataclasses import dataclass
from shutil import copyfileobj
from xml.etree import ElementTree
from tqdm import tqdm
from utils.exceptions import InvalidInput
from utils.models import *
from utils.utils import (
sanitise_name,
silentremove,
download_to_temp,
create_temp_filename,
create_requests_session,
)
from .mqa_identifier_python.mqa_identifier_python.mqa_identifier import MqaIdentifier
from .tidal_api import (
TidalTvSession,
TidalApi,
TidalMobileSession,
SessionType,
TidalError,
TidalRequestError,
)
module_information = ModuleInformation(
service_name="TIDAL",
module_supported_modes=ModuleModes.download
| ModuleModes.credits
| ModuleModes.covers
| ModuleModes.lyrics,
login_behaviour=ManualEnum.manual,
global_settings={
"tv_atmos_token": "cgiF7TQuB97BUIu3",
"tv_atmos_secret": "1nqpgx8uvBdZigrx4hUPDV2hOwgYAAAG5DYXOr6uNf8=",
"mobile_atmos_hires_token": "km8T1xS355y7dd3H",
"mobile_hires_token": "6BDSRdpK9hqEBTgU",
"enable_mobile": True,
"prefer_ac4": False,
"fix_mqa": False,
        "tidal_token": "",
},
    # currently too broken to keep: the cover needs to be a JPEG or it crashes, and Pillow causes problems on Termux
# flags=ModuleFlags.needs_cover_resize,
session_storage_variables=["sessions"],
netlocation_constant="tidal",
test_url="https://tidal.com/browse/track/92265335",
url_decoding=ManualEnum.manual,
)
@dataclass
class AudioTrack:
codec: CodecEnum
sample_rate: int
bitrate: int
urls: list
class ModuleInterface:
# noinspection PyTypeChecker
def __init__(self, module_controller: ModuleController):
self.cover_size = (
module_controller.orpheus_options.default_cover_options.resolution
)
self.oprinter = module_controller.printer_controller
self.print = module_controller.printer_controller.oprint
self.disable_subscription_check = (
module_controller.orpheus_options.disable_subscription_check
)
self.settings = module_controller.module_settings
# LOW = 96kbit/s AAC, HIGH = 320kbit/s AAC, LOSSLESS = 44.1/16 FLAC, HI_RES <= 48/24 FLAC with MQA
self.quality_parse = {
QualityEnum.MINIMUM: "LOW",
QualityEnum.LOW: "LOW",
QualityEnum.MEDIUM: "HIGH",
QualityEnum.HIGH: "HIGH",
QualityEnum.LOSSLESS: "LOSSLESS",
QualityEnum.HIFI: "HI_RES",
}
# save all the TidalSession objects
sessions = {}
self.available_sessions = [
SessionType.TV.name,
SessionType.MOBILE_DEFAULT.name,
SessionType.MOBILE_ATMOS.name,
]
# load all saved sessions (TV, Mobile Atmos, Mobile Default)
saved_sessions = module_controller.temporary_settings_controller.read(
"sessions"
)
tidal_token = module_controller.module_settings.get('tidal_token')
if not saved_sessions:
saved_sessions = {}
if not self.settings["enable_mobile"]:
self.available_sessions = [SessionType.TV.name]
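        # login loop: repeat until every available session type is authenticated and has a valid subscription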
while True:
login_session = None
relogin_requested = False
def auth_and_save_session(session, session_type, tidal_token=None):
session = self.auth_session(session, session_type, login_session, tidal_token)
                # get the dict representation from the TidalSession object and save it into saved_sessions (the login storage)
saved_sessions[session_type] = session.get_storage()
module_controller.temporary_settings_controller.set(
"sessions", saved_sessions
)
return session
# ask for login if there are no saved sessions
if not saved_sessions:
login_session_type = None
if len(self.available_sessions) == 1:
login_session_type = self.available_sessions[0]
else:
self.print(
f"{module_information.service_name}: Choose a login method:"
)
self.print(f"{module_information.service_name}: 1. TV (browser)")
self.print(
f"{module_information.service_name}: 2. Mobile (username and password, choose TV if this doesn't work)"
)
while not login_session_type:
input_str = input(" Login method: ")
try:
login_session_type = {
"1": SessionType.TV.name,
"tv": SessionType.TV.name,
"2": SessionType.MOBILE_DEFAULT.name,
"mobile": SessionType.MOBILE_DEFAULT.name,
}[input_str.lower()]
except KeyError:
self.print(f'{module_information.service_name}: Invalid choice, try again')
                # if the user opts in, use the tidal_token from the settings if present, otherwise prompt for one
while True:
reply = input(' Use a tidal_token instead of email/password? (Y/n): ').strip().lower()
if reply in ("y", "yes"):
if not tidal_token:
tidal_token = input(' No stored tidal_token detected, paste it here: ').strip()
break
elif reply in ("n", "no"):
tidal_token = None
break
else:
self.print(f'{module_information.service_name}: Reply with (Y)es or (N)o')
login_session = auth_and_save_session(
self.init_session(login_session_type), login_session_type, tidal_token
)
self.print(f"Account Country: {login_session.country_code}")
for session_type in self.available_sessions:
sessions[session_type] = self.init_session(session_type)
if session_type in saved_sessions:
logging.debug(
f"{module_information.service_name}: {session_type} session found, loading"
)
# load the dictionary from the temporary_settings_controller inside the TidalSession class
sessions[session_type].set_storage(saved_sessions[session_type])
else:
logging.debug(
f"{module_information.service_name}: No {session_type} session found, creating new one"
)
sessions[session_type] = auth_and_save_session(
sessions[session_type], session_type
)
# always try to refresh session
if not sessions[session_type].valid():
sessions[session_type].refresh()
# Save the refreshed session in the temporary settings
saved_sessions[session_type] = sessions[session_type].get_storage()
module_controller.temporary_settings_controller.set(
"sessions", saved_sessions
)
# check for a valid subscription
subscription = self.check_subscription(
sessions[session_type].get_subscription()
)
if not subscription:
confirm = input(" Do you want to relogin? [Y/n]: ")
if confirm.upper() == "N":
self.print("Exiting...")
exit()
# reset saved sessions and loop back to login
saved_sessions = {}
                    # flag used to avoid asking to relogin twice in a row
relogin_requested = True
break
if not login_session:
login_session = sessions[session_type]
            # check if the tidal_token in the settings matches any saved refresh_token, to allow swapping between accounts easily
            if tidal_token and not relogin_requested and tidal_token not in [v.get('refresh_token') for v in saved_sessions.values()]:
                confirm = input(' The tidal_token in settings.json differs from the stored sessions - Do you want to relogin? [Y/n]: ').strip().lower()
if confirm in ('y', 'yes'):
saved_sessions = {}
continue # Restart the while loop to re-authenticate
if saved_sessions:
# print available stored sessions and their country code
country_codes = set(entry['country_code'] for entry in saved_sessions.values())
sessions_list = ' - '.join(saved_sessions.keys())
if len(country_codes) == 1:
result = f"Using saved sessions: {sessions_list} ({country_codes.pop()})"
else:
result = "Using saved sessions: " + " - ".join(f"{key} ({entry['country_code']})" for key, entry in saved_sessions.items())
self.print(result)
break
# only needed for region locked albums where the track is available but force_album_format is used
self.album_cache = {}
# load the Tidal session with all saved sessions (TV, Mobile Atmos, Mobile Default)
self.session: TidalApi = TidalApi(sessions)
def init_session(self, session_type):
session = None
# initialize session with the needed API keys
if session_type == SessionType.TV.name:
session = TidalTvSession(
self.settings["tv_atmos_token"], self.settings["tv_atmos_secret"]
)
elif session_type == SessionType.MOBILE_ATMOS.name:
session = TidalMobileSession(self.settings["mobile_atmos_hires_token"])
else:
session = TidalMobileSession(self.settings["mobile_hires_token"])
return session
def auth_session(self, session, session_type, login_session, tidal_token=None):
if tidal_token:
session.refresh_token = tidal_token
session.refresh()
elif login_session:
# refresh tokens can be used with any client id
# this can be used to switch to any client type from an existing session
session.refresh_token = login_session.refresh_token
session.user_id = login_session.user_id
session.country_code = login_session.country_code
session.refresh()
elif session_type == SessionType.TV.name:
self.print(f"{module_information.service_name}: Creating a TV session")
session.auth()
else:
self.print(f"{module_information.service_name}: Creating a Mobile session")
self.print(
f"{module_information.service_name}: Enter your TIDAL username and password:"
)
self.print(
f"{module_information.service_name}: (password will not be echoed)"
)
username = input(" Username: ")
password = getpass(" Password: ")
session.auth(username, password)
self.print(f"Successfully logged in, using {session_type} token!")
return session
def check_subscription(self, subscription: str) -> bool:
        # returns True if disable_subscription_check is enabled or the subscription is HIFI / PREMIUM (Plus)
if not self.disable_subscription_check and subscription not in {
"HIFI",
"PREMIUM",
"PREMIUM_PLUS",
}:
self.print(
f"{module_information.service_name}: Account does not have a HiFi (Plus) subscription, "
f"detected subscription: {subscription}"
)
return False
return True
@staticmethod
def _generate_artwork_url(cover_id: str, size: int, max_size: int = 1280):
# not the best idea, but it rounds the self.cover_size to the nearest number in supported_sizes, 1281 is needed
# for the "uncompressed" cover
supported_sizes = [80, 160, 320, 480, 640, 1080, 1280, 1281]
best_size = min(supported_sizes, key=lambda x: abs(x - size))
        # supported sizes: 80x80, 160x160, 320x320, 480x480, 640x640, 1080x1080; 1280x1280 is only available for non-playlist covers
# return "uncompressed" cover if self.cover_resolution > max_size
image_name = (
"{0}x{0}.jpg".format(best_size) if best_size <= max_size else "origin.jpg"
)
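        # e.g. best_size 640 -> ".../640x640.jpg"; best_size 1281 (above max_size) -> ".../origin.jpg"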
return f'https://resources.tidal.com/images/{cover_id.replace("-", "/")}/{image_name}'
@staticmethod
def _generate_animated_artwork_url(cover_id: str, size=1280):
return "https://resources.tidal.com/videos/{0}/{1}x{1}.mp4".format(
cover_id.replace("-", "/"), size
)
def custom_url_parse(self, link: str):
# the most beautiful regex ever written
match = re.search(
r"https?://tidal\.com/(?:browse/)?(?P<media_type>track|album|playlist|artist)/"
r"(?P<media_id>[A-Za-z0-9-]+)",
link,
)
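        # e.g. https://tidal.com/browse/track/92265335 -> media_type "track", media_id "92265335"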
        # map the regex match to the corresponding DownloadTypeEnum
media_types = {
"track": DownloadTypeEnum.track,
"album": DownloadTypeEnum.album,
"artist": DownloadTypeEnum.artist,
"playlist": DownloadTypeEnum.playlist,
}
if not match:
self.print(f"Unsupported URL: {link}")
exit()
return MediaIdentification(
media_type=media_types[match.group("media_type")],
media_id=match.group("media_id"),
)
def search(
self,
query_type: DownloadTypeEnum,
query: str,
track_info: TrackInfo = None,
limit: int = 20,
):
if track_info and track_info.tags.isrc:
results = self.session.get_tracks_by_isrc(track_info.tags.isrc)
else:
results = self.session.get_search_data(query, limit=limit)[
query_type.name + "s"
]
items = []
for i in results.get("items"):
duration, name = None, None
if query_type is DownloadTypeEnum.artist:
name = i.get("name")
artists = None
year = None
elif query_type is DownloadTypeEnum.playlist:
if "name" in i.get("creator"):
artists = [i.get("creator").get("name")]
elif i.get("type") == "EDITORIAL":
artists = [module_information.service_name]
else:
artists = ["Unknown"]
duration = i.get("duration")
# TODO: Use playlist creation date or lastUpdated?
year = i.get("created")[:4]
elif query_type is DownloadTypeEnum.track:
artists = [j.get("name") for j in i.get("artists")]
                # take the year from the album's release date
year = (
i.get("album").get("releaseDate")[:4]
if i.get("album").get("releaseDate")
else None
)
duration = i.get("duration")
elif query_type is DownloadTypeEnum.album:
artists = [j.get("name") for j in i.get("artists")]
duration = i.get("duration")
year = i.get("releaseDate")[:4]
else:
raise Exception("Query type is invalid")
if query_type is not DownloadTypeEnum.artist:
name = i.get("title")
name += f' ({i.get("version")})' if i.get("version") else ""
additional = None
if query_type not in {DownloadTypeEnum.artist, DownloadTypeEnum.playlist}:
if "DOLBY_ATMOS" in i.get("audioModes"):
additional = "Dolby Atmos"
elif "SONY_360RA" in i.get("audioModes"):
additional = "360 Reality Audio"
elif i.get("audioQuality") == "HI_RES":
additional = "MQA"
else:
additional = "HiFi"
item = SearchResult(
name=name,
artists=artists,
year=year,
result_id=(
str(i.get("id"))
if query_type is not DownloadTypeEnum.playlist
else i.get("uuid")
),
explicit=i.get("explicit"),
duration=duration,
additional=[additional] if additional else None,
)
items.append(item)
return items
def get_playlist_info(self, playlist_id: str) -> PlaylistInfo:
playlist_data = self.session.get_playlist(playlist_id)
playlist_tracks = self.session.get_playlist_items(playlist_id)
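        # collect only the track ids; video items in the playlist are skipped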
tracks = [
track.get("item").get("id")
for track in playlist_tracks.get("items")
if track.get("type") == "track"
]
if "name" in playlist_data.get("creator"):
creator_name = playlist_data.get("creator").get("name")
elif playlist_data.get("type") == "EDITORIAL":
creator_name = module_information.service_name
else:
creator_name = "Unknown"
if playlist_data.get("squareImage"):
cover_url = self._generate_artwork_url(
playlist_data["squareImage"], size=self.cover_size, max_size=1080
)
cover_type = ImageFileTypeEnum.jpg
else:
# fallback to defaultPlaylistImage
cover_url = "https://tidal.com/browse/assets/images/defaultImages/defaultPlaylistImage.png"
cover_type = ImageFileTypeEnum.png
return PlaylistInfo(
name=playlist_data.get("title"),
creator=creator_name,
tracks=tracks,
release_year=playlist_data.get("created")[:4],
duration=playlist_data.get("duration"),
creator_id=playlist_data["creator"].get("id"),
cover_url=cover_url,
cover_type=cover_type,
track_extra_kwargs={
"data": {
track.get("item").get("id"): track.get("item")
for track in playlist_tracks.get("items")
}
},
)
def get_artist_info(self, artist_id: str, get_credited_albums: bool) -> ArtistInfo:
artist_data = self.session.get_artist(artist_id)
artist_albums = self.session.get_artist_albums(artist_id).get("items")
artist_singles = self.session.get_artist_albums_ep_singles(artist_id).get(
"items"
)
        # credited albums can only be fetched with a mobile session
credit_albums = []
if (
get_credited_albums
and SessionType.MOBILE_DEFAULT.name in self.available_sessions
):
self.session.default = SessionType.MOBILE_DEFAULT
credited_albums_page = self.session.get_page(
"contributor", params={"artistId": artist_id}
)
        # the paged list of credited albums is buried deep inside the contributor page response
page_list = credited_albums_page["rows"][-1]["modules"][0].get("pagedList")
if page_list:
total_items = page_list["totalNumberOfItems"]
more_items_link = page_list["dataApiPath"][6:]
# Now fetch all the found total_items
items = []
for offset in range(0, total_items // 50 + 1):
print(f"Fetching {offset * 50}/{total_items}", end="\r")
items += self.session.get_page(
more_items_link, params={"limit": 50, "offset": offset * 50}
)["items"]
credit_albums = [item.get("item").get("album") for item in items]
self.session.default = SessionType.TV
# use set to filter out duplicate album ids
albums = {
str(album.get("id"))
for album in artist_albums + artist_singles + credit_albums
}
return ArtistInfo(
name=artist_data.get("name"),
albums=list(albums),
album_extra_kwargs={
"data": {
str(album.get("id")): album
for album in artist_albums + artist_singles
}
},
)
def get_album_info(self, album_id: str, data=None) -> AlbumInfo:
# check if album is already in album cache, add it
if data is None:
data = {}
if data.get(album_id):
album_data = data[album_id]
elif self.album_cache.get(album_id):
album_data = self.album_cache[album_id]
else:
album_data = self.session.get_album(album_id)
# get all album tracks with corresponding credits with a limit of 100
limit = 100
cache = {"data": {}}
try:
tracks_data = self.session.get_album_contributors(album_id, limit=limit)
total_tracks = tracks_data.get("totalNumberOfItems")
            # round total_tracks up to the next multiple of 100 and page through the remaining items
for offset in range(limit, ((total_tracks // limit) + 1) * limit, limit):
# fetch the new album tracks with the given offset
track_items = self.session.get_album_contributors(
album_id, offset=offset, limit=limit
)
# append those tracks to the album_data
tracks_data["items"] += track_items.get("items")
# add the track contributors to a new list called 'credits'
for track in tracks_data.get("items"):
track.get("item").update({"credits": track.get("credits")})
cache.get("data")[str(track.get("item").get("id"))] = track.get("item")
# filter out video clips
tracks = [
str(track["item"]["id"])
for track in tracks_data.get("items")
if track.get("type") == "track"
]
except TidalError:
tracks = []
quality = None
if "audioModes" in album_data:
if album_data["audioModes"] == ["DOLBY_ATMOS"]:
quality = "Dolby Atmos"
elif album_data["audioModes"] == ["SONY_360RA"]:
quality = "360"
elif album_data["audioQuality"] == "HI_RES":
quality = "M"
release_year = None
if album_data.get("releaseDate"):
release_year = album_data.get("releaseDate")[:4]
elif album_data.get("streamStartDate"):
release_year = album_data.get("streamStartDate")[:4]
elif album_data.get("copyright"):
            # assume the copyright string contains the release year
            years = [
                int(s) for s in album_data.get("copyright").split() if s.isdigit()
            ]
            if years:
                release_year = years[0]
if album_data.get("cover"):
cover_url = self._generate_artwork_url(
album_data.get("cover"), size=self.cover_size
)
cover_type = ImageFileTypeEnum.jpg
else:
# fallback to defaultAlbumImage
cover_url = "https://tidal.com/browse/assets/images/defaultImages/defaultAlbumImage.png"
cover_type = ImageFileTypeEnum.png
return AlbumInfo(
name=album_data.get("title"),
release_year=release_year,
explicit=album_data.get("explicit"),
quality=quality,
upc=album_data.get("upc"),
duration=album_data.get("duration"),
cover_url=cover_url,
cover_type=cover_type,
animated_cover_url=(
self._generate_animated_artwork_url(album_data.get("videoCover"))
if album_data.get("videoCover")
else None
),
artist=album_data.get("artist").get("name"),
artist_id=album_data.get("artist").get("id"),
tracks=tracks,
track_extra_kwargs=cache,
)
def get_track_info(
self,
track_id: str,
quality_tier: QualityEnum,
codec_options: CodecOptions,
data=None,
) -> TrackInfo:
if data is None:
data = {}
track_data = (
data[track_id] if track_id in data else self.session.get_track(track_id)
)
album_id = str(track_data.get("album").get("id"))
# check if album is already in album cache, get it
try:
album_data = (
data[album_id] if album_id in data else self.session.get_album(album_id)
)
except TidalError as e:
            # region locked: fall back to building minimal album_data from the track's own album object
self.print(
f"{module_information.service_name}: {e} Trying workaround ...",
drop_level=1,
)
album_data = track_data.get("album")
album_data.update(
{
"artist": track_data.get("artist"),
"numberOfVolumes": 1,
"audioQuality": "LOSSLESS",
"audioModes": ["STEREO"],
}
)
# add the region locked album to the cache in order to properly use it later (force_album_format)
self.album_cache = {album_id: album_data}
media_tags = track_data["mediaMetadata"]["tags"]
format = None
if codec_options.spatial_codecs:
if "SONY_360RA" in media_tags:
format = "360ra"
elif "DOLBY_ATMOS" in media_tags:
if self.settings["prefer_ac4"]:
format = "ac4"
else:
format = "ac3"
if (
"HIRES_LOSSLESS" in media_tags
and not format
and quality_tier is QualityEnum.HIFI
):
format = "flac_hires"
session = {
"flac_hires": SessionType.MOBILE_DEFAULT,
"360ra": SessionType.MOBILE_DEFAULT,
"ac4": SessionType.MOBILE_ATMOS,
"ac3": SessionType.TV,
# TV is used whenever possible to avoid MPEG-DASH, which slows downloading
None: SessionType.TV,
}[format]
if not format and "DOLBY_ATMOS" in media_tags:
            # if Atmos is available, don't use the TV session here because it would always return Atmos;
            # there are no tracks with both 360RA and Atmos afaik, so this shouldn't be an issue for now
session = SessionType.MOBILE_DEFAULT
if session.name in self.available_sessions:
self.session.default = session
else:
format = None
# define all default values in case the stream_data is None (region locked)
audio_track, mqa_file, track_codec, bitrate, download_args, error = (
None,
None,
CodecEnum.FLAC,
None,
None,
None,
)
try:
stream_data = self.session.get_stream_url(
track_id,
(
self.quality_parse[quality_tier]
if format != "flac_hires"
else "HI_RES_LOSSLESS"
),
)
except TidalRequestError as e:
error = e
# definitely region locked
if "Asset is not ready for playback" in str(e):
error = f"Track [{track_id}] is not available in your region"
stream_data = None
if stream_data is not None:
if stream_data["manifestMimeType"] == "application/dash+xml":
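                # MPEG-DASH: the manifest is a base64-encoded MPD document listing the segment URLs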
manifest = base64.b64decode(stream_data["manifest"])
audio_track = self.parse_mpd(manifest)[0] # Only one AudioTrack?
track_codec = audio_track.codec
else:
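                # otherwise the manifest is base64-encoded JSON containing the codec string and the direct file URLs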
manifest = json.loads(base64.b64decode(stream_data["manifest"]))
track_codec = CodecEnum[
(
"AAC"
if "mp4a" in manifest["codecs"]
else manifest["codecs"].upper()
)
]
if not codec_data[track_codec].spatial:
if (
not codec_options.proprietary_codecs
and codec_data[track_codec].proprietary
):
self.print(
f"Proprietary codecs are disabled, if you want to download {track_codec.name}, "
f'set "proprietary_codecs": true',
drop_level=1,
)
stream_data = self.session.get_stream_url(track_id, "LOSSLESS")
if stream_data["manifestMimeType"] == "application/dash+xml":
manifest = base64.b64decode(stream_data["manifest"])
audio_track = self.parse_mpd(manifest)[
0
] # Only one AudioTrack?
track_codec = audio_track.codec
else:
manifest = json.loads(base64.b64decode(stream_data["manifest"]))
track_codec = CodecEnum[
(
"AAC"
if "mp4a" in manifest["codecs"]
else manifest["codecs"].upper()
)
]
if audio_track:
download_args = {"audio_track": audio_track}
else:
# check if MQA
if track_codec is CodecEnum.MQA and self.settings["fix_mqa"]:
# download the first chunk of the flac file to analyze it
temp_file_path = self.download_temp_header(manifest["urls"][0])
# detect MQA file
mqa_file = MqaIdentifier(temp_file_path)
# add the file to download_args
download_args = {"file_url": manifest["urls"][0]}
# https://en.wikipedia.org/wiki/Audio_bit_depth#cite_ref-1
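        # FLAC/ALAC: HI_RES_LOSSLESS streams are 24 bit, everything else is assumed to be 16 bit; lossy codecs have no bit depth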
bit_depth = (
(
24
if stream_data and stream_data["audioQuality"] == "HI_RES_LOSSLESS"
else 16
)
if track_codec in {CodecEnum.FLAC, CodecEnum.ALAC}
else None
)
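        # default sample rate in kHz: immersive codecs are 48 kHz, everything else is assumed 44.1 kHz until the manifest says otherwise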
sample_rate = (
48
if track_codec in {CodecEnum.EAC3, CodecEnum.MHA1, CodecEnum.AC4}
else 44.1
)
if stream_data:
# fallback bitrate
bitrate = {
"LOW": 96,
"HIGH": 320,
"LOSSLESS": 1411,
"HI_RES": None,
"HI_RES_LOSSLESS": None,
}[stream_data["audioQuality"]]
# manually set bitrate for immersive formats
if stream_data["audioMode"] == "DOLBY_ATMOS":
# check if the Dolby Atmos format is E-AC-3 JOC or AC-4
if track_codec == CodecEnum.EAC3:
bitrate = 768
elif track_codec == CodecEnum.AC4:
bitrate = 256
elif stream_data["audioMode"] == "SONY_360RA":
bitrate = 667
            # use the more precise bitrate from the MPD manifest when TIDAL serves MPEG-DASH
if audio_track:
bitrate = audio_track.bitrate // 1000
if stream_data["audioQuality"] == "HI_RES_LOSSLESS":
sample_rate = audio_track.sample_rate / 1000
# now set everything for MQA
if mqa_file is not None and mqa_file.is_mqa:
bit_depth = mqa_file.bit_depth
sample_rate = mqa_file.get_original_sample_rate()
track_name = track_data.get("title")
track_name += (
f' ({track_data.get("version")})' if track_data.get("version") else ""
)
if track_data["album"].get("cover"):
cover_url = self._generate_artwork_url(
track_data["album"].get("cover"), size=self.cover_size
)
else:
# fallback to defaultTrackImage, no cover_type flag? Might crash in the future
cover_url = "https://tidal.com/browse/assets/images/defaultImages/defaultTrackImage.png"
track_info = TrackInfo(
name=track_name,
album=album_data.get("title"),
album_id=album_id,
artists=[a.get("name") for a in track_data.get("artists")],
artist_id=track_data["artist"].get("id"),
release_year=(
track_data.get("streamStartDate")[:4]
if track_data.get("streamStartDate")
else (
track_data.get("dateAdded")[:4]
if track_data.get("dateAdded")
else None
)
),
bit_depth=bit_depth,
sample_rate=sample_rate,
bitrate=bitrate,
duration=track_data.get("duration"),
cover_url=cover_url,
explicit=track_data.get("explicit"),
tags=self.convert_tags(track_data, album_data, mqa_file),
codec=track_codec,
download_extra_kwargs=download_args,
lyrics_extra_kwargs={"track_data": track_data},
# check if 'credits' are present (only from get_album_data)
credits_extra_kwargs={
"data": (
{track_id: track_data["credits"]} if "credits" in track_data else {}
)
},
)
if error is not None:
track_info.error = f"Error: {error}"
return track_info
@staticmethod
def download_temp_header(file_url: str, chunk_size: int = 32768) -> str:
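        # downloads only the first chunk of the FLAC so the MQA identifier can inspect the stream header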
# create flac temp_location
temp_location = create_temp_filename() + ".flac"
# create session and download the file to the temp_location
r_session = create_requests_session()
r = r_session.get(file_url, stream=True, verify=False)
with open(temp_location, "wb") as f:
# only download the first chunk_size bytes
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
break
return temp_location
@staticmethod
def parse_mpd(xml: bytes) -> list:
xml = xml.decode("UTF-8")
        # remove the default namespace definition so find()/findall() work without a namespace prefix
xml = re.sub(r'xmlns="[^"]+"', "", xml, count=1)
root = ElementTree.fromstring(xml)
# List of AudioTracks
tracks = []
for period in root.findall("Period"):
for adaptation_set in period.findall("AdaptationSet"):
for rep in adaptation_set.findall("Representation"):
# Check if representation is audio
content_type = adaptation_set.get("contentType")
if content_type != "audio":
raise ValueError("Only supports audio MPDs!")
# Codec checks
codec = rep.get("codecs").upper()
if codec.startswith("MP4A"):
codec = "AAC"
# Segment template
seg_template = rep.find("SegmentTemplate")
# Add init file to track_urls
track_urls = [seg_template.get("initialization")]
start_number = int(seg_template.get("startNumber") or 1)
# https://dashif-documents.azurewebsites.net/Guidelines-TimingModel/master/Guidelines-TimingModel.html#addressing-explicit
# Also see example 9
seg_timeline = seg_template.find("SegmentTimeline")
if seg_timeline is not None:
seg_time_list = []
cur_time = 0
for s in seg_timeline.findall("S"):
# Media segments start time
if s.get("t"):
cur_time = int(s.get("t"))
# Segment reference
for i in range((int(s.get("r") or 0) + 1)):
seg_time_list.append(cur_time)
# Add duration to current time
cur_time += int(s.get("d"))
# Create list with $Number$ indices
seg_num_list = list(
range(start_number, len(seg_time_list) + start_number)
)
# Replace $Number$ with all the seg_num_list indices
track_urls += [
seg_template.get("media").replace("$Number$", str(n))
for n in seg_num_list
]
tracks.append(
AudioTrack(
codec=CodecEnum[codec],
sample_rate=int(rep.get("audioSamplingRate") or 0),
bitrate=int(rep.get("bandwidth") or 0),
urls=track_urls,
)
)
return tracks
def get_track_download(
self, file_url: str = None, audio_track: AudioTrack = None
) -> TrackDownloadInfo:
        # either file_url or audio_track is passed, never both
        # MHA1, EC-3 or MQA
if file_url:
return TrackDownloadInfo(download_type=DownloadEnum.URL, file_url=file_url)
# MPEG-DASH
        # TODO: use the total file size for a better progress bar? Is it even possible to calculate the total size from the MPD?
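        # on Windows, size the bar to the terminal width minus the printer indent; elsewhere (or with no terminal) fall back to tqdm's default width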
try:
columns = os.get_terminal_size().columns
if os.name == "nt":
bar = tqdm(
audio_track.urls,
ncols=(columns - self.oprinter.indent_number),
bar_format=" " * self.oprinter.indent_number
+ "{l_bar}{bar}{r_bar}",
)
else:
raise OSError
except OSError:
bar = tqdm(
audio_track.urls,
bar_format=" " * self.oprinter.indent_number + "{l_bar}{bar}{r_bar}",
)
# download all segments and save the locations inside temp_locations
temp_locations = []
for download_url in bar:
temp_locations.append(download_to_temp(download_url, extension="mp4"))
# needed for bar indent
bar.close()
        # concatenated/merged .mp4 file
merged_temp_location = create_temp_filename() + ".mp4"
# actual converted .flac file
output_location = (
create_temp_filename() + "." + codec_data[audio_track.codec].container.name
)
# download is finished, merge chunks into 1 file
with open(merged_temp_location, "wb") as dest_file:
for temp_location in temp_locations:
with open(temp_location, "rb") as segment_file:
copyfileobj(segment_file, dest_file)
# convert .mp4 back to .flac
try:
ffmpeg.input(merged_temp_location, hide_banner=None, y=None).output(
output_location, acodec="copy", loglevel="error"
).run()
# Remove all files
silentremove(merged_temp_location)
for temp_location in temp_locations:
silentremove(temp_location)
        except Exception:
self.print(
"FFmpeg is not installed or working! Using fallback, may have errors"
)
# return the MP4 temp file, but tell orpheus to change the container to .m4a (AAC)
return TrackDownloadInfo(
download_type=DownloadEnum.TEMP_FILE_PATH,
temp_file_path=merged_temp_location,
different_codec=CodecEnum.AAC,
)
# return the converted flac file now
return TrackDownloadInfo(
download_type=DownloadEnum.TEMP_FILE_PATH,
temp_file_path=output_location,
)
def get_track_cover(
self, track_id: str, cover_options: CoverOptions, data=None
) -> CoverInfo:
if data is None:
data = {}
track_data = (
data[track_id] if track_id in data else self.session.get_track(track_id)
)
cover_id = track_data["album"].get("cover")
if cover_id:
return CoverInfo(
url=self._generate_artwork_url(cover_id, size=cover_options.resolution),
file_type=ImageFileTypeEnum.jpg,
)
return CoverInfo(
url="https://tidal.com/browse/assets/images/defaultImages/defaultTrackImage.png",
file_type=ImageFileTypeEnum.png,
)
def get_track_lyrics(self, track_id: str, track_data: dict = None) -> LyricsInfo:
if not track_data:
track_data = {}
# get lyrics data for current track id
lyrics_data = self.session.get_lyrics(track_id)
if "error" in lyrics_data and track_data:
# search for title and artist to find a matching track (non Atmos)
results = self.search(
DownloadTypeEnum.track,
f'{track_data.get("title")} {" ".join(a.get("name") for a in track_data.get("artists"))}',
limit=10,
)
# check every result to find a matching result
best_tracks = [
r.result_id
for r in results
if r.name == track_data.get("title")
and r.artists[0] == track_data.get("artist").get("name")
and "Dolby Atmos" not in r.additional
]
# retrieve the lyrics for the first one, otherwise return empty dict
lyrics_data = (
self.session.get_lyrics(best_tracks[0]) if len(best_tracks) > 0 else {}
)
embedded = lyrics_data.get("lyrics")
synced = lyrics_data.get("subtitles")
return LyricsInfo(
embedded=embedded,
# regex to remove the space after the timestamp "[mm:ss.xx] " to "[mm:ss.xx]"
synced=(
re.sub(r"(\[\d{2}:\d{2}.\d{2,3}])(?: )", r"\1", synced)
if synced
else None
),
)
def get_track_credits(self, track_id: str, data=None) -> Optional[list]:
if data is None:
data = {}
credits_dict = {}
        # fetch the credits from the cache if present, otherwise request them from the API
if track_id in data:
track_contributors = data[track_id]
for contributor in track_contributors:
credits_dict[contributor.get("type")] = [
c.get("name") for c in contributor.get("contributors")
]
else:
track_contributors = self.session.get_track_contributors(track_id).get(
"items"
)
if len(track_contributors) > 0:
for contributor in track_contributors:
                    # create a list for this role if it doesn't exist yet
if contributor.get("role") not in credits_dict:
credits_dict[contributor.get("role")] = []
credits_dict[contributor.get("role")].append(
contributor.get("name")
)
if len(credits_dict) > 0:
# convert the dictionary back to a list of CreditsInfo
return [CreditsInfo(sanitise_name(k), v) for k, v in credits_dict.items()]
return None
@staticmethod
def convert_tags(
track_data: dict, album_data: dict, mqa_file: MqaIdentifier = None
) -> Tags:
track_name = track_data.get("title")
track_name += (
f' ({track_data.get("version")})' if track_data.get("version") else ""
)
extra_tags = {}
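        # when an MQA stream was detected, add the MQA encoder tags and the original sample rate to the file tags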
if mqa_file is not None:
encoder_time = datetime.now().strftime("%b %d %Y %H:%M:%S")
extra_tags = {
"ENCODER": f"MQAEncode v1.1, 2.4.0+0 (278f5dd), E24F1DE5-32F1-4930-8197-24954EB9D6F4, {encoder_time}",
"MQAENCODER": f"MQAEncode v1.1, 2.4.0+0 (278f5dd), E24F1DE5-32F1-4930-8197-24954EB9D6F4, {encoder_time}",
"ORIGINALSAMPLERATE": str(mqa_file.original_sample_rate),
}
return Tags(
album_artist=(
album_data.get("artist").get("name") if "artist" in album_data else None
),
track_number=track_data.get("trackNumber"),
total_tracks=album_data.get("numberOfTracks"),
disc_number=track_data.get("volumeNumber"),
total_discs=album_data.get("numberOfVolumes"),
isrc=track_data.get("isrc"),
upc=album_data.get("upc"),
release_date=album_data.get("releaseDate"),
copyright=track_data.get("copyright"),
replay_gain=track_data.get("replayGain"),
replay_peak=track_data.get("peak"),
extra_tags=extra_tags,
)