Files
AppleMusicDecrypt/src/utils.py

331 lines
13 KiB
Python
Raw Normal View History

2024-05-04 15:58:59 +08:00
import asyncio
2025-06-12 23:40:47 +08:00
import concurrent.futures
2025-08-26 20:34:29 +08:00
import json
2024-06-17 02:08:52 +08:00
import subprocess
2025-10-24 12:14:56 +08:00
import sys
2024-05-04 15:58:59 +08:00
import time
2025-06-12 23:40:47 +08:00
from asyncio import AbstractEventLoop
2025-06-07 01:06:04 +08:00
from copy import deepcopy
2024-05-29 15:10:25 +08:00
from datetime import datetime, timedelta
2024-05-04 15:58:59 +08:00
from itertools import islice
from pathlib import Path
2025-08-27 07:02:02 +08:00
from distutils.version import LooseVersion
2024-05-04 15:58:59 +08:00
import m3u8
import regex
from bs4 import BeautifulSoup
2025-06-07 01:06:04 +08:00
from creart import it
2025-06-12 23:40:47 +08:00
from pydantic import ValidationError
2024-05-04 15:58:59 +08:00
2025-08-27 07:02:02 +08:00
from src.config import Config, CONFIG_VERSION
2024-05-04 15:58:59 +08:00
from src.exceptions import NotTimeSyncedLyricsException
from src.logger import GlobalLogger
2024-05-06 23:49:41 +08:00
from src.models import PlaylistInfo
from src.models.album_meta import Tracks
2025-10-24 12:14:56 +08:00
from src.qemu import QemuInstance
2024-05-04 15:58:59 +08:00
from src.types import *
2025-06-12 23:40:47 +08:00
# Shared thread pool that run_sync() submits blocking callables to.
executor_pool = concurrent.futures.ThreadPoolExecutor()
# Tasks started through safely_create_task(); each task is taken out again by its done-callback.
# NOTE(review): presumably held here so every running task keeps a strong reference — confirm.
background_tasks = set()
2025-06-12 23:40:47 +08:00
2024-05-04 15:58:59 +08:00
def check_url(url):
    """Split an Apple Music album URL into its storefront segment and album id.

    Returns a ``(storefront, album_id)`` tuple of strings.  A URL that does not
    match the album pattern raises ``IndexError`` because there is no first match.
    """
    album_url = regex.compile(
        r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/album|\/album\/.+))\/(?:id)?(\d[^\D]+)(?:$|\?)')
    matches = regex.findall(album_url, url)
    storefront, album_id = matches[0]
    return storefront, album_id
def check_playlist_url(url):
    """Split an Apple Music playlist URL into its storefront segment and ``pl.*`` id.

    Returns a ``(storefront, playlist_id)`` tuple of strings.  A URL that does not
    match the playlist pattern raises ``IndexError`` because there is no first match.
    """
    playlist_url = regex.compile(
        r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/playlist|\/playlist\/.+))\/(?:id)?(pl\.[\w-]+)(?:$|\?)')
    matches = regex.findall(playlist_url, url)
    storefront, playlist_id = matches[0]
    return storefront, playlist_id
def byte_length(i):
    """Return the number of whole bytes needed to hold ``i.bit_length()`` bits."""
    whole_bytes, spare_bits = divmod(i.bit_length(), 8)
    # A partially filled byte still occupies one full byte.
    return whole_bytes + 1 if spare_bits else whole_bytes
2025-06-07 01:06:04 +08:00
def find_best_codec(parsed_m3u8: m3u8.M3U8, codec: str) -> Optional[m3u8.Playlist]:
    """Pick the variant playlist with the highest average bandwidth for ``codec``.

    Variants are kept when their ``stream_info.audio`` matches the regex that
    ``CodecRegex`` supplies for ``codec``.  For ALAC the candidates are further
    restricted to the configured ``maxBitDepth`` / ``maxSampleRate``.
    Returns ``None`` when nothing is left after filtering.
    """
    candidates = []
    for variant in parsed_m3u8.playlists:
        if regex.match(CodecRegex.get_pattern_by_codec(codec), variant.stream_info.audio):
            candidates.append(variant)
    # Highest average bandwidth first, so index 0 is the preferred variant.
    candidates = sorted(candidates, key=lambda variant: variant.stream_info.average_bandwidth, reverse=True)

    if codec == Codec.ALAC:
        within_limits = []
        for variant in candidates:
            extras = variant.media[0].extras
            if int(extras["bit_depth"]) > it(Config).download.maxBitDepth:
                continue
            if int(extras["sample_rate"]) > it(Config).download.maxSampleRate:
                continue
            within_limits.append(variant)
        candidates = within_limits

    return candidates[0] if candidates else None
2024-05-04 15:58:59 +08:00
def chunk(it, size):
    """Return an iterator yielding consecutive tuples of up to ``size`` items from ``it``.

    The final tuple may be shorter; iteration stops at the first empty tuple.
    """
    source = iter(it)

    def batches():
        while True:
            batch = tuple(islice(source, size))
            if not batch:
                return
            yield batch

    return batches()
def timeit(func):
    """Decorator that logs how long each call to ``func`` takes at debug level.

    ``func`` may be a plain function or a coroutine function; either way the
    decorated object is a coroutine function and has to be awaited.  The
    wrapper keeps the metadata of ``func`` (name, docstring, ``__wrapped__``).
    """
    # Imported locally so the module-level import block does not need to change.
    from functools import wraps

    is_async = asyncio.iscoroutinefunction(func)

    @wraps(func)
    async def helper(*args, **params):
        # perf_counter is monotonic, so clock adjustments cannot skew the measurement.
        start = time.perf_counter()
        if is_async:
            result = await func(*args, **params)
        else:
            result = func(*args, **params)
        it(GlobalLogger).logger.debug(f'{func.__name__}: {time.perf_counter() - start}')
        return result

    return helper
def get_digit_from_string(text: str) -> int:
    """Concatenate every digit character found in ``text`` and return it as an int.

    Raises ``ValueError`` when ``text`` contains no digit at all.
    """
    digits = [character for character in text if character.isdigit()]
    return int("".join(digits))
2025-11-28 12:51:14 +08:00
def _ttml_time_to_lrc_tag(lyric_time: str) -> str:
    """Turn a TTML ``begin`` value (``s.f``, ``m:s.f`` or ``h:m:s.f``) into ``[mm:ss.xx]``.

    Hours are folded into the minute field.  The fraction is read positionally
    (first two digits are the centiseconds), so ``1.5`` yields ``.50`` and a
    missing fraction yields ``.00``.
    """
    fields = lyric_time.split(":")
    if len(fields) > 3:
        # Layouts with more than three clock fields are not interpreted; keep a zero timestamp.
        return "[00:00.00]"

    last_pieces = fields.pop().split(".")
    seconds_text = last_pieces[0]
    fraction_text = last_pieces[1] if len(last_pieces) > 1 else ""

    numbers = [get_digit_from_string(field) for field in fields + [seconds_text]]
    # Left-pad with zeros so the unpacking always sees hours, minutes, seconds.
    hours, minutes, seconds = [0] * (3 - len(numbers)) + numbers

    fraction_digits = "".join(filter(str.isdigit, fraction_text))
    centiseconds = int(fraction_digits.ljust(2, "0")[:2])

    # Carry seconds >= 60 into the minute field so the tag stays well-formed.
    total_minutes, seconds = divmod((hours * 60 + minutes) * 60 + seconds, 60)
    return f"[{total_minutes:02d}:{seconds:02d}.{centiseconds:02d}]"


def _collect_ttml_extras(soup, tag_name: str) -> dict:
    """Map each ``for`` key to the texts listed under ``head/metadata/iTunesMetadata/<tag_name>``.

    Returns an empty dict when any element on that path is missing.
    """
    node = soup.tt
    for name in ("head", "metadata", "iTunesMetadata", tag_name):
        if node is None:
            return {}
        node = node.find(name)
    if node is None:
        return {}

    texts_by_key = {}
    for child in node.children:
        if isinstance(child, str):
            # bs4 text nodes subclass str and carry no attributes.
            continue
        texts_by_key.setdefault(child.get("for"), []).append(child.text)
    return texts_by_key


def ttml_convent(ttml: str) -> str:
    """Convert TTML lyrics to LRC text, or return them untouched when TTML output is configured.

    Each lyric element under ``tt/body`` becomes one ``[mm:ss.xx]text`` line.
    When ``lyricsExtra`` contains ``"translation"`` and/or ``"pronunciation"``,
    the entries whose ``for`` attribute equals the lyric's ``itunes:key`` are
    appended right after it with the same timestamp (translations first).

    Returns an empty string as soon as a lyric element has no ``begin``
    attribute, i.e. the lyrics are not time-synced.
    """
    download_cfg = it(Config).download
    if download_cfg.lyricsFormat == "ttml":
        return ttml

    soup = BeautifulSoup(ttml, features="xml")

    # Build the lookup tables once instead of re-walking the metadata for every lyric line.
    wanted_extras = download_cfg.lyricsExtra
    extra_tables = []
    if "translation" in wanted_extras:
        extra_tables.append(_collect_ttml_extras(soup, "translation"))
    if "pronunciation" in wanted_extras:
        extra_tables.append(_collect_ttml_extras(soup, "transliteration"))

    lrc_lines = []
    for section in soup.tt.body.children:
        if isinstance(section, str):
            continue  # whitespace/text between elements
        for lyric in section.children:
            if isinstance(lyric, str):
                continue
            lyric_time = lyric.get("begin")
            if not lyric_time:
                # Not time-synced: callers receive an empty result instead of an exception.
                return ""
            tag = _ttml_time_to_lrc_tag(lyric_time)
            lrc_lines.append(f"{tag}{lyric.text}")
            lyric_key = lyric.get("itunes:key")
            for table in extra_tables:
                lrc_lines.extend(f"{tag}{text}" for text in table.get(lyric_key, ()))
    return "\n".join(lrc_lines)
2025-06-07 01:06:04 +08:00
def check_song_exists(metadata, codec: str, playlist: PlaylistInfo = None):
    """Report whether the output file for this song is already present on disk.

    The location is derived from ``get_song_name_and_dir_path`` and the file
    suffix from ``get_suffix`` using the configured ``atmosConventToM4a`` flag.
    """
    song_name, dir_path = get_song_name_and_dir_path(codec, metadata, playlist)
    suffix = get_suffix(codec, it(Config).download.atmosConventToM4a)
    target = Path(dir_path) / Path(song_name + suffix)
    return target.exists()
2024-05-04 15:58:59 +08:00
def get_valid_filename(filename: str):
    """Return ``filename`` with the characters ``< > : " / \\ | ? *`` removed."""
    forbidden = '<>:"/\\|?*'
    return filename.translate(str.maketrans("", "", forbidden))
2024-05-04 23:59:52 +08:00
def get_valid_dir_name(dirname: str):
    """Sanitise ``dirname`` like a file name, then drop the dots it ends with."""
    cleaned = get_valid_filename(dirname)
    trailing_dots = r"\.+$"
    return regex.sub(trailing_dots, "", cleaned)
2024-05-04 23:59:52 +08:00
def get_codec_from_codec_id(codec_id: str) -> str:
    """Return the first known codec whose pattern matches ``codec_id``, or ``""`` if none does."""
    candidates = (Codec.AC3, Codec.EC3, Codec.AAC, Codec.ALAC, Codec.AAC_BINAURAL, Codec.AAC_DOWNMIX)
    return next(
        (candidate for candidate in candidates
         if regex.match(CodecRegex.get_pattern_by_codec(candidate), codec_id)),
        "",
    )
2024-05-05 05:02:17 +08:00
def get_song_id_from_m3u8(m3u8_url: str) -> str:
    """Load the playlist at ``m3u8_url`` and return the digits between ``_A`` and ``_``
    in the URI of its first variant playlist."""
    master = m3u8.load(m3u8_url)
    first_variant_uri = master.playlists[0].uri
    id_match = regex.search(r"_A(\d*)_", first_variant_uri)
    return id_match[1]
2024-05-06 20:41:33 +08:00
2024-06-06 00:31:33 +08:00
def if_raw_atmos(codec: str, convent_atmos: bool):
    """Return True when ``codec`` is EC3 or AC3 and conversion (``convent_atmos``) is switched off."""
    if convent_atmos:
        return False
    return codec == Codec.EC3 or codec == Codec.AC3
2024-06-06 00:31:33 +08:00
def get_suffix(codec: str, convent_atmos: bool):
    """Return the output file extension for ``codec``.

    ``.ec3`` / ``.ac3`` are used only for those codecs while ``convent_atmos``
    is falsy; every other combination yields ``.m4a``.
    """
    if not convent_atmos:
        if codec == Codec.EC3:
            return ".ec3"
        if codec == Codec.AC3:
            return ".ac3"
    return ".m4a"
2024-05-06 23:49:41 +08:00
def playlist_metadata_to_params(playlist: PlaylistInfo):
    """Build the ``playlistName`` / ``playlistCuratorName`` format parameters
    from the first entry of ``playlist.data``."""
    attributes = playlist.data[0].attributes
    return dict(playlistName=attributes.name, playlistCuratorName=attributes.curatorName)
2024-05-22 13:47:27 +08:00
2025-06-07 01:06:04 +08:00
def get_audio_info_str(metadata, codec: str):
    """Render the configured ``audioInfoFormat`` for this track.

    Returns ``""`` unless ``bit_depth``, ``sample_rate`` and ``sample_rate_kHz``
    are all truthy on ``metadata``.
    """
    bit_depth = metadata.bit_depth
    sample_rate = metadata.sample_rate
    sample_rate_khz = metadata.sample_rate_kHz
    if not (bit_depth and sample_rate and sample_rate_khz):
        return ""
    template = it(Config).download.audioInfoFormat
    return template.format(bit_depth=bit_depth,
                           sample_rate=sample_rate,
                           sample_rate_kHz=sample_rate_khz,
                           codec=codec)
def get_path_safe_dict(param: dict):
    """Return a deep copy of ``param`` whose string values went through ``get_valid_filename``.

    Non-string values are copied unchanged; ``param`` itself is not modified.
    """
    sanitized = deepcopy(param)
    replacements = {key: get_valid_filename(str(value))
                    for key, value in sanitized.items()
                    if isinstance(value, str)}
    sanitized.update(replacements)
    return sanitized
2024-05-06 20:41:33 +08:00
2024-05-22 13:47:27 +08:00
2025-06-07 01:06:04 +08:00
def get_song_name_and_dir_path(codec: str, metadata, playlist: PlaylistInfo = None):
    """Compute the sanitised song file name (without suffix) and target directory.

    The playlist name/path templates are used when ``playlist`` is truthy,
    otherwise the regular song/dir templates.  Returns ``(song_name, dir_path)``
    where ``dir_path`` is a ``Path`` whose components were cleaned with
    ``get_valid_dir_name``; the anchor of an absolute path is kept as-is.
    """
    download_cfg = it(Config).download
    safe_meta = get_path_safe_dict(metadata.model_dump())

    if playlist:
        safe_pl_meta = get_path_safe_dict(playlist_metadata_to_params(playlist))
        name_template = download_cfg.playlistSongNameFormat
        song_name = name_template.format(codec=codec,
                                         playlistSongIndex=metadata.playlist_index,
                                         audio_info=get_audio_info_str(metadata, codec),
                                         total_tracks=metadata.track_total[metadata.disk],
                                         total_disks=metadata.disk_total,
                                         **safe_meta, **safe_pl_meta)
        dir_template = download_cfg.playlistDirPathFormat
        dir_path = Path(dir_template.format(codec=codec, **safe_meta, **safe_pl_meta))
    else:
        name_template = download_cfg.songNameFormat
        song_name = name_template.format(codec=codec,
                                         total_tracks=metadata.track_total[metadata.disk],
                                         total_disks=metadata.disk_total,
                                         audio_info=get_audio_info_str(metadata, codec),
                                         **safe_meta)
        dir_template = download_cfg.dirPathFormat
        dir_path = Path(dir_template.format(codec=codec, **safe_meta))

    song_name = get_valid_filename(song_name)

    # Leave the anchor of an absolute path alone; sanitising it would strip
    # the separator / drive colon it consists of.
    keep_anchor = dir_path.is_absolute()
    cleaned_parts = []
    for position, part in enumerate(dir_path.parts):
        if position == 0 and keep_anchor:
            cleaned_parts.append(part)
        else:
            cleaned_parts.append(get_valid_dir_name(part))
    return song_name, Path(*cleaned_parts)
2024-05-06 23:52:04 +08:00
def playlist_write_song_index(playlist: PlaylistInfo):
    """Record each track's 1-based position in ``playlist.songIdIndexMapping`` (keyed by
    track id) and return the same ``playlist`` object."""
    tracks = playlist.data[0].relationships.tracks.data
    for position, track in enumerate(tracks, start=1):
        playlist.songIdIndexMapping[track.id] = position
    return playlist
2024-05-29 15:10:25 +08:00
def convent_mac_timestamp_to_datetime(timestamp: int):
    """Return the naive ``datetime`` lying ``timestamp`` seconds after 1904-01-01 00:00."""
    epoch_1904 = datetime(1904, 1, 1)
    offset = timedelta(seconds=timestamp)
    return epoch_1904 + offset
2024-06-17 02:08:52 +08:00
def check_dep():
    """Check that every required external tool can be launched.

    Each tool is started once with its output discarded.  Returns
    ``(True, None)`` when all of them start, otherwise ``(False, command)``
    where ``command`` is the command line of the first tool that was not found.
    qemu is only checked when ``localInstance.enable`` is set in the config.
    """
    commands = [["ffmpeg"], ["gpac"], ["MP4Box"], ["mp4edit"], ["mp4extract"], ["mp4decrypt"]]
    if it(Config).localInstance.enable:
        # Passed as an argument list: with shell=False a single string is taken
        # as the program *name* on POSIX, so "qemu-system-x86_64 --version"
        # would never be found even when qemu is installed.
        commands.append(["qemu-system-x86_64", "--version"])
    for argv in commands:
        try:
            subprocess.run(argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except FileNotFoundError:
            return False, " ".join(argv)
    return True, None
2025-06-07 01:06:04 +08:00
async def check_song_existence(adam_id: str, region: str):
    """Ask the web API, once per region reported by the wrapper manager, whether the song exists.

    Stops at the first truthy answer and returns it; otherwise returns the last
    answer received (``False`` if none).  A ``ValidationError`` for a region is
    ignored and the next region is tried.
    """
    from src.grpc.manager import WrapperManager
    from src.api import WebAPI

    status = await it(WrapperManager).status()
    found = False
    for m_region in status.regions:
        try:
            found = await it(WebAPI).exist_on_storefront_by_song_id(adam_id, region, m_region)
        except ValidationError:
            continue
        if found:
            break
    return found
async def check_album_existence(album_id: str, region: str):
    """Ask the web API, once per region reported by the wrapper manager, whether the album exists.

    Stops at the first truthy answer and returns it; otherwise returns the last
    answer received (``False`` if none).  A ``ValidationError`` for a region is
    ignored and the next region is tried.
    """
    from src.grpc.manager import WrapperManager
    from src.api import WebAPI

    status = await it(WrapperManager).status()
    found = False
    for m_region in status.regions:
        try:
            found = await it(WebAPI).exist_on_storefront_by_album_id(album_id, region, m_region)
        except ValidationError:
            continue
        if found:
            break
    return found
2025-06-12 23:40:47 +08:00
async def run_sync(task: Callable, *args):
    """Run the blocking callable ``task(*args)`` on ``executor_pool`` and return its result."""
    loop = it(AbstractEventLoop)
    pending = loop.run_in_executor(executor_pool, task, *args)
    return await pending
def safely_create_task(coro):
    """Schedule ``coro`` as a task on the shared event loop and track it in ``background_tasks``.

    When the task finishes it is removed from ``background_tasks``; if it
    failed, the exception is logged together with its traceback.  Cancelled
    tasks are dropped silently.
    """
    task = it(AbstractEventLoop).create_task(coro)
    background_tasks.add(task)

    def done_callback(finished):
        # discard(): never raise from the callback if the entry is already gone.
        background_tasks.discard(finished)
        if finished.cancelled():
            # Task.exception() raises CancelledError for a cancelled task.
            return
        error = finished.exception()
        if error is None:
            return
        try:
            # Re-raise so logger.exception() has an active traceback to record.
            raise error
        except Exception as e:
            it(GlobalLogger).logger.exception(e)

    task.add_done_callback(done_callback)
def count_total_track_and_disc(tracks: Tracks):
    """Return ``(disc_count, track_count)`` for an album's track list.

    ``disc_count`` is the ``discNumber`` of the last entry in ``tracks.data``;
    ``track_count`` maps each disc number to the highest ``trackNumber`` seen on it.
    """
    highest_track = {}
    for entry in tracks.data:
        attrs = entry.attributes
        disc, number = attrs.discNumber, attrs.trackNumber
        if highest_track.get(disc, 0) < number:
            highest_track[disc] = number
    last_disc = tracks.data[-1].attributes.discNumber
    return last_disc, highest_track
2025-06-25 11:50:36 +08:00
def get_tasks_num():
    """Return how many tasks are currently held in ``background_tasks``."""
    tracked = background_tasks
    return len(tracked)
2025-08-26 20:34:29 +08:00
def query_language(region: str):
    """Look up the language tags of a storefront in ``assets/storefronts.json``.

    ``region`` is compared case-insensitively with each storefront ``id``.
    Returns ``(defaultLanguageTag, supportedLanguageTags)`` for the first match,
    or ``None`` when no storefront has that id.
    """
    # Read explicitly as UTF-8: the platform default encoding depends on the
    # locale and can mis-decode or reject the file.
    with open("assets/storefronts.json", "r", encoding="utf-8") as f:
        storefronts = json.load(f)
    wanted = region.upper()
    for storefront in storefronts["data"]:
        if storefront["id"].upper() == wanted:
            attributes = storefront["attributes"]
            return attributes["defaultLanguageTag"], attributes["supportedLanguageTags"]
    return None
def language_exist(region: str, language: str):
    """Return True when ``language`` is one of the supported language tags of ``region``.

    An unknown region (``query_language`` returns ``None``) yields ``False``
    instead of failing on the tuple unpacking.
    """
    found = query_language(region)
    if found is None:
        return False
    _, languages = found
    return language in languages
2025-08-27 07:02:02 +08:00
def config_outdated():
    """Return True when the loaded config's version is lower than ``CONFIG_VERSION``.

    NOTE(review): ``LooseVersion`` comes from ``distutils``, which is deprecated
    (PEP 632) and no longer ships with Python 3.12+.
    """
    current = LooseVersion(it(Config).version)
    required = LooseVersion(CONFIG_VERSION)
    return current < required