Spaces:
Sleeping
Sleeping
File size: 6,049 Bytes
96ea36d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
import os

# OpenBLAS reads OPENBLAS_NUM_THREADS once, when the library is first loaded.
# The limit therefore has to be in the environment BEFORE numpy (or any other
# package linking against OpenBLAS) is imported; setting it afterwards has no
# effect on the already-initialised library.
os.environ['OPENBLAS_NUM_THREADS'] = '1'

import numpy as np  # noqa: E402
import requests  # noqa: E402
import yaml  # noqa: E402
import pyloudnorm as pyln  # noqa: E402
from scipy.io.wavfile import write  # noqa: E402
import torchaudio  # noqa: E402
from retrying import retry  # noqa: E402

# Default sample rate (Hz) for the audio helpers in this module.
SAMPLE_RATE = 32000

# Service ports and feature flags are read once, at import time, from
# config.yaml in the current working directory. The encoding is explicit so
# the result does not depend on the platform's default locale.
with open('config.yaml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

# Port of each local HTTP service the client functions talk to.
tts_port = config['Text-to-Speech']['service-port']
ttm_port = config['Text-to-Music']['service-port']
tta_port = config['Text-to-Audio']['service-port']
sr_port = config['Speech-Restoration']['service-port']
vp_port = config['Voice-Parser']['service-port']
# Whether TTS output is passed through the Speech-Restoration service by default.
enable_sr = config['Speech-Restoration']['Enable']
def IDLE(length=1.0, out_wav='out.wav', sr=SAMPLE_RATE):
    """
    function: write a silent .wav file
    @params:
        length: duration of the silence in seconds
        out_wav: path of the .wav file to write
        sr: sample rate in Hz, used to size the buffer and for the written file
    """
    idle = np.zeros(int(length * sr))
    # Write with the same rate the buffer was sized with; writing with
    # SAMPLE_RATE instead would give a file that is not `length` seconds long
    # whenever a non-default `sr` is passed.
    WRITE_AUDIO(idle, name=out_wav, sr=sr)
def LOUDNESS_NORM(audio, sr=32000, volumn=-25):
    """
    function: peak-normalize, then loudness-normalize an audio signal
    @params:
        audio: audio samples in a form accepted by pyloudnorm
        sr: sample rate in Hz given to the loudness meter
        volumn: target loudness handed to pyln.normalize.loudness
    return: the loudness-normalized audio
    """
    # step 1: bring the peak of the signal to -10 dB
    peaked = pyln.normalize.peak(audio, -10.0)

    # step 2: measure the integrated loudness with a BS.1770 meter
    bs1770_meter = pyln.Meter(sr)
    measured = bs1770_meter.integrated_loudness(peaked)

    # step 3: shift the signal from the measured loudness to the target
    return pyln.normalize.loudness(peaked, measured, volumn)
def WRITE_AUDIO(wav, name=None, sr=SAMPLE_RATE):
    """
    function: write audio numpy to a 16-bit .wav file
    @params:
        wav: np.array [samples]; when it has more than one dimension only
             wav[0] is written
        name: output path, 'output.wav' when None
        sr: sample rate in Hz passed to the wav writer
    note: the array passed in is never modified
    """
    if name is None:
        name = 'output.wav'

    if wav.ndim > 1:
        wav = wav[0]

    # declipping: if any sample exceeds full scale, rescale so the peak is 0.9.
    # An empty signal has no peak, so it is written as an empty file instead
    # of failing inside np.max.
    max_value = np.max(np.abs(wav)) if wav.size else 0.0
    if max_value > 1:
        # Out-of-place on purpose: `wav *= ...` would rescale the caller's
        # array (wav[0] above is a view of it) and fails on integer arrays.
        wav = wav * (0.9 / max_value)

    write(name, sr, np.round(wav * 32767).astype(np.int16))
def READ_AUDIO_NUMPY(wav, sr=SAMPLE_RATE):
    """
    function: load an audio file and return its first channel as numpy
    @params:
        wav: audio file path, passed to torchaudio.load
        sr: sample rate in Hz; the audio is resampled to it when the file's
            own rate differs
    return: np.array [samples]
    """
    audio, native_sr = torchaudio.load(wav)

    needs_resample = native_sr != sr
    if needs_resample:
        audio = torchaudio.functional.resample(audio, orig_freq=native_sr, new_freq=sr)

    # only channel 0 is returned
    first_channel = audio[0]
    return first_channel.numpy()
def MIX(wavs=(('1.wav', 0.), ('2.wav', 10.)), out_wav='out.wav', sr=SAMPLE_RATE):
    """
    function: overlay several audio files on one track and write it to out_wav
    @params:
        wavs: [[wav_name, absolute_offset], ...] where absolute_offset is the
              start of that file in the mix, in seconds
        out_wav: path of the mixed .wav file
        sr: sample rate in Hz used for reading the inputs, converting offsets
            to samples, and writing the result
    raises: ValueError if wavs is empty
    """
    # Decode each file exactly once and remember where it starts, instead of
    # reading every file a second time just to size the output buffer. This
    # keeps all decoded inputs in memory until the mix is written.
    placed = []
    for cur_name, cur_offset in wavs:
        cur_wav = READ_AUDIO_NUMPY(cur_name, sr=sr)
        placed.append((int(cur_offset * sr), cur_wav))

    if not placed:
        raise ValueError('MIX needs at least one [wav_name, absolute_offset] entry')

    # The buffer must reach the end of whichever segment finishes last.
    max_length = max(start + len(cur_wav) for start, cur_wav in placed)
    template_wav = np.zeros(max_length)

    # mix: overlapping segments are summed sample by sample
    for start, cur_wav in placed:
        template_wav[start:start + len(cur_wav)] += cur_wav

    WRITE_AUDIO(template_wav, name=out_wav, sr=sr)
def CAT(wavs, out_wav='out.wav'):
    """
    function: concatenate audio files back to back and write the result
    @params:
        wavs: List of wav file ['1.wav', '2.wav', ...]
        out_wav: path of the concatenated .wav file
    raises: IndexError if wavs is empty
    """
    segments = [READ_AUDIO_NUMPY(path) for path in wavs]
    if not segments:
        # Same exception type an empty list has always produced here.
        raise IndexError('CAT needs at least one wav file')

    # A single concatenate copies every segment once; appending the segments
    # one by one would re-copy the accumulated audio on each step.
    cat_wav = np.concatenate(segments, axis=-1)
    WRITE_AUDIO(cat_wav, name=out_wav)
def COMPUTE_LEN(wav):
    """
    function: duration of an audio file in seconds
    @params:
        wav: audio file path, passed to READ_AUDIO_NUMPY
    return: float, number of samples divided by the sample rate
    """
    samples = READ_AUDIO_NUMPY(wav)
    # READ_AUDIO_NUMPY resamples to SAMPLE_RATE by default, so that constant
    # (not a separately hard-coded number) is the rate to divide by.
    return len(samples) / SAMPLE_RATE
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def TTM(text, length=10, volume=-28, out_wav='out.wav'):
    """
    function: ask the local Text-to-Music service to generate music
    @params:
        text: description sent to the service
        length: requested length, sent as a string (presumably seconds -
                confirm against the service)
        volume: requested volume, sent as a string
        out_wav: output path sent to the service as 'output_wav'
    raises: RuntimeError with the service's 'API error' text when the reply
            status is not 200
    note: the retry decorator re-runs the call on any exception, up to
          5 attempts with a fixed 2000 ms wait in between
    """
    endpoint = f'http://127.0.0.1:{ttm_port}/generate_music'
    payload = {
        'text': f'{text}',
        'length': f'{length}',
        'volume': f'{volume}',
        'output_wav': f'{out_wav}',
    }
    reply = requests.post(endpoint, json=payload)

    # failure first: report the service's message and raise so retry kicks in
    if reply.status_code != 200:
        api_error = reply.json()['API error']
        print('Error:', api_error)
        raise RuntimeError(api_error)

    print('Success:', reply.json()['message'])
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def TTA(text, length=5, volume=-35, out_wav='out.wav'):
    """
    function: ask the local Text-to-Audio service to generate audio
    @params:
        text: description sent to the service
        length: requested length, sent as a string (presumably seconds -
                confirm against the service)
        volume: requested volume, sent as a string
        out_wav: output path sent to the service as 'output_wav'
    raises: RuntimeError with the service's 'API error' text when the reply
            status is not 200
    note: the retry decorator re-runs the call on any exception, up to
          5 attempts with a fixed 2000 ms wait in between
    """
    endpoint = f'http://127.0.0.1:{tta_port}/generate_audio'
    payload = {
        'text': f'{text}',
        'length': f'{length}',
        'volume': f'{volume}',
        'output_wav': f'{out_wav}',
    }
    reply = requests.post(endpoint, json=payload)

    # failure first: report the service's message and raise so retry kicks in
    if reply.status_code != 200:
        api_error = reply.json()['API error']
        print('Error:', api_error)
        raise RuntimeError(api_error)

    print('Success:', reply.json()['message'])
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def TTS(text, speaker='news_anchor', volume=-20, out_wav='out.wav', enhanced=enable_sr, speaker_id='', speaker_npz=''):
    """
    function: ask the local Text-to-Speech service to synthesize speech
    @params:
        text: text sent to the service
        speaker: accepted for callers, but not part of the request
        volume: requested volume, sent as a string
        out_wav: output path sent to the service as 'output_wav'
        enhanced: when truthy, out_wav is passed to SR() after a successful
                  synthesis; defaults to the Speech-Restoration 'Enable' flag
                  read from the config
        speaker_id: sent to the service as 'speaker_id'
        speaker_npz: sent to the service as 'speaker_npz'
    raises: RuntimeError with the service's 'API error' text when the reply
            status is not 200
    note: the retry decorator re-runs the whole call, including the SR()
          step, on any exception, up to 5 attempts with a fixed 2000 ms wait
    """
    endpoint = f'http://127.0.0.1:{tts_port}/generate_speech'
    payload = {
        'text': f'{text}',
        'speaker_id': f'{speaker_id}',
        'speaker_npz': f'{speaker_npz}',
        'volume': f'{volume}',
        'output_wav': f'{out_wav}',
    }
    reply = requests.post(endpoint, json=payload)

    # failure first: report the service's message and raise so retry kicks in
    if reply.status_code != 200:
        api_error = reply.json()['API error']
        print('Error:', api_error)
        raise RuntimeError(api_error)

    print('Success:', reply.json()['message'])

    # optional post-processing of the file the service just produced
    if enhanced:
        SR(processfile=out_wav)
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def SR(processfile):
    """
    function: ask the local Speech-Restoration service to process a file
    @params:
        processfile: file path sent to the service as 'processfile'
    raises: RuntimeError with the service's 'API error' text when the reply
            status is not 200
    note: the retry decorator re-runs the call on any exception, up to
          5 attempts with a fixed 2000 ms wait in between
    """
    endpoint = f'http://127.0.0.1:{sr_port}/fix_audio'
    payload = {'processfile': f'{processfile}'}
    reply = requests.post(endpoint, json=payload)

    # failure first: report the service's message and raise so retry kicks in
    if reply.status_code != 200:
        api_error = reply.json()['API error']
        print('Error:', api_error)
        raise RuntimeError(api_error)

    print('Success:', reply.json()['message'])
@retry(stop_max_attempt_number=5, wait_fixed=2000)
def VP(wav_path, out_dir):
    """
    function: ask the local Voice-Parser service to parse a voice recording
    @params:
        wav_path: input path sent to the service as 'wav_path'
        out_dir: output directory sent to the service as 'out_dir'
    raises: RuntimeError with the service's 'API error' text when the reply
            status is not 200
    note: the retry decorator re-runs the call on any exception, up to
          5 attempts with a fixed 2000 ms wait in between
    """
    endpoint = f'http://127.0.0.1:{vp_port}/parse_voice'
    payload = {
        'wav_path': f'{wav_path}',
        'out_dir': f'{out_dir}',
    }
    reply = requests.post(endpoint, json=payload)

    # failure first: report the service's message and raise so retry kicks in
    if reply.status_code != 200:
        api_error = reply.json()['API error']
        print('Error:', api_error)
        raise RuntimeError(api_error)

    print('Success:', reply.json()['message'])
|