feat: ripping aac-legacy songs (experimental)

This commit is contained in:
世界观察日志
2025-08-04 01:01:51 +08:00
parent c8e45b05df
commit 8a27e8c1f3
10 changed files with 394 additions and 8 deletions

BIN
assets/device.wvd Normal file

Binary file not shown.

View File

@@ -38,7 +38,7 @@ class InteractiveShell:
download_parser = subparser.add_parser("download", aliases=["dl"])
download_parser.add_argument("url", type=str)
download_parser.add_argument("-c", "--codec",
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "ac3"],
choices=["alac", "ec3", "aac", "aac-binaural", "aac-downmix", "aac-legacy", "ac3"],
default="alac")
download_parser.add_argument("-f", "--force", default=False, action="store_true")
download_parser.add_argument("--include-participate-songs", default=False, dest="include", action="store_true")

0
src/legacy/__init__.py Normal file
View File

24
src/legacy/decrypt.py Normal file
View File

@@ -0,0 +1,24 @@
import base64
from pywidevine import PSSH, Device, Cdm
from src.legacy.pssh import generate_pssh
class WidevineDecrypt:
    """Wrap a single pywidevine CDM session for the legacy AAC flow.

    The constructor loads the device file, builds a CDM from it and opens one
    session. Call :meth:`close` (or use the instance as a context manager)
    once the keys have been obtained so the session is released.
    """
    device: Device
    cdm: Cdm
    session_id: bytes

    def __init__(self, device_path: str = "assets/device.wvd"):
        """Load the device at *device_path* and open a CDM session.

        Args:
            device_path: Location of the ``.wvd`` device file. Defaults to the
                path that was previously hard-coded.
        """
        self.device = Device.load(device_path)
        self.cdm = Cdm.from_device(self.device)
        self.session_id = self.cdm.open()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self) -> None:
        """Close the CDM session opened by the constructor.

        The instance must not be used afterwards.
        """
        self.cdm.close(self.session_id)

    def generate_challenge(self, kid: str):
        """Return a base64-encoded license challenge for *kid*.

        Args:
            kid: Key ID in the form accepted by ``generate_pssh``.
        """
        pssh = PSSH(generate_pssh(kid))
        challenge = self.cdm.get_license_challenge(self.session_id, pssh)
        return base64.standard_b64encode(challenge).decode()

    def generate_key(self, license: str):
        """Feed *license* to the CDM and return the keys of this session."""
        self.cdm.parse_license(self.session_id, license)
        return self.cdm.get_keys(self.session_id)

33
src/legacy/mp4.py Normal file
View File

@@ -0,0 +1,33 @@
import subprocess
import uuid
from pathlib import Path
from tempfile import TemporaryDirectory
import m3u8
from creart import it
from src.api import WebAPI
from src.mp4 import if_shell
from src.types import M3U8Info, Codec
async def extract_media(m3u8_url: str):
    """Download the playlist at *m3u8_url* and describe it as an ``M3U8Info``.

    The result carries the absolute URI of the first ``segment_map`` entry,
    the absolute URI of the first key, and the ``aac-legacy`` codec id.
    """
    playlist_text = await it(WebAPI).download_m3u8(m3u8_url)
    playlist = m3u8.loads(playlist_text, uri=m3u8_url)
    media_uri = playlist.segment_map[0].absolute_uri
    key_uri = playlist.keys[0].absolute_uri
    return M3U8Info(uri=media_uri, keys=[key_uri], codec_id=Codec.AAC_LEGACY)
def decrypt(song: bytes, kid: str, key: str) -> bytes:
    """Decrypt *song* with the external ``mp4decrypt`` tool.

    Args:
        song: Encrypted MP4 data.
        kid: Key ID, passed to ``mp4decrypt`` as the left side of ``--key``.
        key: Content key, passed as the right side of ``--key``.

    Returns:
        The bytes of the file written by ``mp4decrypt``.

    Raises:
        FileNotFoundError: If ``mp4decrypt`` cannot be started or did not
            produce an output file.
    """
    # The context manager removes the directory even when reading the output
    # fails, which the manual cleanup() call at the end did not guarantee.
    with TemporaryDirectory() as tmp_dir:
        work_dir = Path(tmp_dir).absolute()
        name = uuid.uuid4().hex
        encrypted_path = work_dir / f"{name}.m4a"
        decrypted_path = work_dir / f"{name}_fixed.m4a"
        encrypted_path.write_bytes(song)
        # An argument list needs no shell, so paths containing spaces or
        # shell metacharacters are passed through untouched.
        subprocess.run(
            ["mp4decrypt", "--key", f"{kid}:{key}",
             str(encrypted_path), str(decrypted_path)],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return decrypted_path.read_bytes()

278
src/legacy/pssh.py Normal file
View File

@@ -0,0 +1,278 @@
#!/usr/bin/python3
# Copyright 2016 Google LLC. All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""A utility to parse and generate PSSH boxes."""
# The invalid-name suppression below is inherited from the original script,
# pssh-box.py, whose dashed filename is not a valid module name.
# pylint: disable=invalid-name
import argparse
import base64
import itertools
import os
import struct
import sys
import src.legacy.proto.WidevineCencHeader_pb2 as widevine_pssh_data_pb2
def to_code_point(value):
    """Return *value* unchanged if it is an ``int``, otherwise ``ord(value)``."""
    return value if isinstance(value, int) else ord(value)
# 16-byte DRM system identifiers, compared against a PSSH box's system ID.
COMMON_SYSTEM_ID = bytes.fromhex('1077EFECC0B24D02ACE33C1E52E2FB4B')
WIDEVINE_SYSTEM_ID = bytes.fromhex('EDEF8BA979D64ACEA3C827DCD51D21ED')
PLAYREADY_SYSTEM_ID = bytes.fromhex('9A04F07998404286AB92E65BE0885F95')
class BinaryReader:
    """A helper class used to read binary data from a bytes-like object.

    Attributes are ``data`` (the buffer), ``little_endian`` (byte order used
    by :meth:`read_int`) and ``position`` (index of the next unread byte).
    """

    def __init__(self, data, little_endian):
        """Start reading *data* from offset 0.

        Args:
            data: Bytes-like buffer to read from.
            little_endian: True to decode integers least-significant byte
                first, False for most-significant byte first.
        """
        self.data = data
        self.little_endian = little_endian
        self.position = 0

    def has_data(self):
        """Returns whether the reader has any data left to read."""
        return self.position < len(self.data)

    def read_bytes(self, count):
        """Reads the given number of bytes and advances the position.

        Raises:
            RuntimeError: If *count* is negative or exceeds the unread data.
        """
        if count < 0:
            # A negative count would silently move the cursor backwards.
            raise RuntimeError('Invalid PSSH box, negative read length')
        end = self.position + count
        if len(self.data) < end:
            raise RuntimeError('Invalid PSSH box, not enough data')
        ret = self.data[self.position:end]
        self.position = end
        return ret

    def read_int(self, size):
        """Reads an unsigned integer of the given size (in bytes)."""
        byteorder = 'little' if self.little_endian else 'big'
        return int.from_bytes(self.read_bytes(size), byteorder)
class Pssh:
    """Defines a PSSH box and related functions."""

    def __init__(self, version, system_id, key_ids, pssh_data):
        """Stores the fields of a PSSH box.

        Args:
          version: The version number of the box
          system_id: A binary string of the System ID
          key_ids: An array of binary strings for the key IDs
          pssh_data: A binary string of the PSSH data
        """
        self.version = version
        self.system_id = system_id
        self.key_ids = key_ids or []
        # Must default to bytes: binary_string() appends this to a bytes
        # buffer, which fails with a str default on Python 3.
        self.pssh_data = pssh_data or b''

    def binary_string(self):
        """Converts the PSSH box to a binary string.

        Key IDs are only written for version 1 boxes. The returned bytes
        start with the 4-byte total size of the box.
        """
        ret = b'pssh' + _create_bin_int(self.version << 24)
        ret += self.system_id
        if self.version == 1:
            ret += _create_bin_int(len(self.key_ids))
            ret += b''.join(self.key_ids)
        ret += _create_bin_int(len(self.pssh_data))
        ret += self.pssh_data
        # The size field counts itself, hence the extra 4 bytes.
        return _create_bin_int(len(ret) + 4) + ret

    def human_string(self):
        """Converts the PSSH box to a human readable string."""
        system_name = ''
        convert_data = None
        if self.system_id == WIDEVINE_SYSTEM_ID:
            system_name = 'Widevine'
            convert_data = _parse_widevine_data
        elif self.system_id == PLAYREADY_SYSTEM_ID:
            system_name = 'PlayReady'
            convert_data = _parse_playready_data
        elif self.system_id == COMMON_SYSTEM_ID:
            system_name = 'Common'
        lines = [
            'PSSH Box v%d' % self.version,
            ' System ID: %s %s' % (system_name, _create_uuid(self.system_id))
        ]
        if self.version == 1:
            lines.append(' Key IDs (%d):' % len(self.key_ids))
            lines.extend([' ' + _create_uuid(key) for key in self.key_ids])
        lines.append(' PSSH Data (size: %d):' % len(self.pssh_data))
        if self.pssh_data:
            if convert_data:
                lines.append(' ' + system_name + ' Data:')
                try:
                    extra = convert_data(self.pssh_data)
                    lines.extend([' ' + x for x in extra])
                # A malformed payload should be reported in the dump rather
                # than abort it.
                # pylint: disable=broad-except
                except Exception as e:
                    lines.append(' ERROR: ' + str(e))
            else:
                lines.extend([
                    ' Raw Data (base64):',
                    # b64encode returns bytes; decode before joining with str.
                    ' ' + base64.b64encode(self.pssh_data).decode()
                ])
        return '\n'.join(lines)
def _split_list_on(elems, sep):
"""Splits the given list on the given separator."""
return [list(g) for k, g in itertools.groupby(elems, lambda x: x == sep)
if not k]
def _create_bin_int(value):
"""Creates a binary string as 4-byte array from the given integer."""
return struct.pack('>i', value)
def _create_uuid(data):
"""Creates a human readable UUID string from the given binary string."""
ret = base64.b16encode(data).decode().lower()
return (ret[:8] + '-' + ret[8:12] + '-' + ret[12:16] + '-' + ret[16:20] +
'-' + ret[20:])
def _generate_widevine_data(key_ids, content_id, provider, protection_scheme):
    """Build and serialize a ``WidevinePsshData`` message.

    Args:
        key_ids: Key IDs appended to the message's ``key_id`` field.
        content_id: Stored in ``content_id`` when truthy.
        provider: Stored in ``provider`` when truthy.
        protection_scheme: Four-character scheme name; stored as a big-endian
            32-bit integer unless it is empty or ``'cenc'``.

    Returns:
        The serialized message.
    """
    message = widevine_pssh_data_pb2.WidevinePsshData()
    message.key_id.extend(key_ids)
    if provider:
        message.provider = provider
    if content_id:
        message.content_id = content_id
    # 'cenc' is the default scheme, so it is left out to save bytes.
    has_explicit_scheme = bool(protection_scheme) and protection_scheme != 'cenc'
    if has_explicit_scheme:
        (scheme_code,) = struct.unpack('>L', protection_scheme.encode())
        message.protection_scheme = scheme_code
    return message.SerializeToString()
def _parse_widevine_data(data):
    """Parses Widevine PSSH data from the given binary string.

    Returns:
        A list of human-readable lines, one or more per field that is set.
    """
    wv = widevine_pssh_data_pb2.WidevinePsshData()
    wv.ParseFromString(data)
    ret = []
    if wv.key_id:
        ret.append('Key IDs (%d):' % len(wv.key_id))
        ret.extend([' ' + _create_uuid(x) for x in wv.key_id])
    if wv.HasField('provider'):
        ret.append('Provider: ' + wv.provider)
    if wv.HasField('content_id'):
        ret.append('Content ID: ' + base64.b16encode(wv.content_id).decode())
    if wv.HasField('policy'):
        ret.append('Policy: ' + wv.policy)
    if wv.HasField('crypto_period_index'):
        ret.append('Crypto Period Index: %d' % wv.crypto_period_index)
    if wv.HasField('protection_scheme'):
        # The scheme is stored as a 32-bit integer holding four characters.
        # Decode it so the output shows e.g. "cbcs" rather than "b'cbcs'".
        protection_scheme = struct.pack('>L', wv.protection_scheme)
        ret.append('Protection Scheme: %s'
                   % protection_scheme.decode('ascii', errors='replace'))
    return ret
def _parse_playready_data(data):
    """Parses PlayReady PSSH data from the given binary string.

    The data is read little-endian: a 4-byte total length, a 2-byte record
    count, then for each record a 2-byte type, 2-byte length and payload.

    Returns:
        A list of human-readable lines describing each record.

    Raises:
        RuntimeError: If the length field does not match, a record type other
            than 1 or 3 is found, or data remains after the last record.
    """
    reader = BinaryReader(data, little_endian=True)
    size = reader.read_int(4)
    if size != len(data):
        raise RuntimeError('Length incorrect')
    ret = []
    count = reader.read_int(2)
    for _ in range(count):
        record_type = reader.read_int(2)
        record_len = reader.read_int(2)
        record_data = reader.read_bytes(record_len)
        ret.append('Record (size %d):' % record_len)
        if record_type == 1:
            xml = record_data.decode('utf-16 LE')
            ret.extend([
                ' Record Type: Rights Management Header (1)',
                ' Record XML:',
                ' ' + xml
            ])
        elif record_type == 3:
            ret.extend([
                # The label previously said "(1)" although this is type 3.
                ' Record Type: License Store (3)',
                ' License Data:',
                # b64encode returns bytes; decode before joining with str.
                ' ' + base64.b64encode(record_data).decode()
            ])
        else:
            raise RuntimeError('Invalid record type %d' % record_type)
    if reader.has_data():
        raise RuntimeError('Extra data after records')
    return ret
def _parse_boxes(data):
    """Parses one or more PSSH boxes for the given binary data.

    Returns:
        A list of ``Pssh`` objects, in the order the boxes appear.

    Raises:
        RuntimeError: If a box is not of type ``pssh``, has a version above 1,
            is truncated, or its size field disagrees with its contents.
    """
    reader = BinaryReader(data, little_endian=False)
    boxes = []
    while reader.has_data():
        start = reader.position
        size = reader.read_int(4)
        box_type = reader.read_bytes(4)
        if box_type != b'pssh':
            # bytes has no .encode('hex') on Python 3; .hex() keeps this path
            # raising the intended RuntimeError instead of AttributeError.
            raise RuntimeError(
                'Invalid box type 0x%s, not \'pssh\'' % box_type.hex())
        version_and_flags = reader.read_int(4)
        # The version is the top byte; the lower 24 bits are flags.
        version = version_and_flags >> 24
        if version > 1:
            raise RuntimeError('Invalid PSSH version %d' % version)
        system_id = reader.read_bytes(16)
        key_ids = []
        if version == 1:
            count = reader.read_int(4)
            key_ids = [reader.read_bytes(16) for _ in range(count)]
        pssh_data_size = reader.read_int(4)
        pssh_data = reader.read_bytes(pssh_data_size)
        if start + size != reader.position:
            raise RuntimeError('Box size does not match size of data')
        boxes.append(Pssh(version, system_id, key_ids, pssh_data))
    return boxes
def generate_pssh(kidB64: str) -> str:
    """Return a base64-encoded version 0 Widevine PSSH box for one key ID.

    Args:
        kidB64: The key ID, base64-encoded.
    """
    raw_kid = base64.standard_b64decode(kidB64)
    widevine_payload = _generate_widevine_data([raw_kid], "", "", "cenc")
    box = Pssh(0, WIDEVINE_SYSTEM_ID, [raw_kid], widevine_payload)
    encoded_box = base64.standard_b64encode(box.binary_string())
    return encoded_box.decode()

View File

@@ -83,6 +83,9 @@ class RipLogger:
def lossless_audio_not_exist(self):
    """Log, at error level, that the song has no lossless audio."""
    message = "Failed to download song. Lossless audio does not exist"
    self.logger.error(message)
def lossless_audio_not_exist_aac(self):
    """Log, at warning level, that aac-legacy is used in place of lossless."""
    message = "Lossless audio does not exist. Using aac-legacy to rip"
    self.logger.warning(message)
def downloading(self):
    """Log, at info level, that the song download has started."""
    message = "Downloading song..."
    self.logger.info(message)

View File

@@ -108,7 +108,7 @@ def extract_song(raw_song: bytes, codec: str) -> SongInfo:
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=if_shell())
with open(alac_atom_name, "rb") as f:
decoder_params = f.read()
case Codec.AAC | Codec.AAC_DOWNMIX | Codec.AAC_BINAURAL:
case Codec.AAC | Codec.AAC_DOWNMIX | Codec.AAC_BINAURAL | Codec.AAC_LEGACY:
info_name = (Path(tmp_dir.name) / Path(mp4_name).with_suffix('.info')).absolute()
with open(info_name, "rb") as f:
decoder_params = f.read()

View File

@@ -20,6 +20,9 @@ from src.types import Codec, ParentDoneHandler
from src.url import Song, Album, URLType, Playlist
from src.utils import get_codec_from_codec_id, check_song_existence, check_song_exists, if_raw_atmos, \
check_album_existence, playlist_write_song_index, run_sync, safely_create_task
from src.legacy.mp4 import extract_media as legacy_extract_media
from src.legacy.mp4 import decrypt as legacy_decrypt
from src.legacy.decrypt import WidevineDecrypt
# START -> getMetadata -> getLyrics -> getM3U8 -> downloadSong -> decrypt -> encapsulate -> save -> END
@@ -88,7 +91,8 @@ async def rip_song(url: Song, codec: str, flags: Flags = Flags(),
# Set Metadata
raw_metadata = await it(WebAPI).get_song_info(task.adamId, url.storefront, it(Config).region.language)
album_data = await it(WebAPI).get_album_info(raw_metadata.relationships.albums.data[0].id, url.storefront, it(Config).region.language)
album_data = await it(WebAPI).get_album_info(raw_metadata.relationships.albums.data[0].id, url.storefront,
it(Config).region.language)
task.metadata = SongMetadata.parse_from_song_data(raw_metadata)
task.metadata.parse_from_album_data(album_data)
@@ -121,8 +125,17 @@ async def rip_song(url: Song, codec: str, flags: Flags = Flags(),
if codec == Codec.ALAC and raw_metadata.attributes.extendedAssetUrls.enhancedHls:
m3u8_url = await it(WrapperManager).m3u8(task.adamId)
else:
m3u8_url = raw_metadata.attributes.extendedAssetUrls.enhancedHls
if not m3u8_url and not raw_metadata.attributes.extendedAssetUrls.enhancedHls:
if Codec == Codec.AAC_LEGACY:
task.logger.lossless_audio_not_exist_aac()
safely_create_task(rip_song_legacy(task))
return
else:
m3u8_url = raw_metadata.attributes.extendedAssetUrls.enhancedHls
if not m3u8_url and it(Config).download.codecAlternative and Codec.AAC_LEGACY in it(Config).download.codecPriority:
task.logger.lossless_audio_not_exist_aac()
safely_create_task(rip_song_legacy(task))
return
elif not m3u8_url:
task.logger.lossless_audio_not_exist()
await task_done(task, Status.FAILED)
return
@@ -152,6 +165,39 @@ async def rip_song(url: Song, codec: str, flags: Flags = Flags(),
await it(WrapperManager).decrypt(task.adamId, task.m3u8Info.keys[sample.descIndex], sample.data, sampleIndex)
async def rip_song_legacy(task: Task):
    """Rip one song through the aac-legacy path.

    Steps, in order: resolve the playlist via the wrapper's ``webPlayback``,
    download the song, extract its info, obtain a license through the
    wrapper, decrypt, write metadata, check integrity, save, mark the task
    done, and finally start the configured after-download command if any.
    """
    playback_url = await it(WrapperManager).webPlayback(task.adamId)
    task.m3u8Info = await legacy_extract_media(playback_url)

    task.logger.downloading()
    task.update_status(Status.DOWNLOADING)
    encrypted_song = await it(WebAPI).download_song(task.m3u8Info.uri)
    task.info = await run_sync(extract_song, encrypted_song, Codec.AAC_LEGACY)

    task.logger.decrypting()
    task.update_status(Status.DECRYPTING)
    widevine = WidevineDecrypt()
    key_uri = task.m3u8Info.keys[0]
    # The challenge is built from the part of the key URI after the first comma.
    challenge = widevine.generate_challenge(key_uri.split(",")[1])
    license_response = await it(WrapperManager).license(adam_id=task.adamId, challenge=challenge,
                                                        kid=key_uri)
    # NOTE(review): index 1 is presumably the content key -- confirm the
    # ordering of the keys returned by the CDM.
    selected_key = widevine.generate_key(license_response)[1]
    decrypted_song = await run_sync(legacy_decrypt, encrypted_song,
                                    selected_key.kid.hex, selected_key.key.hex())
    tagged_song = await run_sync(write_metadata, decrypted_song, task.metadata,
                                 it(Config).metadata.embedMetadata,
                                 it(Config).download.coverFormat, task.info.params)

    # A failed integrity check is only logged; the song is still saved.
    integrity_ok = await run_sync(check_song_integrity, tagged_song)
    if not integrity_ok:
        task.logger.failed_integrity()
    filename = await run_sync(save, tagged_song, Codec.AAC_LEGACY, task.metadata, task.playlist)
    task.logger.saved()
    await task_done(task, Status.DONE)

    after_downloaded = it(Config).download.afterDownloaded
    if not after_downloaded:
        return
    command = after_downloaded.format(filename=filename)
    subprocess.Popen(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
async def rip_album(url: Album, codec: str, flags: Flags = Flags(), parent_done: ParentDoneHandler = None):
album_info = await it(WebAPI).get_album_info(url.id, url.storefront, it(Config).region.language)
logger = RipLogger(url.type, url.id)

View File

@@ -39,8 +39,8 @@ class M3U8Info(BaseModel):
uri: str
keys: list[str]
codec_id: str
bit_depth: Optional[int]
sample_rate: Optional[int]
bit_depth: Optional[int] = None
sample_rate: Optional[int] = None
class Codec:
@@ -50,6 +50,7 @@ class Codec:
AAC_BINAURAL = "aac-binaural"
AAC_DOWNMIX = "aac-downmix"
AAC = "aac"
AAC_LEGACY = "aac-legacy"
class CodecKeySuffix:
@@ -73,5 +74,6 @@ class CodecRegex:
def get_pattern_by_codec(cls, codec: str):
codec_pattern_mapping = {Codec.ALAC: cls.RegexCodecAlac, Codec.EC3: cls.RegexCodecAtmos,
Codec.AAC_DOWNMIX: cls.RegexCodecDownmix, Codec.AAC_BINAURAL: cls.RegexCodecBinaural,
Codec.AAC: cls.RegexCodecAAC, Codec.AC3: cls.RegexCodecAC3}
Codec.AAC: cls.RegexCodecAAC, Codec.AAC_LEGACY: cls.RegexCodecAAC,
Codec.AC3: cls.RegexCodecAC3}
return codec_pattern_mapping.get(codec)