First MQA test
This commit is contained in:
14
README.md
14
README.md
@@ -127,7 +127,8 @@ the largest.
|
||||
"tv_secret": "vRAdA108tlvkJpTsGZS8rGZ7xTlbJ0qaZ2K9saEzsgY=",
|
||||
"mobile_token": "dN2N95wCyEBTllu4",
|
||||
"enable_mobile": true,
|
||||
"prefer_ac4": false
|
||||
"prefer_ac4": false,
|
||||
"fix_mqa": true
|
||||
}
|
||||
```
|
||||
`tv_token`: Enter a valid TV client token
|
||||
@@ -142,6 +143,17 @@ to archive Sony 360RA and Dolby AC-4 if available
|
||||
`prefer_ac4`: If enabled and a mobile session is available (`enable_mobile` is set to `true`) this will ensure to get
|
||||
Dolby AC-4 on Dolby Atmos tracks
|
||||
|
||||
`fix_mqa`: If enabled it will download the MQA file before the actual track and analyze the FLAC file to extract the
|
||||
bitDepth and originalSampleRate. The tags `MQAENCODER`, `ENCODER` and `ORIGINALSAMPLERATE` are than added to the FLAC
|
||||
file in order to get properly detected.
|
||||
|
||||
**Credits: [MQA_identifier](https://github.com/purpl3F0x/MQA_identifier) by
|
||||
[@purpl3F0x](https://github.com/purpl3F0x) and [mqaid](https://github.com/redsudo/mqaid) by
|
||||
[@redsudo](https://github.com/redsudo).**
|
||||
|
||||
**NOTE: `fix_mqa` is experimental! May not detect already existing tracks,
|
||||
slower as normal download and could be not working at all**
|
||||
|
||||
<!-- Contact -->
|
||||
## Contact
|
||||
|
||||
|
||||
76
interface.py
76
interface.py
@@ -2,6 +2,7 @@ import base64
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime
|
||||
from getpass import getpass
|
||||
from dataclasses import dataclass
|
||||
from shutil import copyfileobj
|
||||
@@ -12,6 +13,7 @@ from tqdm import tqdm
|
||||
|
||||
from utils.models import *
|
||||
from utils.utils import sanitise_name, silentremove, download_to_temp, create_temp_filename
|
||||
from .mqa_identifier_python.mqa_identifier import MqaIdentifier
|
||||
from .tidal_api import TidalTvSession, TidalApi, SessionStorage, TidalMobileSession, SessionType
|
||||
|
||||
module_information = ModuleInformation(
|
||||
@@ -23,7 +25,8 @@ module_information = ModuleInformation(
|
||||
'tv_secret': 'vRAdA108tlvkJpTsGZS8rGZ7xTlbJ0qaZ2K9saEzsgY=',
|
||||
'mobile_token': 'dN2N95wCyEBTllu4',
|
||||
'enable_mobile': True,
|
||||
'prefer_ac4': False
|
||||
'prefer_ac4': False,
|
||||
'fix_mqa': True
|
||||
},
|
||||
session_storage_variables=[SessionType.TV.name, SessionType.MOBILE.name],
|
||||
netlocation_constant='tidal',
|
||||
@@ -46,9 +49,7 @@ class ModuleInterface:
|
||||
self.oprinter = module_controller.printer_controller
|
||||
self.print = module_controller.printer_controller.oprint
|
||||
self.disable_subscription_check = module_controller.orpheus_options.disable_subscription_check
|
||||
self.prefer_ac4 = module_controller.module_settings['prefer_ac4']
|
||||
|
||||
settings = module_controller.module_settings
|
||||
self.settings = module_controller.module_settings
|
||||
|
||||
# LOW = 96kbit/s AAC, HIGH = 320kbit/s AAC, LOSSLESS = 44.1/16 FLAC, HI_RES <= 48/24 FLAC with MQA
|
||||
self.quality_parse = {
|
||||
@@ -62,7 +63,7 @@ class ModuleInterface:
|
||||
sessions = {}
|
||||
self.available_sessions = [SessionType.TV.name, SessionType.MOBILE.name]
|
||||
|
||||
if settings['enable_mobile']:
|
||||
if self.settings['enable_mobile']:
|
||||
storage: SessionStorage = module_controller.temporary_settings_controller.read(SessionType.MOBILE.name)
|
||||
if not storage:
|
||||
confirm = input(' "enable_mobile" is enabled but no MOBILE session was found. Do you want to create a '
|
||||
@@ -76,9 +77,9 @@ class ModuleInterface:
|
||||
storage: SessionStorage = module_controller.temporary_settings_controller.read(session_type)
|
||||
|
||||
if session_type == SessionType.TV.name:
|
||||
sessions[session_type] = TidalTvSession(settings['tv_token'], settings['tv_secret'])
|
||||
sessions[session_type] = TidalTvSession(self.settings['tv_token'], self.settings['tv_secret'])
|
||||
else:
|
||||
sessions[session_type] = TidalMobileSession(settings['mobile_token'])
|
||||
sessions[session_type] = TidalMobileSession(self.settings['mobile_token'])
|
||||
|
||||
if storage:
|
||||
logging.debug(f'Tidal: {session_type} session found, loading')
|
||||
@@ -315,7 +316,7 @@ class ModuleInterface:
|
||||
|
||||
# get Sony 360RA and switch to mobile session
|
||||
if (track_data['audioModes'] == ['SONY_360RA']
|
||||
or (track_data['audioModes'] == ['DOLBY_ATMOS'] and self.prefer_ac4)) \
|
||||
or (track_data['audioModes'] == ['DOLBY_ATMOS'] and self.settings['prefer_ac4'])) \
|
||||
and SessionType.MOBILE.name in self.available_sessions:
|
||||
self.session.default = SessionType.MOBILE
|
||||
else:
|
||||
@@ -350,10 +351,35 @@ class ModuleInterface:
|
||||
track_name = track_data["title"]
|
||||
track_name += f' ({track_data["version"]})' if track_data['version'] else ''
|
||||
|
||||
mqa_file = None
|
||||
if audio_track:
|
||||
download_args = {'audio_track': audio_track}
|
||||
else:
|
||||
download_args = {'file_url': manifest['urls'][0]}
|
||||
# check if MQA
|
||||
if track_codec is CodecEnum.MQA and self.settings['fix_mqa']:
|
||||
self.print(f'"fix_mqa" is enabled which is experimental! May not detect already existing tracks, '
|
||||
f'slower as normal download and could be not working at all', drop_level=1)
|
||||
self.print(f'=== Downloading MQA {track_name} ({track_id}) ===', drop_level=1)
|
||||
indent_level = self.oprinter.indent_number - self.oprinter.multiplier
|
||||
# download the file to analyze it
|
||||
temp_file_path = download_to_temp(manifest['urls'][0], enable_progress_bar=True,
|
||||
indent_level=indent_level)
|
||||
download_args = {'temp_file_path': temp_file_path}
|
||||
|
||||
# detect MQA file
|
||||
mqa_file = MqaIdentifier(temp_file_path)
|
||||
|
||||
self.print(f'=== MQA {track_id} downloaded ===', drop_level=1)
|
||||
else:
|
||||
download_args = {'file_url': manifest['urls'][0]}
|
||||
|
||||
bit_depth = 24 if track_codec in [CodecEnum.EAC3, CodecEnum.MHA1] else 16
|
||||
sample_rate = 48 if track_codec in [CodecEnum.EAC3, CodecEnum.MHA1, CodecEnum.AC4] else 44.1
|
||||
|
||||
# now set everything for MQA
|
||||
if mqa_file is not None and mqa_file.is_mqa:
|
||||
bit_depth = mqa_file.bit_depth
|
||||
sample_rate = mqa_file.get_original_sample_rate()
|
||||
|
||||
track_info = TrackInfo(
|
||||
name=track_name,
|
||||
@@ -362,12 +388,11 @@ class ModuleInterface:
|
||||
artists=[a['name'] for a in track_data['artists']],
|
||||
artist_id=track_data['artist']['id'],
|
||||
release_year=track_data['streamStartDate'][:4],
|
||||
# TODO: Get correct bit_depth and sample_rate for MQA, even possible?
|
||||
bit_depth=24 if track_codec in [CodecEnum.MQA, CodecEnum.EAC3, CodecEnum.MHA1] else 16,
|
||||
sample_rate=48 if track_codec in [CodecEnum.EAC3, CodecEnum.MHA1, CodecEnum.AC4] else 44.1,
|
||||
bit_depth=bit_depth,
|
||||
sample_rate=sample_rate,
|
||||
cover_url=self.generate_artwork_url(track_data['album']['cover'], size=self.cover_size),
|
||||
explicit=track_data['explicit'] if 'explicit' in track_data else None,
|
||||
tags=self.convert_tags(track_data, album_data),
|
||||
tags=self.convert_tags(track_data, album_data, mqa_file),
|
||||
codec=track_codec,
|
||||
download_extra_kwargs=download_args,
|
||||
lyrics_extra_kwargs={'track_data': track_data},
|
||||
@@ -441,10 +466,17 @@ class ModuleInterface:
|
||||
|
||||
return tracks
|
||||
|
||||
def get_track_download(self, file_url: str = None, audio_track: AudioTrack = None) -> TrackDownloadInfo:
|
||||
# no MPEG-DASH, just a simple file
|
||||
def get_track_download(self, file_url: str = None, temp_file_path: str = None, audio_track: AudioTrack = None) \
|
||||
-> TrackDownloadInfo:
|
||||
# only file_url, temp_file_path or audio_track at a time, with file_url > temp_file_path
|
||||
|
||||
# MHA1 or EC-3
|
||||
if file_url:
|
||||
return TrackDownloadInfo(download_type=DownloadEnum.URL, file_url=file_url)
|
||||
|
||||
# MQA file with enabled "fix_mqa"
|
||||
if temp_file_path:
|
||||
return TrackDownloadInfo(download_type=DownloadEnum.TEMP_FILE_PATH, temp_file_path=temp_file_path)
|
||||
|
||||
# MPEG-DASH
|
||||
# use the total_file size for a better progress bar? Is it even possible to calculate the total size from MPD?
|
||||
@@ -570,10 +602,19 @@ class ModuleInterface:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def convert_tags(track_data: dict, album_data: dict) -> Tags:
|
||||
def convert_tags(track_data: dict, album_data: dict, mqa_file: MqaIdentifier = None) -> Tags:
|
||||
track_name = track_data["title"]
|
||||
track_name += f' ({track_data["version"]})' if track_data['version'] else ''
|
||||
|
||||
extra_tags = {}
|
||||
if mqa_file is not None:
|
||||
encoder_time = datetime.now().strftime("%b %d %Y %H:%M:%S")
|
||||
extra_tags = {
|
||||
'ENCODER': f'MQAEncode v1.1, 2.4.0+0 (278f5dd), E24F1DE5-32F1-4930-8197-24954EB9D6F4, {encoder_time}',
|
||||
'MQAENCODER': f'MQAEncode v1.1, 2.4.0+0 (278f5dd), E24F1DE5-32F1-4930-8197-24954EB9D6F4, {encoder_time}',
|
||||
'ORIGINALSAMPLERATE': str(mqa_file.original_sample_rate)
|
||||
}
|
||||
|
||||
return Tags(
|
||||
album_artist=album_data['artist']['name'],
|
||||
track_number=track_data['trackNumber'],
|
||||
@@ -585,5 +626,6 @@ class ModuleInterface:
|
||||
release_date=album_data['releaseDate'] if 'releaseDate' in album_data else None,
|
||||
copyright=track_data['copyright'],
|
||||
replay_gain=track_data['replayGain'],
|
||||
replay_peak=track_data['peak']
|
||||
replay_peak=track_data['peak'],
|
||||
extra_tags=extra_tags
|
||||
)
|
||||
|
||||
0
mqa_identifier_python/__init__.py
Normal file
0
mqa_identifier_python/__init__.py
Normal file
307
mqa_identifier_python/flac.py
Normal file
307
mqa_identifier_python/flac.py
Normal file
@@ -0,0 +1,307 @@
|
||||
#
|
||||
# Simple FLAC decoder (Python)
|
||||
#
|
||||
# Copyright (c) 2017 Project Nayuki. (MIT License)
|
||||
# https://www.nayuki.io/page/simple-flac-implementation
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
# this software and associated documentation files (the "Software"), to deal in
|
||||
# the Software without restriction, including without limitation the rights to
|
||||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
# the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
# subject to the following conditions:
|
||||
# - The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
# - The Software is provided "as is", without warranty of any kind, express or
|
||||
# implied, including but not limited to the warranties of merchantability,
|
||||
# fitness for a particular purpose and noninfringement. In no event shall the
|
||||
# authors or copyright holders be liable for any claim, damages or other
|
||||
# liability, whether in an action of contract, tort or otherwise, arising from,
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
#
|
||||
|
||||
import struct, sys
|
||||
python3 = sys.version_info.major >= 3
|
||||
|
||||
|
||||
def main(argv):
|
||||
if len(argv) != 3:
|
||||
sys.exit("Usage: python " + argv[0] + " InFile.flac OutFile.wav")
|
||||
with BitInputStream(open(argv[1], "rb")) as inp:
|
||||
with open(argv[2], "wb") as out:
|
||||
decode_file(inp, out)
|
||||
|
||||
|
||||
def decode_file(inp, out, numsamples=None, seconds=None):
|
||||
# Handle FLAC header and metadata blocks
|
||||
if inp.read_uint(32) != 0x664C6143:
|
||||
raise ValueError("Invalid magic string")
|
||||
samplerate = None
|
||||
last = False
|
||||
while not last:
|
||||
last = inp.read_uint(1) != 0
|
||||
type = inp.read_uint(7)
|
||||
length = inp.read_uint(24)
|
||||
if type == 0: # Stream info block
|
||||
inp.read_uint(16)
|
||||
inp.read_uint(16)
|
||||
inp.read_uint(24)
|
||||
inp.read_uint(24)
|
||||
samplerate = inp.read_uint(20)
|
||||
if seconds:
|
||||
numsamples = seconds * samplerate
|
||||
numchannels = inp.read_uint(3) + 1
|
||||
sampledepth = inp.read_uint(5) + 1
|
||||
x = inp.read_uint(36)
|
||||
numsamples = numsamples or x
|
||||
inp.read_uint(128)
|
||||
else:
|
||||
for i in range(length):
|
||||
inp.read_uint(8)
|
||||
if samplerate is None:
|
||||
raise ValueError("Stream info metadata block absent")
|
||||
if sampledepth % 8 != 0:
|
||||
raise RuntimeError("Sample depth not supported")
|
||||
|
||||
# Start writing WAV file headers
|
||||
sampledatalen = numsamples * numchannels * (sampledepth // 8)
|
||||
out.write(b"RIFF")
|
||||
out.write(struct.pack("<I", sampledatalen + 36))
|
||||
out.write(b"WAVE")
|
||||
out.write(b"fmt ")
|
||||
out.write(struct.pack("<IHHIIHH", 16, 0x0001, numchannels, samplerate,
|
||||
samplerate * numchannels * (sampledepth // 8), numchannels * (sampledepth // 8), sampledepth))
|
||||
out.write(b"data")
|
||||
out.write(struct.pack("<I", sampledatalen))
|
||||
|
||||
# Decode FLAC audio frames and write raw samples
|
||||
while numsamples > 0:
|
||||
numsamples -= decode_frame(inp, numchannels, sampledepth, out)
|
||||
|
||||
|
||||
def decode_frame(inp, numchannels, sampledepth, out):
|
||||
# Read a ton of header fields, and ignore most of them
|
||||
temp = inp.read_byte()
|
||||
if temp == -1:
|
||||
return False
|
||||
sync = temp << 6 | inp.read_uint(6)
|
||||
if sync != 0x3FFE:
|
||||
raise ValueError("Sync code expected")
|
||||
|
||||
inp.read_uint(1)
|
||||
inp.read_uint(1)
|
||||
blocksizecode = inp.read_uint(4)
|
||||
sampleratecode = inp.read_uint(4)
|
||||
chanasgn = inp.read_uint(4)
|
||||
inp.read_uint(3)
|
||||
inp.read_uint(1)
|
||||
|
||||
temp = inp.read_uint(8)
|
||||
while temp >= 0b11000000:
|
||||
inp.read_uint(8)
|
||||
temp = (temp << 1) & 0xFF
|
||||
|
||||
if blocksizecode == 1:
|
||||
blocksize = 192
|
||||
elif 2 <= blocksizecode <= 5:
|
||||
blocksize = 576 << blocksizecode - 2
|
||||
elif blocksizecode == 6:
|
||||
blocksize = inp.read_uint(8) + 1
|
||||
elif blocksizecode == 7:
|
||||
blocksize = inp.read_uint(16) + 1
|
||||
elif 8 <= blocksizecode <= 15:
|
||||
blocksize = 256 << (blocksizecode - 8)
|
||||
|
||||
if sampleratecode == 12:
|
||||
inp.read_uint(8)
|
||||
elif sampleratecode in (13, 14):
|
||||
inp.read_uint(16)
|
||||
|
||||
inp.read_uint(8)
|
||||
|
||||
# Decode each channel's subframe, then skip footer
|
||||
samples = decode_subframes(inp, blocksize, sampledepth, chanasgn)
|
||||
inp.align_to_byte()
|
||||
inp.read_uint(16)
|
||||
|
||||
# Write the decoded samples
|
||||
numbytes = sampledepth // 8
|
||||
if python3:
|
||||
def write_little_int(val):
|
||||
out.write(bytes(((val >> (i * 8)) & 0xFF) for i in range(numbytes)))
|
||||
else:
|
||||
def write_little_int(val):
|
||||
out.write("".join(chr((val >> (i * 8)) & 0xFF) for i in range(numbytes)))
|
||||
addend = 128 if sampledepth == 8 else 0
|
||||
for i in range(blocksize):
|
||||
for j in range(numchannels):
|
||||
write_little_int(samples[j][i] + addend)
|
||||
return blocksize
|
||||
|
||||
|
||||
def decode_subframes(inp, blocksize, sampledepth, chanasgn):
|
||||
if 0 <= chanasgn <= 7:
|
||||
return [decode_subframe(inp, blocksize, sampledepth) for _ in range(chanasgn + 1)]
|
||||
elif 8 <= chanasgn <= 10:
|
||||
temp0 = decode_subframe(inp, blocksize, sampledepth + (1 if (chanasgn == 9) else 0))
|
||||
temp1 = decode_subframe(inp, blocksize, sampledepth + (0 if (chanasgn == 9) else 1))
|
||||
if chanasgn == 8:
|
||||
for i in range(blocksize):
|
||||
temp1[i] = temp0[i] - temp1[i]
|
||||
elif chanasgn == 9:
|
||||
for i in range(blocksize):
|
||||
temp0[i] += temp1[i]
|
||||
elif chanasgn == 10:
|
||||
for i in range(blocksize):
|
||||
side = temp1[i]
|
||||
right = temp0[i] - (side >> 1)
|
||||
temp1[i] = right
|
||||
temp0[i] = right + side
|
||||
return [temp0, temp1]
|
||||
else:
|
||||
raise ValueError("Reserved channel assignment")
|
||||
|
||||
|
||||
def decode_subframe(inp, blocksize, sampledepth):
|
||||
inp.read_uint(1)
|
||||
type = inp.read_uint(6)
|
||||
shift = inp.read_uint(1)
|
||||
if shift == 1:
|
||||
while inp.read_uint(1) == 0:
|
||||
shift += 1
|
||||
sampledepth -= shift
|
||||
|
||||
if type == 0: # Constant coding
|
||||
result = [inp.read_signed_int(sampledepth)] * blocksize
|
||||
elif type == 1: # Verbatim coding
|
||||
result = [inp.read_signed_int(sampledepth) for _ in range(blocksize)]
|
||||
elif 8 <= type <= 12:
|
||||
result = decode_fixed_prediction_subframe(inp, type - 8, blocksize, sampledepth)
|
||||
elif 32 <= type <= 63:
|
||||
result = decode_linear_predictive_coding_subframe(inp, type - 31, blocksize, sampledepth)
|
||||
else:
|
||||
raise ValueError("Reserved subframe type")
|
||||
return [(v << shift) for v in result]
|
||||
|
||||
|
||||
def decode_fixed_prediction_subframe(inp, predorder, blocksize, sampledepth):
|
||||
result = [inp.read_signed_int(sampledepth) for _ in range(predorder)]
|
||||
decode_residuals(inp, blocksize, result)
|
||||
restore_linear_prediction(result, FIXED_PREDICTION_COEFFICIENTS[predorder], 0)
|
||||
return result
|
||||
|
||||
FIXED_PREDICTION_COEFFICIENTS = (
|
||||
(),
|
||||
(1,),
|
||||
(2, -1),
|
||||
(3, -3, 1),
|
||||
(4, -6, 4, -1),
|
||||
)
|
||||
|
||||
|
||||
def decode_linear_predictive_coding_subframe(inp, lpcorder, blocksize, sampledepth):
|
||||
result = [inp.read_signed_int(sampledepth) for _ in range(lpcorder)]
|
||||
precision = inp.read_uint(4) + 1
|
||||
shift = inp.read_signed_int(5)
|
||||
coefs = [inp.read_signed_int(precision) for _ in range(lpcorder)]
|
||||
decode_residuals(inp, blocksize, result)
|
||||
restore_linear_prediction(result, coefs, shift)
|
||||
return result
|
||||
|
||||
|
||||
def decode_residuals(inp, blocksize, result):
|
||||
method = inp.read_uint(2)
|
||||
if method >= 2:
|
||||
raise ValueError("Reserved residual coding method")
|
||||
parambits = [4, 5][method]
|
||||
escapeparam = [0xF, 0x1F][method]
|
||||
|
||||
partitionorder = inp.read_uint(4)
|
||||
numpartitions = 1 << partitionorder
|
||||
if blocksize % numpartitions != 0:
|
||||
raise ValueError("Block size not divisible by number of Rice partitions")
|
||||
|
||||
for i in range(numpartitions):
|
||||
count = blocksize >> partitionorder
|
||||
if i == 0:
|
||||
count -= len(result)
|
||||
param = inp.read_uint(parambits)
|
||||
if param < escapeparam:
|
||||
result.extend(inp.read_rice_signed_int(param) for _ in range(count))
|
||||
else:
|
||||
numbits = inp.read_uint(5)
|
||||
result.extend(inp.read_signed_int(numbits) for _ in range(count))
|
||||
|
||||
|
||||
def restore_linear_prediction(result, coefs, shift):
|
||||
for i in range(len(coefs), len(result)):
|
||||
result[i] += sum((result[i - 1 - j] * c) for (j, c) in enumerate(coefs)) >> shift
|
||||
|
||||
|
||||
|
||||
class BitInputStream(object):
|
||||
|
||||
def __init__(self, inp):
|
||||
self.inp = inp
|
||||
self.bitbuffer = 0
|
||||
self.bitbufferlen = 0
|
||||
|
||||
|
||||
def align_to_byte(self):
|
||||
self.bitbufferlen -= self.bitbufferlen % 8
|
||||
|
||||
|
||||
def read_byte(self):
|
||||
if self.bitbufferlen >= 8:
|
||||
return self.read_uint(8)
|
||||
else:
|
||||
result = self.inp.read(1)
|
||||
if len(result) == 0:
|
||||
return -1
|
||||
return result[0] if python3 else ord(result)
|
||||
|
||||
|
||||
def read_uint(self, n):
|
||||
while self.bitbufferlen < n:
|
||||
temp = self.inp.read(1)
|
||||
if len(temp) == 0:
|
||||
raise EOFError()
|
||||
temp = temp[0] if python3 else ord(temp)
|
||||
self.bitbuffer = (self.bitbuffer << 8) | temp
|
||||
self.bitbufferlen += 8
|
||||
self.bitbufferlen -= n
|
||||
result = (self.bitbuffer >> self.bitbufferlen) & ((1 << n) - 1)
|
||||
self.bitbuffer &= (1 << self.bitbufferlen) - 1
|
||||
return result
|
||||
|
||||
|
||||
def read_signed_int(self, n):
|
||||
temp = self.read_uint(n)
|
||||
temp -= (temp >> (n - 1)) << n
|
||||
return temp
|
||||
|
||||
|
||||
def read_rice_signed_int(self, param):
|
||||
val = 0
|
||||
while self.read_uint(1) == 0:
|
||||
val += 1
|
||||
val = (val << param) | self.read_uint(param)
|
||||
return (val >> 1) ^ -(val & 1)
|
||||
|
||||
|
||||
def close(self):
|
||||
self.inp.close()
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv)
|
||||
160
mqa_identifier_python/mqa_identifier.py
Normal file
160
mqa_identifier_python/mqa_identifier.py
Normal file
@@ -0,0 +1,160 @@
|
||||
import io
|
||||
import struct
|
||||
import wave
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# needed for correct import
|
||||
sys.path.append('/'.join(__file__.replace('\\', '/').split('/')[:-1]))
|
||||
import flac
|
||||
|
||||
|
||||
def twos_complement(n, bits):
|
||||
mask = 2 ** (bits - 1)
|
||||
return -(n & mask) + (n & ~mask)
|
||||
|
||||
|
||||
def iter_i24_as_i32(data):
|
||||
for l, h in struct.iter_unpack('<BH', data):
|
||||
yield twos_complement(h << 8 | l, 24) << 8
|
||||
|
||||
|
||||
def iter_i16_as_i32(data):
|
||||
for x, in struct.iter_unpack('<h', data):
|
||||
yield x << 16
|
||||
|
||||
|
||||
def peek(f, n):
|
||||
o = f.tell()
|
||||
r = f.read(n)
|
||||
f.seek(o)
|
||||
return r
|
||||
|
||||
|
||||
def original_sample_rate_decoder(c: int) -> int:
|
||||
"""
|
||||
Decodes from a 4 bit int the originalSampleRate:
|
||||
0: 44100Hz
|
||||
1: 48000Hz
|
||||
4: 176400Hz
|
||||
5: 192000Hz
|
||||
8: 88200Hz
|
||||
9: 96000Hz
|
||||
12: 352800Hz
|
||||
13: 384000Hz
|
||||
if LSB is 0 then base is 44100Hz else 48000Hz
|
||||
the first 3 MSBs need to be rotated and raised to the power of 2 (so 1, 2, 4, 8, ...)
|
||||
:param c: Is a 4 bit integer
|
||||
:return: The sample rate in Hz
|
||||
"""
|
||||
base = 48000 if (c & 1) == 1 else 44100
|
||||
# jesus @purpl3F0x
|
||||
multiplier = 1 << (((c >> 3) & 1) | (((c >> 2) & 1) << 1) | (((c >> 1) & 1) << 2))
|
||||
|
||||
return base * multiplier
|
||||
|
||||
|
||||
MAGIC = 51007556744 # int.from_bytes(bytes.fromhex('0be0498c88'), 'big') jesus christ
|
||||
|
||||
|
||||
class MqaIdentifier:
|
||||
def __init__(self, flac_file_path: Path):
|
||||
self.is_mqa = False
|
||||
self.is_mqa_studio = False
|
||||
self.original_sample_rate = None
|
||||
self.bit_depth = 16
|
||||
|
||||
self.detect(flac_file_path)
|
||||
|
||||
def get_original_sample_rate(self) -> float or int:
|
||||
"""
|
||||
Get the originalSampleRate in int or float depending on the frequency
|
||||
:return: sample rate in kHz
|
||||
"""
|
||||
sample_rate = self.original_sample_rate / 1000
|
||||
if sample_rate.is_integer():
|
||||
return int(sample_rate)
|
||||
return sample_rate
|
||||
|
||||
def _decode_flac_samples(self, flac_file_path) -> list:
|
||||
"""
|
||||
Decodes a 16/24bit flac file to a samples list
|
||||
|
||||
:param flac_file_path: Path to the flac file
|
||||
:return: Returns decoded samples in a list
|
||||
"""
|
||||
with open(str(flac_file_path), 'rb') as f:
|
||||
magic = peek(f, 4)
|
||||
|
||||
if magic == b'fLaC':
|
||||
with flac.BitInputStream(f) as bf:
|
||||
f = io.BytesIO()
|
||||
flac.decode_file(bf, f, seconds=1)
|
||||
f.seek(0)
|
||||
|
||||
with wave.open(f) as wf:
|
||||
channel_count, sample_width, framerate, *_ = wf.getparams()
|
||||
|
||||
if channel_count != 2:
|
||||
raise ValueError('Input must be stereo')
|
||||
|
||||
if sample_width == 3:
|
||||
iter_data = iter_i24_as_i32
|
||||
self.bit_depth = 24
|
||||
elif sample_width == 2:
|
||||
iter_data = iter_i16_as_i32
|
||||
else:
|
||||
raise ValueError('Input must be 16 or 24-bit')
|
||||
|
||||
return list(iter_data(wf.readframes(framerate)))
|
||||
|
||||
def detect(self, flac_file_path) -> bool:
|
||||
"""
|
||||
Detects if the FLAC file is a MQA file and also detects if it's MQA Studio (blue) and the originalSampleRate
|
||||
|
||||
:param flac_file_path: Path to the flac file
|
||||
:return: True if MQA got detected and False if not
|
||||
"""
|
||||
# get the samples from the FLAC decoder
|
||||
samples = self._decode_flac_samples(flac_file_path)
|
||||
# samples[::2] are left channel and samples[1::2] right channel samples
|
||||
channel_samples = list(zip(samples[::2], samples[1::2]))
|
||||
|
||||
# dictionary to save all the buffers for 16, 17 and 18 bit shifts
|
||||
buffer = {16: 0, 17: 0, 18: 0}
|
||||
for i, sample in enumerate(channel_samples):
|
||||
# sample[0] is the left channel sample and sample[1] the right channel sample
|
||||
# perform a XOR with both samples and bitshift it by 16, 17 and 18
|
||||
buffer = {key: value | (sample[0] ^ sample[1]) >> key & 1 for key, value in buffer.items()}
|
||||
|
||||
# int.from_bytes(bytes.fromhex('0be0498c88'), 'big')
|
||||
if MAGIC in buffer.values():
|
||||
# found MQA sync word
|
||||
self.is_mqa = True
|
||||
|
||||
# get the bitshift position where the MAGIC was found, ugly but works
|
||||
pos = [k for k, v in buffer.items() if v == MAGIC][0]
|
||||
|
||||
# get originalSampleRate
|
||||
org = 0
|
||||
for k in range(3, 7):
|
||||
j = ((channel_samples[i + k][0]) ^ (channel_samples[i + k][1])) >> pos & 1
|
||||
org |= j << (6 - k)
|
||||
|
||||
# decode the 4 bit int to the originalSampleRate
|
||||
self.original_sample_rate = original_sample_rate_decoder(org)
|
||||
|
||||
# get MQA Studio
|
||||
provenance = 0
|
||||
for k in range(29, 34):
|
||||
j = ((channel_samples[i + k][0]) ^ (channel_samples[i + k][1])) >> pos & 1
|
||||
provenance |= j << (33 - k)
|
||||
|
||||
# check if its MQA Studio (blue)
|
||||
self.is_mqa_studio = provenance > 8
|
||||
|
||||
return True
|
||||
else:
|
||||
buffer = {key: (value << 1) & 0xFFFFFFFFF for key, value in buffer.items()}
|
||||
|
||||
return False
|
||||
60
mqa_identifier_python/readme.md
Normal file
60
mqa_identifier_python/readme.md
Normal file
@@ -0,0 +1,60 @@
|
||||
# MQA-identifier-python
|
||||
|
||||
An MQA (Studio, originalSampleRate) identifier for "lossless" flac files written in Python.
|
||||
|
||||
## About The Project
|
||||
|
||||
This project is a port of the awesome C++ project [MQA_identifier](https://github.com/purpl3F0x/MQA_identifier) by
|
||||
[@purpl3F0x](https://github.com/purpl3F0x) and [mqaid](https://github.com/redsudo/mqaid) by
|
||||
[@redsudo](https://github.com/redsudo).
|
||||
|
||||
## Getting Started
|
||||
|
||||
### Prerequisites
|
||||
|
||||
- [Python 3.6+](https://python.org/)
|
||||
|
||||
### Installation
|
||||
|
||||
1. Clone the repo
|
||||
|
||||
```sh
|
||||
git clone https://github.com/Dniel97/MQA-identifier-python.git && cd MQA-identifier-python
|
||||
```
|
||||
|
||||
2. Install the requirements
|
||||
|
||||
```sh
|
||||
pip3 install -r requirements.txt
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
```shell
|
||||
python3 mqa-identifier-python.py "path/to/flac/files"
|
||||
```
|
||||
|
||||
```
|
||||
Found 11 FLAC files to check
|
||||
# Encoding Name
|
||||
1 NOT MQA 22. letzter song.flac
|
||||
2 NOT MQA 23. judy.flac
|
||||
3 MQA Studio 96kHz 01. Algorithm.mqa.flac
|
||||
4 MQA Studio 48kHz 02. The Dark Side.mqa.flac
|
||||
5 MQA Studio 96kHz 03. Pressure.mqa.flac
|
||||
6 MQA Studio 48kHz 04. Propaganda.mqa.flac
|
||||
7 MQA Studio 96kHz 05. Break It to Me.mqa.flac
|
||||
8 MQA Studio 96kHz 06. Something Human.mqa.flac
|
||||
9 MQA Studio 96kHz 07. Thought Contagion.mqa.flac
|
||||
10 MQA Studio 96kHz 08. Get up and Fight.mqa.flac
|
||||
11 MQA Studio 44.1kHz 09. Blockades.mqa.flac
|
||||
```
|
||||
|
||||
## Contributing
|
||||
|
||||
Pull requests are welcome.
|
||||
|
||||
## Related Projects
|
||||
|
||||
- [MQA_identifier](https://github.com/purpl3F0x/MQA_identifier) (Core)
|
||||
- [mqaid](https://github.com/redsudo/mqaid)
|
||||
Reference in New Issue
Block a user