Spaces:
Runtime error
Runtime error
"""Extract segments from audio files based on BirdNET detections. | |
Can be used to save the segments of the audio files for each detection. | |
""" | |
import argparse | |
import os | |
from multiprocessing import Pool | |
import numpy as np | |
import audio | |
import config as cfg | |
import utils | |
# Set numpy random seed | |
np.random.seed(cfg.RANDOM_SEED) | |
def detectRType(line: str): | |
"""Detects the type of result file. | |
Args: | |
line: First line of text. | |
Returns: | |
Either "table", "r", "kaleidoscope", "csv" or "audacity". | |
""" | |
if line.lower().startswith("selection"): | |
return "table" | |
elif line.lower().startswith("filepath"): | |
return "r" | |
elif line.lower().startswith("indir"): | |
return "kaleidoscope" | |
elif line.lower().startswith("start (s)"): | |
return "csv" | |
else: | |
return "audacity" | |
def parseFolders(apath: str, rpath: str, allowed_result_filetypes: list[str] = ["txt", "csv"]) -> list[dict]: | |
"""Read audio and result files. | |
Reads all audio files and BirdNET output inside directory recursively. | |
Args: | |
apath: Path to search for audio files. | |
rpath: Path to search for result files. | |
allowed_result_filetypes: List of extensions for the result files. | |
Returns: | |
A list of {"audio": path_to_audio, "result": path_to_result }. | |
""" | |
data = {} | |
apath = apath.replace("/", os.sep).replace("\\", os.sep) | |
rpath = rpath.replace("/", os.sep).replace("\\", os.sep) | |
# Get all audio files | |
for root, _, files in os.walk(apath): | |
for f in files: | |
if f.rsplit(".", 1)[-1].lower() in cfg.ALLOWED_FILETYPES: | |
data[f.rsplit(".", 1)[0]] = {"audio": os.path.join(root, f), "result": ""} | |
# Get all result files | |
for root, _, files in os.walk(rpath): | |
for f in files: | |
if f.rsplit(".", 1)[-1] in allowed_result_filetypes and ".bat." in f: | |
data[f.split(".bat.", 1)[0]]["result"] = os.path.join(root, f) | |
# Convert to list | |
flist = [f for f in data.values() if f["result"]] | |
print(f"Found {len(flist)} audio files with valid result file.") | |
return flist | |
def parseFiles(flist: list[dict], max_segments=100): | |
"""Extracts the segments for all files. | |
Args: | |
flist: List of dict with {"audio": path_to_audio, "result": path_to_result }. | |
max_segments: Number of segments per species. | |
Returns: | |
TODO @kahst | |
""" | |
species_segments: dict[str, list] = {} | |
for f in flist: | |
# Paths | |
afile = f["audio"] | |
rfile = f["result"] | |
# Get all segments for result file | |
segments = findSegments(afile, rfile) | |
# Parse segments by species | |
for s in segments: | |
if s["species"] not in species_segments: | |
species_segments[s["species"]] = [] | |
species_segments[s["species"]].append(s) | |
# Shuffle segments for each species and limit to max_segments | |
for s in species_segments: | |
np.random.shuffle(species_segments[s]) | |
species_segments[s] = species_segments[s][:max_segments] | |
# Make dict of segments per audio file | |
segments: dict[str, list] = {} | |
seg_cnt = 0 | |
for s in species_segments: | |
for seg in species_segments[s]: | |
if seg["audio"] not in segments: | |
segments[seg["audio"]] = [] | |
segments[seg["audio"]].append(seg) | |
seg_cnt += 1 | |
print(f"Found {seg_cnt} segments in {len(segments)} audio files.") | |
# Convert to list | |
flist = [tuple(e) for e in segments.items()] | |
return flist | |
def findSegments(afile: str, rfile: str): | |
"""Extracts the segments for an audio file from the results file | |
Args: | |
afile: Path to the audio file. | |
rfile: Path to the result file. | |
Returns: | |
A list of dicts in the form of | |
{"audio": afile, "start": start, "end": end, "species": species, "confidence": confidence} | |
""" | |
segments: list[dict] = [] | |
# Open and parse result file | |
lines = utils.readLines(rfile) | |
# Auto-detect result type | |
rtype = detectRType(lines[0]) | |
# Get start and end times based on rtype | |
confidence = 0 | |
start = end = 0.0 | |
species = "" | |
for i, line in enumerate(lines): | |
if rtype == "table" and i > 0: | |
d = line.split("\t") | |
start = float(d[3]) | |
end = float(d[4]) | |
species = d[-2] | |
confidence = float(d[-1]) | |
elif rtype == "audacity": | |
d = line.split("\t") | |
start = float(d[0]) | |
end = float(d[1]) | |
species = d[2].split(", ")[1] | |
confidence = float(d[-1]) | |
elif rtype == "r" and i > 0: | |
d = line.split(",") | |
start = float(d[1]) | |
end = float(d[2]) | |
species = d[4] | |
confidence = float(d[5]) | |
elif rtype == "kaleidoscope" and i > 0: | |
d = line.split(",") | |
start = float(d[3]) | |
end = float(d[4]) + start | |
species = d[5] | |
confidence = float(d[7]) | |
elif rtype == "csv" and i > 0: | |
d = line.split(",") | |
start = float(d[0]) | |
end = float(d[1]) | |
species = d[3] | |
confidence = float(d[4]) | |
# Check if confidence is high enough | |
if confidence >= cfg.MIN_CONFIDENCE: | |
segments.append({"audio": afile, "start": start, "end": end, "species": species, "confidence": confidence}) | |
return segments | |
def extractSegments(item: tuple[tuple[str, list[dict]], float, dict[str]]): | |
"""Saves each segment separately. | |
Creates an audio file for each species segment. | |
Args: | |
item: A tuple that contains ((audio file path, segments), segment length, config) | |
""" | |
# Paths and config | |
afile = item[0][0] | |
segments = item[0][1] | |
seg_length = item[1] | |
cfg.set_config(item[2]) | |
# Status | |
print(f"Extracting segments from {afile}") | |
try: | |
# Open audio file | |
sig, _ = audio.openAudioFile(afile, cfg.SAMPLE_RATE) | |
except Exception as ex: | |
print(f"Error: Cannot open audio file {afile}", flush=True) | |
utils.writeErrorLog(ex) | |
return | |
# Extract segments | |
for seg_cnt, seg in enumerate(segments, 1): | |
try: | |
# Get start and end times | |
start = int(seg["start"] * cfg.SAMPLE_RATE) | |
end = int(seg["end"] * cfg.SAMPLE_RATE) | |
offset = ((seg_length * cfg.SAMPLE_RATE) - (end - start)) // 2 | |
start = max(0, start - offset) | |
end = min(len(sig), end + offset) | |
# Make sure segment is long enough | |
if end > start: | |
# Get segment raw audio from signal | |
seg_sig = sig[int(start) : int(end)] | |
# Make output path | |
outpath = os.path.join(cfg.OUTPUT_PATH, seg["species"]) | |
os.makedirs(outpath, exist_ok=True) | |
# Save segment | |
seg_name = "{:.3f}_{}_{}.wav".format( | |
seg["confidence"], seg_cnt, seg["audio"].rsplit(os.sep, 1)[-1].rsplit(".", 1)[0] | |
) | |
seg_path = os.path.join(outpath, seg_name) | |
audio.saveSignal(seg_sig, seg_path) | |
except Exception as ex: | |
# Write error log | |
print(f"Error: Cannot extract segments from {afile}.", flush=True) | |
utils.writeErrorLog(ex) | |
return False | |
return True | |
if __name__ == "__main__": | |
# Parse arguments | |
parser = argparse.ArgumentParser(description="Extract segments from audio files based on BirdNET detections.") | |
parser.add_argument("--audio", default="put-your-files-here/", help="Path to folder containing audio files.") | |
parser.add_argument("--results", default="put-your-files-here/results", help="Path to folder containing result files.") | |
parser.add_argument("--o", default="put-your-files-here/segments/", help="Output folder path for extracted segments.") | |
parser.add_argument( | |
"--min_conf", type=float, default=0.1, help="Minimum confidence threshold. Values in [0.01, 0.99]. Defaults to 0.1." | |
) | |
parser.add_argument("--max_segments", type=int, default=100, help="Number of randomly extracted segments per species.") | |
parser.add_argument( | |
"--seg_length", type=float, default=3.0, help="Length of extracted segments in seconds. Defaults to 3.0." | |
) | |
parser.add_argument("--threads", type=int, default=4, help="Number of CPU threads.") | |
args = parser.parse_args() | |
# Parse audio and result folders | |
cfg.FILE_LIST = parseFolders(args.audio, args.results) | |
# Set output folder | |
cfg.OUTPUT_PATH = args.o | |
# Set number of threads | |
cfg.CPU_THREADS = int(args.threads) | |
# Set confidence threshold | |
cfg.MIN_CONFIDENCE = max(0.01, min(0.99, float(args.min_conf))) | |
# Parse file list and make list of segments | |
cfg.FILE_LIST = parseFiles(cfg.FILE_LIST, max(1, int(args.max_segments))) | |
# Add config items to each file list entry. | |
# We have to do this for Windows which does not | |
# support fork() and thus each process has to | |
# have its own config. USE LINUX! | |
flist = [(entry, max(cfg.SIG_LENGTH, float(args.seg_length)), cfg.get_config()) for entry in cfg.FILE_LIST] | |
# Extract segments | |
if cfg.CPU_THREADS < 2: | |
for entry in flist: | |
extractSegments(entry) | |
else: | |
with Pool(cfg.CPU_THREADS) as p: | |
p.map(extractSegments, flist) | |
# A few examples to test | |
# python3 segments.py --audio example/ --results example/ --o example/segments/ | |
# python3 segments.py --audio example/ --results example/ --o example/segments/ --seg_length 5.0 --min_conf 0.1 --max_segments 100 --threads 4 | |