From cdf00a2617b1ac124d30528891876d7f6ae5ba75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C=E8=A7=82=E5=AF=9F=E6=97=A5=E5=BF=97?= Date: Thu, 12 Jun 2025 23:40:47 +0800 Subject: [PATCH] fix: asynchronous save tasks --- src/mp4.py | 12 ++++++------ src/rip.py | 31 ++++++++++++++----------------- src/save.py | 2 +- src/utils.py | 20 ++++++++++++++++++-- 4 files changed, 39 insertions(+), 26 deletions(-) diff --git a/src/mp4.py b/src/mp4.py index 5fcc739..334e8af 100644 --- a/src/mp4.py +++ b/src/mp4.py @@ -76,7 +76,7 @@ async def extract_media(m3u8_url: str, codec: str, song_metadata: SongMetadata) sample_rate=sample_rate) -async def extract_song(raw_song: bytes, codec: str) -> SongInfo: +def extract_song(raw_song: bytes, codec: str) -> SongInfo: tmp_dir = TemporaryDirectory() mp4_name = uuid.uuid4().hex raw_mp4 = Path(tmp_dir.name) / Path(f"{mp4_name}.mp4") @@ -147,7 +147,7 @@ async def extract_song(raw_song: bytes, codec: str) -> SongInfo: params=params) -async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent: bool) -> bytes: +def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent: bool) -> bytes: tmp_dir = TemporaryDirectory() name = uuid.uuid4().hex media = Path(tmp_dir.name) / Path(name).with_suffix(".media") @@ -200,7 +200,7 @@ async def encapsulate(song_info: SongInfo, decrypted_media: bytes, atmos_convent return final_song -async def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: list[str], +def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: list[str], cover_format: str, params: dict[str, Any]) -> bytes: tmp_dir = TemporaryDirectory() name = uuid.uuid4().hex @@ -228,7 +228,7 @@ async def write_metadata(song: bytes, metadata: SongMetadata, embed_metadata: li # There are suspected errors in M4A files encapsulated by MP4Box and GPAC, # causing some applications to be unable to correctly process Metadata (such as Android.media, Salt Music) # Using FFMPEG re-encapsulating solves this problem -async def fix_encapsulate(song: bytes) -> bytes: +def fix_encapsulate(song: bytes) -> bytes: tmp_dir = TemporaryDirectory() name = uuid.uuid4().hex song_name = Path(tmp_dir.name) / Path(f"{name}.m4a") @@ -247,7 +247,7 @@ async def fix_encapsulate(song: bytes) -> bytes: # FFMPEG will overwrite maxBitrate in DecoderConfigDescriptor # Using raw song's esds box to fix it # see also https://trac.ffmpeg.org/ticket/4894 -async def fix_esds_box(raw_song: bytes, song: bytes) -> bytes: +def fix_esds_box(raw_song: bytes, song: bytes) -> bytes: tmp_dir = TemporaryDirectory() name = uuid.uuid4().hex esds_name = Path(tmp_dir.name) / Path(f"{name}.atom") @@ -270,7 +270,7 @@ async def fix_esds_box(raw_song: bytes, song: bytes) -> bytes: return final_song -async def check_song_integrity(song: bytes) -> bool: +def check_song_integrity(song: bytes) -> bool: tmp_dir = TemporaryDirectory() name = uuid.uuid4().hex song_name = Path(tmp_dir.name) / Path(f"{name}.m4a") diff --git a/src/rip.py b/src/rip.py index e560d24..3ffa85b 100644 --- a/src/rip.py +++ b/src/rip.py @@ -20,7 +20,7 @@ from src.task import Task, Status from src.types import Codec, ParentDoneHandler from src.url import Song, Album, URLType, Playlist from src.utils import get_codec_from_codec_id, check_song_existence, check_song_exists, if_raw_atmos, \ - check_album_existence, playlist_write_song_index + check_album_existence, playlist_write_song_index, run_sync # START -> getMetadata -> getLyrics -> getM3U8 -> downloadSong -> decrypt -> encapsulate -> save -> END @@ -48,41 +48,38 @@ async def recv_decrypted_sample(adam_id: str, sample_index: int, sample: bytes): task.decryptedSamples[sample_index] = sample task.decryptedCount += 1 if task.decryptedCount == len(task.decryptedSamples): - await decrypt_done(adam_id) + it(AbstractEventLoop).create_task(decrypt_done(adam_id)) async def decrypt_done(adam_id: str): task = adam_id_task_mapping[adam_id] codec = get_codec_from_codec_id(task.m3u8Info.codec_id) - song = await encapsulate(task.info, bytes().join(task.decryptedSamples), it(Config).download.atmosConventToM4a) + song = await run_sync(encapsulate, task.info, bytes().join(task.decryptedSamples), + it(Config).download.atmosConventToM4a) if not if_raw_atmos(codec, it(Config).download.atmosConventToM4a): - song = await write_metadata(song, task.metadata, it(Config).metadata.embedMetadata, - it(Config).download.coverFormat, task.info.params) + song = await run_sync(write_metadata, song, task.metadata, it(Config).metadata.embedMetadata, + it(Config).download.coverFormat, task.info.params) if codec != Codec.EC3 or codec != Codec.EC3: - song = await fix_encapsulate(song) + song = await run_sync(fix_encapsulate, song) if codec == Codec.AAC or codec == Codec.AAC_DOWNMIX or codec == Codec.AAC_BINAURAL: - song = await fix_esds_box(task.info.raw, song) + song = await run_sync(fix_esds_box, task.info.raw, song) - song = await write_metadata(song, task.metadata, it(Config).metadata.embedMetadata, - it(Config).download.coverFormat, task.info.params) + song = await run_sync(write_metadata, song, task.metadata, it(Config).metadata.embedMetadata, + it(Config).download.coverFormat, task.info.params) - if not await check_song_integrity(song): + if not await run_sync(check_song_integrity, song): task.logger.failed_integrity() - filename = await save(song, codec, task.metadata, task.playlist) + filename = await run_sync(save, song, codec, task.metadata, task.playlist) task.logger.saved() - if task.parentDone: - await task.parentDone.try_done() + await task_done(task, Status.DONE) if it(Config).download.afterDownloaded: command = it(Config).download.afterDownloaded.format(filename=filename) subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - task.update_status(Status.DONE) - del adam_id_task_mapping[adam_id] - async def rip_song(url: Song, codec: str, flags: Flags = Flags(), parent_done: ParentDoneHandler = None, playlist: PlaylistInfo = None): @@ -148,7 +145,7 @@ async def rip_song(url: Song, codec: str, flags: Flags = Flags(), task.logger.decrypting() task.update_status(Status.DECRYPTING) codec = get_codec_from_codec_id(task.m3u8Info.codec_id) - task.info = await extract_song(raw_song, codec) + task.info = await run_sync(extract_song, raw_song, codec) task.init_decrypted_samples() for sampleIndex, sample in enumerate(task.info.samples): await it(WrapperManager).decrypt(task.adamId, task.m3u8Info.keys[sample.descIndex], sample.data, sampleIndex) diff --git a/src/save.py b/src/save.py index 4013cdc..0558244 100644 --- a/src/save.py +++ b/src/save.py @@ -9,7 +9,7 @@ from src.models import PlaylistInfo from src.utils import ttml_convent_to_lrc, get_song_name_and_dir_path, get_suffix -async def save(song: bytes, codec: str, metadata: SongMetadata, playlist: PlaylistInfo = None): +def save(song: bytes, codec: str, metadata: SongMetadata, playlist: PlaylistInfo = None): song_name, dir_path = get_song_name_and_dir_path(codec.upper(), metadata, playlist) if not dir_path.exists() or not dir_path.is_dir(): os.makedirs(dir_path.absolute()) diff --git a/src/utils.py b/src/utils.py index 0645807..754cd45 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,7 +1,9 @@ import asyncio +import concurrent.futures import subprocess import sys import time +from asyncio import AbstractEventLoop from copy import deepcopy from datetime import datetime, timedelta from itertools import islice @@ -12,6 +14,7 @@ import regex from bs4 import BeautifulSoup from creart import it from loguru import logger +from pydantic import ValidationError from src.api import WebAPI from src.config import Config @@ -21,6 +24,9 @@ from src.models import PlaylistInfo from src.types import * +executor_pool = concurrent.futures.ThreadPoolExecutor() + + def check_url(url): pattern = regex.compile( r'^(?:https:\/\/(?:beta\.music|music)\.apple\.com\/(\w{2})(?:\/album|\/album\/.+))\/(?:id)?(\d[^\D]+)(?:$|\?)') @@ -214,12 +220,22 @@ def check_dep(): async def check_song_existence(adam_id: str, region: str): check = False for m_region in (await it(WrapperManager).status()).regions: - check = await it(WebAPI).exist_on_storefront_by_song_id(adam_id, region, m_region) + try: + check = await it(WebAPI).exist_on_storefront_by_song_id(adam_id, region, m_region) + except ValidationError: + pass return check async def check_album_existence(album_id: str, region: str): check = False for m_region in (await it(WrapperManager).status()).regions: - check = await it(WebAPI).exist_on_storefront_by_album_id(album_id, region, m_region) + try: + check = await it(WebAPI).exist_on_storefront_by_album_id(album_id, region, m_region) + except ValidationError: + pass return check + + +async def run_sync(task: Callable, *args): + return await it(AbstractEventLoop).run_in_executor(executor_pool, task, *args)