Better caching added

This commit is contained in:
Dniel97
2022-01-20 21:27:00 +01:00
parent 517e2dc89e
commit 0053ed1da9

View File

@@ -127,11 +127,6 @@ class ModuleInterface:
self.session: TidalApi = TidalApi(sessions)
# Track cache for credits
self.track_cache = {}
# Album cache
self.album_cache = {}
def check_subscription(self, subscription: str) -> bool:
# returns true if "disable_subscription_checks" is enabled or subscription is HIFI Plus
if not self.disable_subscription_check and subscription not in {'HIFI', 'PREMIUM_PLUS'}:
@@ -203,25 +198,127 @@ class ModuleInterface:
return items
def get_track_info(self, track_id: str, quality_tier: QualityEnum, codec_options: CodecOptions) -> TrackInfo:
track_data = self.session.get_track(track_id)
def get_playlist_info(self, playlist_id: str) -> PlaylistInfo:
playlist_data = self.session.get_playlist(playlist_id)
playlist_tracks = self.session.get_playlist_items(playlist_id)
tracks = [track['item']['id'] for track in playlist_tracks['items'] if track['type'] == 'track']
if 'name' in playlist_data['creator']:
creator_name = playlist_data['creator']['name']
elif playlist_data['creator']['id'] == 0:
creator_name = 'TIDAL'
else:
creator_name = 'Unknown'
return PlaylistInfo(
name=playlist_data['title'],
creator=creator_name,
tracks=tracks,
# TODO: Use playlist creation date or lastUpdated?
release_year=playlist_data['created'][:4],
creator_id=playlist_data['creator']['id'],
cover_url=self.generate_artwork_url(playlist_data['squareImage'], size=self.cover_size, max_size=1080),
track_extra_kwargs={'data': {track['item']['id']: track['item'] for track in playlist_tracks['items']}}
)
def get_artist_info(self, artist_id: str, get_credited_albums: bool) -> ArtistInfo:
artist_data = self.session.get_artist(artist_id)
artist_albums = self.session.get_artist_albums(artist_id)['items']
artist_singles = self.session.get_artist_albums_ep_singles(artist_id)['items']
# Only works with a mobile session, annoying, never do this again
credit_albums = []
if get_credited_albums and SessionType.MOBILE.name in self.available_sessions:
self.session.default = SessionType.MOBILE
credited_albums_page = self.session.get_page('contributor', params={'artistId': artist_id})
# This is so retarded
page_list = credited_albums_page['rows'][-1]['modules'][0]['pagedList']
total_items = page_list['totalNumberOfItems']
more_items_link = page_list['dataApiPath'][6:]
# Now fetch all the found total_items
items = []
for offset in range(0, total_items // 50 + 1):
print(f'Fetching {offset * 50}/{total_items}', end='\r')
items += self.session.get_page(more_items_link, params={'limit': 50, 'offset': offset * 50})['items']
credit_albums = [item['item']['album'] for item in items]
self.session.default = SessionType.TV
albums = [str(album['id']) for album in artist_albums + artist_singles + credit_albums]
return ArtistInfo(
name=artist_data['name'],
albums=albums,
album_extra_kwargs={'data': {str(album['id']): album for album in
artist_albums + artist_singles + credit_albums}}
)
def get_album_info(self, album_id: str, data=None) -> AlbumInfo:
# check if album is already in album cache, add it
if data is None:
data = {}
album_data = data[album_id] if album_id in data else self.session.get_album(album_id)
# get all album tracks with corresponding credits
tracks_data = self.session.get_album_contributors(album_id)
# add the track contributors to a new list called 'credits'
cache = {'data': {}}
for track in tracks_data['items']:
track['item'].update({'credits': track['credits']})
cache['data'][str(track['item']['id'])] = track['item']
tracks = [str(track['item']['id']) for track in tracks_data['items']]
if album_data['audioModes'] == ['DOLBY_ATMOS']:
quality = 'Dolby Atmos'
elif album_data['audioModes'] == ['SONY_360RA']:
quality = '360'
elif album_data['audioQuality'] == 'HI_RES':
quality = 'M'
else:
quality = None
return AlbumInfo(
name=album_data['title'],
release_year=album_data['releaseDate'][:4],
explicit=album_data['explicit'],
quality=quality,
upc=album_data['upc'],
all_track_cover_jpg_url=self.generate_artwork_url(album_data['cover'],
size=self.cover_size) if album_data['cover'] else None,
animated_cover_url=self.generate_animated_artwork_url(album_data['videoCover']) if album_data[
'videoCover'] else None,
artist=album_data['artist']['name'],
artist_id=album_data['artist']['id'],
tracks=tracks,
track_extra_kwargs=cache
)
def get_track_info(self, track_id: str, quality_tier: QualityEnum, codec_options: CodecOptions,
data=None) -> TrackInfo:
if data is None:
data = {}
track_data = data[track_id] if track_id in data else self.session.get_track(track_id)
album_id = str(track_data['album']['id'])
# check if album is already in album cache, get it
album_data = data[album_id] if album_id in data else self.session.get_album(album_id)
# Check if album is already in album cache, add it
if album_id in self.album_cache:
album_data = self.album_cache[album_id]
else:
album_data = self.session.get_album(album_id)
# Get Sony 360RA and switch to mobile session
# get Sony 360RA and switch to mobile session
if track_data['audioModes'] == ['SONY_360RA'] and SessionType.MOBILE.name in self.available_sessions:
self.session.default = SessionType.MOBILE
else:
self.session.default = SessionType.TV
stream_data = self.session.get_stream_url(track_id, self.quality_parse[quality_tier])
# Only needed for MPEG-DASH
# only needed for MPEG-DASH
audio_track = None
if stream_data['manifestMimeType'] == 'application/dash+xml':
@@ -269,7 +366,9 @@ class ModuleInterface:
tags=self.convert_tags(track_data, album_data),
codec=track_codec,
download_extra_kwargs=download_args,
lyrics_extra_kwargs={'track_data': track_data}
lyrics_extra_kwargs={'track_data': track_data},
# check if 'credits' are present (only from get_album_data)
credits_extra_kwargs={'data': {track_id: track_data['credits']} if 'credits' in track_data else {}}
)
if not codec_options.spatial_codecs and codec_data[track_codec].spatial:
@@ -438,107 +537,15 @@ class ModuleInterface:
synced=synced
)
def get_playlist_info(self, playlist_id: str) -> PlaylistInfo:
playlist_data = self.session.get_playlist(playlist_id)
playlist_tracks = self.session.get_playlist_items(playlist_id)
def get_track_credits(self, track_id: str, data=None) -> Optional[list]:
if data is None:
data = {}
tracks = [track['item']['id'] for track in playlist_tracks['items'] if track['type'] == 'track']
if 'name' in playlist_data['creator']:
creator_name = playlist_data['creator']['name']
elif playlist_data['creator']['id'] == 0:
creator_name = 'TIDAL'
else:
creator_name = 'Unknown'
return PlaylistInfo(
name=playlist_data['title'],
creator=creator_name,
tracks=tracks,
# TODO: Use playlist creation date or lastUpdated?
release_year=playlist_data['created'][:4],
creator_id=playlist_data['creator']['id'],
cover_url=self.generate_artwork_url(playlist_data['squareImage'], size=self.cover_size, max_size=1080)
)
def get_album_info(self, album_id):
# Check if album is already in album cache, add it
if album_id in self.album_cache:
album_data = self.album_cache[album_id]
else:
album_data = self.session.get_album(album_id)
# Get all album tracks with corresponding credits
tracks_data = self.session.get_album_contributors(album_id)
tracks = [str(track['item']['id']) for track in tracks_data['items']]
# Cache all track (+credits) in track_cache
self.track_cache.update({str(track['item']['id']): track for track in tracks_data['items']})
if album_data['audioModes'] == ['DOLBY_ATMOS']:
quality = 'Dolby Atmos'
elif album_data['audioModes'] == ['SONY_360RA']:
quality = '360'
elif album_data['audioQuality'] == 'HI_RES':
quality = 'M'
else:
quality = None
return AlbumInfo(
name=album_data['title'],
release_year=album_data['releaseDate'][:4],
explicit=album_data['explicit'],
quality=quality,
upc=album_data['upc'],
all_track_cover_jpg_url=self.generate_artwork_url(album_data['cover'],
size=self.cover_size) if album_data['cover'] else None,
animated_cover_url=self.generate_animated_artwork_url(album_data['videoCover']) if album_data[
'videoCover'] else None,
artist=album_data['artist']['name'],
artist_id=album_data['artist']['id'],
tracks=tracks,
)
def get_artist_info(self, artist_id: str, get_credited_albums: bool) -> ArtistInfo:
artist_data = self.session.get_artist(artist_id)
artist_albums = self.session.get_artist_albums(artist_id)['items']
artist_singles = self.session.get_artist_albums_ep_singles(artist_id)['items']
# Only works with a mobile session, annoying, never do this again
credit_albums = []
if get_credited_albums and SessionType.MOBILE.name in self.available_sessions:
self.session.default = SessionType.MOBILE
credited_albums_page = self.session.get_page('contributor', params={'artistId': artist_id})
# This is so retarded
page_list = credited_albums_page['rows'][-1]['modules'][0]['pagedList']
total_items = page_list['totalNumberOfItems']
more_items_link = page_list['dataApiPath'][6:]
# Now fetch all the found total_items
items = []
for offset in range(0, total_items // 50 + 1):
print(f'Fetching {offset * 50}/{total_items}', end='\r')
items += self.session.get_page(more_items_link, params={'limit': 50, 'offset': offset * 50})['items']
credit_albums = [item['item']['album'] for item in items]
self.session.default = SessionType.TV
albums = [str(album['id']) for album in artist_albums + artist_singles + credit_albums]
return ArtistInfo(
name=artist_data['name'],
albums=albums
)
def get_track_credits(self, track_id: str) -> Optional[list]:
credits_dict = {}
# Fetch credits from cache if not fetch those credits
if track_id in self.track_cache:
track_contributors = self.track_cache[track_id]['credits']
# fetch credits from cache if not fetch those credits
if track_id in data:
track_contributors = data[track_id]
for contributor in track_contributors:
credits_dict[contributor['type']] = [c['name'] for c in contributor['contributors']]
@@ -547,14 +554,14 @@ class ModuleInterface:
if len(track_contributors) > 0:
for contributor in track_contributors:
# Check if the dict contains no list, create one
# check if the dict contains no list, create one
if contributor['role'] not in credits_dict:
credits_dict[contributor['role']] = []
credits_dict[contributor['role']].append(contributor['name'])
if len(credits_dict) > 0:
# Convert the dictionary back to a list of CreditsInfo
# convert the dictionary back to a list of CreditsInfo
return [CreditsInfo(sanitise_name(k), v) for k, v in credits_dict.items()]
return None