File size: 3,781 Bytes
7ee3434 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
""" This code is modified from https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/performance.html"""
import os
import subprocess
from multiprocessing import Pool
from tqdm import tqdm
import torchaudio
from pathlib import Path
def remove_empty_dirs(path):
"""remove empty directories in a given path"""
# Check if the given path is a directory
if not os.path.isdir(path):
print(f"{path} is not a directory")
return
# Walk through all directories and subdirectories
for root, dirs, _ in os.walk(path, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
# Check if the directory is empty
if not os.listdir(dir_path):
os.rmdir(dir_path) # "Removed empty directory
def process_single_wav_file(task):
"""process a single wav file"""
wav_file, output_dir = task
speaker_id, book_name, filename = Path(wav_file).parts[-3:]
output_book_dir = Path(output_dir, speaker_id)
output_book_dir.mkdir(parents=True, exist_ok=True)
new_filename = f"{speaker_id}_{book_name}_{filename}"
new_wav_file = Path(output_book_dir, new_filename)
command = [
"ffmpeg",
"-nostdin",
"-hide_banner",
"-loglevel",
"error",
"-nostats",
"-i",
wav_file,
"-acodec",
"pcm_s16le",
"-ar",
"16000",
new_wav_file,
]
subprocess.check_call(
command
) # Run the command to convert the file to 16kHz and 16-bit PCM
os.remove(wav_file)
def process_wav_files(wav_files, output_dir, n_process):
"""process wav files in parallel"""
tasks = [(wav_file, output_dir) for wav_file in wav_files]
print(f"Processing {len(tasks)} files")
with Pool(processes=n_process) as pool:
for _ in tqdm(
pool.imap_unordered(process_single_wav_file, tasks), total=len(tasks)
):
pass
print("Removing empty directories...")
remove_empty_dirs(output_dir)
print("Done!")
def get_wav_files(dataset_path):
"""get all wav files in the dataset"""
wav_files = []
for speaker_id in os.listdir(dataset_path):
speaker_dir = os.path.join(dataset_path, speaker_id)
if not os.path.isdir(speaker_dir):
continue
for book_name in os.listdir(speaker_dir):
book_dir = os.path.join(speaker_dir, book_name)
if not os.path.isdir(book_dir):
continue
for file in os.listdir(book_dir):
if file.endswith(".wav"):
wav_files.append(os.path.join(book_dir, file))
print("Found {} wav files".format(len(wav_files)))
return wav_files
def filter_wav_files_by_length(wav_files, max_len_sec=15):
"""filter wav files by length"""
print("original wav files: {}".format(len(wav_files)))
filtered_wav_files = []
for audio_file in wav_files:
metadata = torchaudio.info(str(audio_file))
audio_length = metadata.num_frames / metadata.sample_rate
if audio_length <= max_len_sec:
filtered_wav_files.append(audio_file)
else:
os.remove(audio_file)
print("filtered wav files: {}".format(len(filtered_wav_files)))
return filtered_wav_files
if __name__ == "__main__":
dataset_path = "/path/to/output/directory"
n_process = 16
max_len_sec = 15
wav_files = get_wav_files(dataset_path)
filtered_wav_files = filter_wav_files_by_length(wav_files, max_len_sec)
process_wav_files(filtered_wav_files, dataset_path, n_process)
|