WavJourney / APIs.py
zzk1st's picture
Copy code from github
96ea36d
raw
history blame
6.05 kB
import os
import numpy as np
import requests
import yaml
import pyloudnorm as pyln
from scipy.io.wavfile import write
import torchaudio
from retrying import retry
# Pin OpenBLAS to a single thread for this process.
# NOTE(review): this assignment runs after the numpy import above; confirm the
# setting is still picked up, or move it ahead of that import.
os.environ['OPENBLAS_NUM_THREADS'] = '1'

# Default sample rate (Hz) used by the audio helpers in this module.
SAMPLE_RATE = 32000

# Service settings are read once, at import time, from config.yaml in the
# current working directory.
with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)

# Port of each local backend service, taken from its 'service-port' entry.
tts_port, ttm_port, tta_port, sr_port, vp_port = (
    config[section]['service-port']
    for section in (
        'Text-to-Speech',
        'Text-to-Music',
        'Text-to-Audio',
        'Speech-Restoration',
        'Voice-Parser',
    )
)

# Default for the `enhanced` argument of TTS (speech restoration on/off).
enable_sr = config['Speech-Restoration']['Enable']
def IDLE(length=1.0, out_wav='out.wav', sr=SAMPLE_RATE):
    """
    function: write a silent (all-zero) clip to a .wav file
    @params:
    length: duration of the silence in seconds
    out_wav: path of the .wav file to write
    sr: sample rate; sizes the clip and is the rate the file is written at
    """
    idle = np.zeros(int(length * sr))
    # Write at the same rate that sized the buffer. Writing at SAMPLE_RATE
    # regardless of `sr` made the clip's duration differ from `length`
    # whenever a non-default `sr` was passed.
    WRITE_AUDIO(idle, name=out_wav, sr=sr)
def LOUDNESS_NORM(audio, sr=32000, volumn=-25):
    """
    function: peak-normalize audio, then normalize its integrated loudness
    @params:
    audio: audio samples, passed straight to pyloudnorm
    sr: sample rate used to build the loudness meter
    volumn: target loudness handed to pyln.normalize.loudness
    return: the loudness-normalized audio
    """
    # peak normalize to -10 dB before measuring
    peaked = pyln.normalize.peak(audio, -10.0)
    # integrated loudness as measured by a BS.1770 meter
    measured = pyln.Meter(sr).integrated_loudness(peaked)
    # move the measured loudness to the requested target
    return pyln.normalize.loudness(peaked, measured, volumn)
def WRITE_AUDIO(wav, name=None, sr=SAMPLE_RATE):
    """
    function: write audio numpy to a 16-bit .wav file
    @params:
    wav: np.array [samples]; when it has more than one dimension only
         wav[0] is written
    name: output path, 'output.wav' when None
    sr: sample rate written to the file
    """
    if name is None:
        name = 'output.wav'
    if len(wav.shape) > 1:
        wav = wav[0]
    # declipping
    # An empty clip has no peak to measure (np.max raises on size 0), so
    # treat its peak as 0 and write it unchanged.
    max_value = np.max(np.abs(wav)) if wav.size else 0
    if max_value > 1:
        # Build a new array instead of `wav *= ...`: the in-place form
        # rescaled the caller's buffer as a side effect and raised a casting
        # error for integer-typed input.
        wav = wav * (0.9 / max_value)
    write(name, sr, np.round(wav * 32767).astype(np.int16))
def READ_AUDIO_NUMPY(wav, sr=SAMPLE_RATE):
    """
    function: load an audio file as a numpy array at the requested rate
    @params:
    wav: audio file to load with torchaudio
    sr: target sample rate; the audio is resampled when the file's own
        rate differs
    return: np.array [samples], taken from index 0 of the loaded waveform
    """
    loaded, native_sr = torchaudio.load(wav)
    needs_resample = native_sr != sr
    if needs_resample:
        loaded = torchaudio.functional.resample(loaded, orig_freq=native_sr, new_freq=sr)
    # only the first row of the loaded waveform is returned
    return loaded[0].numpy()
def MIX(wavs=(('1.wav', 0.), ('2.wav', 10.)), out_wav='out.wav', sr=SAMPLE_RATE):
    """
    function: overlay several audio files at given offsets and write the sum
    @params:
    wavs: [[wav_name, absolute_offset], ...]; each offset is multiplied by
          sr to get the start sample
    out_wav: path of the mixed .wav file
    sr: sample rate used to load the inputs, place them, and write the output
    @raises:
    ValueError: when wavs is empty
    """
    if not wavs:
        raise ValueError('MIX requires at least one [wav_name, absolute_offset] entry')

    # Decode each input once, at the same rate that converts its offset to a
    # sample index. Previously the offsets used `sr` while the audio was
    # always loaded and written at the default rate, and every file was read
    # twice.
    placed = []
    for cur_name, cur_offset in wavs:
        cur_wav = READ_AUDIO_NUMPY(cur_name, sr=sr)
        placed.append((int(cur_offset * sr), cur_wav))

    # The mix is as long as the latest-ending input.
    max_length = max(start + len(cur_wav) for start, cur_wav in placed)
    template_wav = np.zeros(max_length)
    for start, cur_wav in placed:
        # mix
        template_wav[start:start + len(cur_wav)] += cur_wav
    WRITE_AUDIO(template_wav, name=out_wav, sr=sr)
def CAT(wavs, out_wav='out.wav'):
    """
    function: join audio files end to end and write the result
    @params:
    wavs: List of wav file ['1.wav', '2.wav', ...]
    out_wav: path of the concatenated .wav file
    """
    # start from the first file, then append the remaining ones in order
    joined = READ_AUDIO_NUMPY(wavs[0])
    for path in wavs[1:]:
        joined = np.concatenate((joined, READ_AUDIO_NUMPY(path)), axis=-1)
    WRITE_AUDIO(joined, name=out_wav)
def COMPUTE_LEN(wav):
    """
    function: duration of an audio file
    @params:
    wav: audio file to measure
    return: number of samples after loading at SAMPLE_RATE, divided by
            SAMPLE_RATE (i.e. seconds)
    """
    # Divide by the rate the samples were loaded at rather than a literal
    # 32000, so the result stays correct if SAMPLE_RATE changes.
    samples = READ_AUDIO_NUMPY(wav, sr=SAMPLE_RATE)
    return len(samples) / SAMPLE_RATE
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def TTM(text, length=10, volume=-28, out_wav='out.wav'):
    """
    function: ask the local Text-to-Music service to generate music
    @params:
    text: sent as 'text'
    length: sent as 'length'
    volume: sent as 'volume'
    out_wav: sent as 'output_wav'
    @raises:
    RuntimeError: carrying the reply's 'API error' field when the status
                  code is not 200
    Every value is formatted to a string before being posted as JSON.
    The call is wrapped in @retry (at most 5 attempts, wait_fixed=2000).
    """
    endpoint = f'http://127.0.0.1:{ttm_port}/generate_music'
    payload = dict(
        text=f'{text}',
        length=f'{length}',
        volume=f'{volume}',
        output_wav=f'{out_wav}',
    )
    response = requests.post(endpoint, json=payload)
    # failure path first: report, then raise so @retry can try again
    if response.status_code != 200:
        print('Error:', response.json()['API error'])
        raise RuntimeError(response.json()['API error'])
    print('Success:', response.json()['message'])
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def TTA(text, length=5, volume=-35, out_wav='out.wav'):
    """
    function: ask the local Text-to-Audio service to generate audio
    @params:
    text: sent as 'text'
    length: sent as 'length'
    volume: sent as 'volume'
    out_wav: sent as 'output_wav'
    @raises:
    RuntimeError: carrying the reply's 'API error' field when the status
                  code is not 200
    Every value is formatted to a string before being posted as JSON.
    The call is wrapped in @retry (at most 5 attempts, wait_fixed=2000).
    """
    endpoint = f'http://127.0.0.1:{tta_port}/generate_audio'
    payload = dict(
        text=f'{text}',
        length=f'{length}',
        volume=f'{volume}',
        output_wav=f'{out_wav}',
    )
    response = requests.post(endpoint, json=payload)
    # failure path first: report, then raise so @retry can try again
    if response.status_code != 200:
        print('Error:', response.json()['API error'])
        raise RuntimeError(response.json()['API error'])
    print('Success:', response.json()['message'])
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def TTS(text, speaker='news_anchor', volume=-20, out_wav='out.wav', enhanced=enable_sr, speaker_id='', speaker_npz=''):
    """
    function: ask the local Text-to-Speech service to generate speech,
              optionally followed by speech restoration
    @params:
    text: sent as 'text'
    speaker: accepted but not read anywhere in this function
    volume: sent as 'volume'
    out_wav: sent as 'output_wav'; also the file handed to SR
    enhanced: when truthy, SR is run on out_wav after a successful request
    speaker_id: sent as 'speaker_id'
    speaker_npz: sent as 'speaker_npz'
    @raises:
    RuntimeError: carrying the reply's 'API error' field when the status
                  code is not 200
    Every value is formatted to a string before being posted as JSON.
    The whole body, including the SR call, is wrapped in @retry (at most
    5 attempts, wait_fixed=2000), so an exception escaping SR causes the
    speech request to be issued again.
    """
    endpoint = f'http://127.0.0.1:{tts_port}/generate_speech'
    payload = dict(
        text=f'{text}',
        speaker_id=f'{speaker_id}',
        speaker_npz=f'{speaker_npz}',
        volume=f'{volume}',
        output_wav=f'{out_wav}',
    )
    response = requests.post(endpoint, json=payload)
    # failure path first: report, then raise so @retry can try again
    if response.status_code != 200:
        print('Error:', response.json()['API error'])
        raise RuntimeError(response.json()['API error'])
    print('Success:', response.json()['message'])
    if enhanced:
        SR(processfile=out_wav)
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def SR(processfile):
    """
    function: ask the local Speech-Restoration service to fix an audio file
    @params:
    processfile: sent as 'processfile', formatted to a string
    @raises:
    RuntimeError: carrying the reply's 'API error' field when the status
                  code is not 200
    The call is wrapped in @retry (at most 5 attempts, wait_fixed=2000).
    """
    endpoint = f'http://127.0.0.1:{sr_port}/fix_audio'
    payload = dict(processfile=f'{processfile}')
    response = requests.post(endpoint, json=payload)
    # failure path first: report, then raise so @retry can try again
    if response.status_code != 200:
        print('Error:', response.json()['API error'])
        raise RuntimeError(response.json()['API error'])
    print('Success:', response.json()['message'])
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def VP(wav_path, out_dir):
    """
    function: ask the local Voice-Parser service to parse a voice recording
    @params:
    wav_path: sent as 'wav_path'
    out_dir: sent as 'out_dir'
    @raises:
    RuntimeError: carrying the reply's 'API error' field when the status
                  code is not 200
    Both values are formatted to strings before being posted as JSON.
    The call is wrapped in @retry (at most 5 attempts, wait_fixed=2000).
    """
    endpoint = f'http://127.0.0.1:{vp_port}/parse_voice'
    payload = dict(
        wav_path=f'{wav_path}',
        out_dir=f'{out_dir}',
    )
    response = requests.post(endpoint, json=payload)
    # failure path first: report, then raise so @retry can try again
    if response.status_code != 200:
        print('Error:', response.json()['API error'])
        raise RuntimeError(response.json()['API error'])
    print('Success:', response.json()['message'])