Added mqa_identifier_python as a submodule

This commit is contained in:
Dniel97
2022-07-25 13:47:47 +02:00
parent 502be429c0
commit 00a69f711a
7 changed files with 7 additions and 532 deletions

4
.gitmodules vendored Normal file
View File

@@ -0,0 +1,4 @@
[submodule "mqa_identifier_python"]
path = mqa_identifier_python
url = git@github.com:Dniel97/MQA-identifier-python.git
branch = master

View File

@@ -13,7 +13,7 @@ from tqdm import tqdm
from utils.models import *
from utils.utils import sanitise_name, silentremove, download_to_temp, create_temp_filename, create_requests_session
from .mqa_identifier_python.mqa_identifier import MqaIdentifier
from .mqa_identifier_python.mqa_identifier_python.mqa_identifier import MqaIdentifier
from .tidal_api import TidalTvSession, TidalApi, TidalMobileSession, SessionType, TidalError, TidalRequestError
module_information = ModuleInformation(
@@ -505,6 +505,7 @@ class ModuleInterface:
bit_depth=bit_depth,
sample_rate=sample_rate,
bitrate=bitrate,
# duration=track_data.get('duration'),
cover_url=self.generate_artwork_url(track_data['album'].get('cover'),
size=self.cover_size) if track_data['album'].get('cover') else None,
explicit=track_data.get('explicit'),

1
mqa_identifier_python Submodule

Submodule mqa_identifier_python added at ff0c9f1824

View File

@@ -1,307 +0,0 @@
#
# Simple FLAC decoder (Python)
#
# Copyright (c) 2017 Project Nayuki. (MIT License)
# https://www.nayuki.io/page/simple-flac-implementation
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# - The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# - The Software is provided "as is", without warranty of any kind, express or
# implied, including but not limited to the warranties of merchantability,
# fitness for a particular purpose and noninfringement. In no event shall the
# authors or copyright holders be liable for any claim, damages or other
# liability, whether in an action of contract, tort or otherwise, arising from,
# out of or in connection with the Software or the use or other dealings in the
# Software.
#
import struct, sys
python3 = sys.version_info.major >= 3
def main(argv):
if len(argv) != 3:
sys.exit("Usage: python " + argv[0] + " InFile.flac OutFile.wav")
with BitInputStream(open(argv[1], "rb")) as inp:
with open(argv[2], "wb") as out:
decode_file(inp, out)
def decode_file(inp, out, numsamples=None, seconds=None):
# Handle FLAC header and metadata blocks
if inp.read_uint(32) != 0x664C6143:
raise ValueError("Invalid magic string")
samplerate = None
last = False
while not last:
last = inp.read_uint(1) != 0
type = inp.read_uint(7)
length = inp.read_uint(24)
if type == 0: # Stream info block
inp.read_uint(16)
inp.read_uint(16)
inp.read_uint(24)
inp.read_uint(24)
samplerate = inp.read_uint(20)
if seconds:
numsamples = seconds * samplerate
numchannels = inp.read_uint(3) + 1
sampledepth = inp.read_uint(5) + 1
x = inp.read_uint(36)
numsamples = numsamples or x
inp.read_uint(128)
else:
for i in range(length):
inp.read_uint(8)
if samplerate is None:
raise ValueError("Stream info metadata block absent")
if sampledepth % 8 != 0:
raise RuntimeError("Sample depth not supported")
# Start writing WAV file headers
sampledatalen = numsamples * numchannels * (sampledepth // 8)
out.write(b"RIFF")
out.write(struct.pack("<I", sampledatalen + 36))
out.write(b"WAVE")
out.write(b"fmt ")
out.write(struct.pack("<IHHIIHH", 16, 0x0001, numchannels, samplerate,
samplerate * numchannels * (sampledepth // 8), numchannels * (sampledepth // 8), sampledepth))
out.write(b"data")
out.write(struct.pack("<I", sampledatalen))
# Decode FLAC audio frames and write raw samples
while numsamples > 0:
numsamples -= decode_frame(inp, numchannels, sampledepth, out)
def decode_frame(inp, numchannels, sampledepth, out):
# Read a ton of header fields, and ignore most of them
temp = inp.read_byte()
if temp == -1:
return False
sync = temp << 6 | inp.read_uint(6)
if sync != 0x3FFE:
raise ValueError("Sync code expected")
inp.read_uint(1)
inp.read_uint(1)
blocksizecode = inp.read_uint(4)
sampleratecode = inp.read_uint(4)
chanasgn = inp.read_uint(4)
inp.read_uint(3)
inp.read_uint(1)
temp = inp.read_uint(8)
while temp >= 0b11000000:
inp.read_uint(8)
temp = (temp << 1) & 0xFF
if blocksizecode == 1:
blocksize = 192
elif 2 <= blocksizecode <= 5:
blocksize = 576 << blocksizecode - 2
elif blocksizecode == 6:
blocksize = inp.read_uint(8) + 1
elif blocksizecode == 7:
blocksize = inp.read_uint(16) + 1
elif 8 <= blocksizecode <= 15:
blocksize = 256 << (blocksizecode - 8)
if sampleratecode == 12:
inp.read_uint(8)
elif sampleratecode in (13, 14):
inp.read_uint(16)
inp.read_uint(8)
# Decode each channel's subframe, then skip footer
samples = decode_subframes(inp, blocksize, sampledepth, chanasgn)
inp.align_to_byte()
inp.read_uint(16)
# Write the decoded samples
numbytes = sampledepth // 8
if python3:
def write_little_int(val):
out.write(bytes(((val >> (i * 8)) & 0xFF) for i in range(numbytes)))
else:
def write_little_int(val):
out.write("".join(chr((val >> (i * 8)) & 0xFF) for i in range(numbytes)))
addend = 128 if sampledepth == 8 else 0
for i in range(blocksize):
for j in range(numchannels):
write_little_int(samples[j][i] + addend)
return blocksize
def decode_subframes(inp, blocksize, sampledepth, chanasgn):
if 0 <= chanasgn <= 7:
return [decode_subframe(inp, blocksize, sampledepth) for _ in range(chanasgn + 1)]
elif 8 <= chanasgn <= 10:
temp0 = decode_subframe(inp, blocksize, sampledepth + (1 if (chanasgn == 9) else 0))
temp1 = decode_subframe(inp, blocksize, sampledepth + (0 if (chanasgn == 9) else 1))
if chanasgn == 8:
for i in range(blocksize):
temp1[i] = temp0[i] - temp1[i]
elif chanasgn == 9:
for i in range(blocksize):
temp0[i] += temp1[i]
elif chanasgn == 10:
for i in range(blocksize):
side = temp1[i]
right = temp0[i] - (side >> 1)
temp1[i] = right
temp0[i] = right + side
return [temp0, temp1]
else:
raise ValueError("Reserved channel assignment")
def decode_subframe(inp, blocksize, sampledepth):
inp.read_uint(1)
type = inp.read_uint(6)
shift = inp.read_uint(1)
if shift == 1:
while inp.read_uint(1) == 0:
shift += 1
sampledepth -= shift
if type == 0: # Constant coding
result = [inp.read_signed_int(sampledepth)] * blocksize
elif type == 1: # Verbatim coding
result = [inp.read_signed_int(sampledepth) for _ in range(blocksize)]
elif 8 <= type <= 12:
result = decode_fixed_prediction_subframe(inp, type - 8, blocksize, sampledepth)
elif 32 <= type <= 63:
result = decode_linear_predictive_coding_subframe(inp, type - 31, blocksize, sampledepth)
else:
raise ValueError("Reserved subframe type")
return [(v << shift) for v in result]
def decode_fixed_prediction_subframe(inp, predorder, blocksize, sampledepth):
result = [inp.read_signed_int(sampledepth) for _ in range(predorder)]
decode_residuals(inp, blocksize, result)
restore_linear_prediction(result, FIXED_PREDICTION_COEFFICIENTS[predorder], 0)
return result
FIXED_PREDICTION_COEFFICIENTS = (
(),
(1,),
(2, -1),
(3, -3, 1),
(4, -6, 4, -1),
)
def decode_linear_predictive_coding_subframe(inp, lpcorder, blocksize, sampledepth):
result = [inp.read_signed_int(sampledepth) for _ in range(lpcorder)]
precision = inp.read_uint(4) + 1
shift = inp.read_signed_int(5)
coefs = [inp.read_signed_int(precision) for _ in range(lpcorder)]
decode_residuals(inp, blocksize, result)
restore_linear_prediction(result, coefs, shift)
return result
def decode_residuals(inp, blocksize, result):
method = inp.read_uint(2)
if method >= 2:
raise ValueError("Reserved residual coding method")
parambits = [4, 5][method]
escapeparam = [0xF, 0x1F][method]
partitionorder = inp.read_uint(4)
numpartitions = 1 << partitionorder
if blocksize % numpartitions != 0:
raise ValueError("Block size not divisible by number of Rice partitions")
for i in range(numpartitions):
count = blocksize >> partitionorder
if i == 0:
count -= len(result)
param = inp.read_uint(parambits)
if param < escapeparam:
result.extend(inp.read_rice_signed_int(param) for _ in range(count))
else:
numbits = inp.read_uint(5)
result.extend(inp.read_signed_int(numbits) for _ in range(count))
def restore_linear_prediction(result, coefs, shift):
for i in range(len(coefs), len(result)):
result[i] += sum((result[i - 1 - j] * c) for (j, c) in enumerate(coefs)) >> shift
class BitInputStream(object):
def __init__(self, inp):
self.inp = inp
self.bitbuffer = 0
self.bitbufferlen = 0
def align_to_byte(self):
self.bitbufferlen -= self.bitbufferlen % 8
def read_byte(self):
if self.bitbufferlen >= 8:
return self.read_uint(8)
else:
result = self.inp.read(1)
if len(result) == 0:
return -1
return result[0] if python3 else ord(result)
def read_uint(self, n):
while self.bitbufferlen < n:
temp = self.inp.read(1)
if len(temp) == 0:
raise EOFError()
temp = temp[0] if python3 else ord(temp)
self.bitbuffer = (self.bitbuffer << 8) | temp
self.bitbufferlen += 8
self.bitbufferlen -= n
result = (self.bitbuffer >> self.bitbufferlen) & ((1 << n) - 1)
self.bitbuffer &= (1 << self.bitbufferlen) - 1
return result
def read_signed_int(self, n):
temp = self.read_uint(n)
temp -= (temp >> (n - 1)) << n
return temp
def read_rice_signed_int(self, param):
val = 0
while self.read_uint(1) == 0:
val += 1
val = (val << param) | self.read_uint(param)
return (val >> 1) ^ -(val & 1)
def close(self):
self.inp.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
if __name__ == "__main__":
main(sys.argv)

View File

@@ -1,164 +0,0 @@
import io
import struct
import wave
import sys
from pathlib import Path
# needed for correct import
sys.path.append('/'.join(__file__.replace('\\', '/').split('/')[:-1]))
import flac
def twos_complement(n, bits):
mask = 2 ** (bits - 1)
return -(n & mask) + (n & ~mask)
def iter_i24_as_i32(data):
for l, h in struct.iter_unpack('<BH', data):
yield twos_complement(h << 8 | l, 24) << 8
def iter_i16_as_i32(data):
for x, in struct.iter_unpack('<h', data):
yield x << 16
def peek(f, n):
o = f.tell()
r = f.read(n)
f.seek(o)
return r
def original_sample_rate_decoder(c: int) -> int:
"""
Decodes from a 4 bit int the originalSampleRate:
0: 44100Hz
1: 48000Hz
4: 176400Hz
5: 192000Hz
8: 88200Hz
9: 96000Hz
12: 352800Hz
13: 384000Hz
if LSB is 0 then base is 44100Hz else 48000Hz
the first 3 MSBs need to be rotated and raised to the power of 2 (so 1, 2, 4, 8, ...)
:param c: Is a 4 bit integer
:return: The sample rate in Hz
"""
base = 48000 if (c & 1) == 1 else 44100
# jesus @purpl3F0x
multiplier = 1 << (((c >> 3) & 1) | (((c >> 2) & 1) << 1) | (((c >> 1) & 1) << 2))
return base * multiplier
MAGIC = 51007556744 # int.from_bytes(bytes.fromhex('0be0498c88'), 'big') jesus christ
class MqaIdentifier:
def __init__(self, flac_file_path: str or Path):
self.is_mqa = False
self.is_mqa_studio = False
self.original_sample_rate = None
self.bit_depth = 16
self.detect(flac_file_path)
def get_original_sample_rate(self) -> float or int:
"""
Get the originalSampleRate in int or float depending on the frequency
:return: sample rate in kHz
"""
sample_rate = self.original_sample_rate / 1000
if sample_rate.is_integer():
return int(sample_rate)
return sample_rate
def _decode_flac_samples(self, flac_file_path: str or Path) -> list:
"""
Decodes a 16/24bit flac file to a samples list
:param flac_file_path: Path to the flac file
:return: Returns decoded samples in a list
"""
with open(str(flac_file_path), 'rb') as f:
magic = peek(f, 4)
if magic == b'fLaC':
with flac.BitInputStream(f) as bf:
f = io.BytesIO()
# ignore EOFError
try:
flac.decode_file(bf, f, seconds=1)
except EOFError:
pass
f.seek(0)
with wave.open(f) as wf:
channel_count, sample_width, framerate, *_ = wf.getparams()
if channel_count != 2:
raise ValueError('Input must be stereo')
if sample_width == 3:
iter_data = iter_i24_as_i32
self.bit_depth = 24
elif sample_width == 2:
iter_data = iter_i16_as_i32
else:
raise ValueError('Input must be 16 or 24-bit')
return list(iter_data(wf.readframes(framerate)))
def detect(self, flac_file_path: str or Path) -> bool:
"""
Detects if the FLAC file is a MQA file and also detects if it's MQA Studio (blue) and the originalSampleRate
:param flac_file_path: Path to the flac file
:return: True if MQA got detected and False if not
"""
# get the samples from the FLAC decoder
samples = self._decode_flac_samples(flac_file_path)
# samples[::2] are left channel and samples[1::2] right channel samples
channel_samples = list(zip(samples[::2], samples[1::2]))
# dictionary to save all the buffers for 16, 17 and 18 bit shifts
buffer = {16: 0, 17: 0, 18: 0}
for i, sample in enumerate(channel_samples):
# sample[0] is the left channel sample and sample[1] the right channel sample
# perform a XOR with both samples and bitshift it by 16, 17 and 18
buffer = {key: value | (sample[0] ^ sample[1]) >> key & 1 for key, value in buffer.items()}
# int.from_bytes(bytes.fromhex('0be0498c88'), 'big')
if MAGIC in buffer.values():
# found MQA sync word
self.is_mqa = True
# get the bitshift position where the MAGIC was found, ugly but works
pos = [k for k, v in buffer.items() if v == MAGIC][0]
# get originalSampleRate
org = 0
for k in range(3, 7):
j = ((channel_samples[i + k][0]) ^ (channel_samples[i + k][1])) >> pos & 1
org |= j << (6 - k)
# decode the 4 bit int to the originalSampleRate
self.original_sample_rate = original_sample_rate_decoder(org)
# get MQA Studio
provenance = 0
for k in range(29, 34):
j = ((channel_samples[i + k][0]) ^ (channel_samples[i + k][1])) >> pos & 1
provenance |= j << (33 - k)
# check if its MQA Studio (blue)
self.is_mqa_studio = provenance > 8
return True
else:
buffer = {key: (value << 1) & 0xFFFFFFFFF for key, value in buffer.items()}
return False

View File

@@ -1,60 +0,0 @@
# MQA-identifier-python
An MQA (Studio, originalSampleRate) identifier for "lossless" flac files written in Python.
## About The Project
This project is a port of the awesome C++ project [MQA_identifier](https://github.com/purpl3F0x/MQA_identifier) by
[@purpl3F0x](https://github.com/purpl3F0x) and [mqaid](https://github.com/redsudo/mqaid) by
[@redsudo](https://github.com/redsudo).
## Getting Started
### Prerequisites
- [Python 3.6+](https://python.org/)
### Installation
1. Clone the repo
```sh
git clone https://github.com/Dniel97/MQA-identifier-python.git && cd MQA-identifier-python
```
2. Install the requirements
```sh
pip3 install -r requirements.txt
```
## Usage
```shell
python3 mqa-identifier-python.py "path/to/flac/files"
```
```
Found 11 FLAC files to check
# Encoding Name
1 NOT MQA 22. letzter song.flac
2 NOT MQA 23. judy.flac
3 MQA Studio 96kHz 01. Algorithm.mqa.flac
4 MQA Studio 48kHz 02. The Dark Side.mqa.flac
5 MQA Studio 96kHz 03. Pressure.mqa.flac
6 MQA Studio 48kHz 04. Propaganda.mqa.flac
7 MQA Studio 96kHz 05. Break It to Me.mqa.flac
8 MQA Studio 96kHz 06. Something Human.mqa.flac
9 MQA Studio 96kHz 07. Thought Contagion.mqa.flac
10 MQA Studio 96kHz 08. Get up and Fight.mqa.flac
11 MQA Studio 44.1kHz 09. Blockades.mqa.flac
```
## Contributing
Pull requests are welcome.
## Related Projects
- [MQA_identifier](https://github.com/purpl3F0x/MQA_identifier) (Core)
- [mqaid](https://github.com/redsudo/mqaid)