"""Contains functions to use the BirdNET models. | |
""" | |
import os | |
import warnings | |
import numpy as np | |
import config as cfg | |
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
warnings.filterwarnings("ignore") | |

# Import TFLite from the runtime package or from TensorFlow;
# import Keras only for the protobuf model.
# NOTE: we have to use TFLite if we want to use
# the metadata model or want to extract embeddings.
try:
    import tflite_runtime.interpreter as tflite
except ModuleNotFoundError:
    from tensorflow import lite as tflite

if not cfg.MODEL_PATH.endswith(".tflite"):
    from tensorflow import keras

INTERPRETER: tflite.Interpreter = None
C_INTERPRETER: tflite.Interpreter = None
M_INTERPRETER: tflite.Interpreter = None
PBMODEL = None
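
# The interpreters above are initialized lazily: predict(), embeddings() and
# predictFilter() call loadModel(), loadCustomClassifier() or loadMetaModel()
# on first use, so importing this module does not load any model files.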


def loadModel(class_output=True):
    """Initializes the BirdNET Model.

    Args:
        class_output: Omits the last layer when False.
    """
    global PBMODEL
    global INTERPRETER
    global INPUT_LAYER_INDEX
    global OUTPUT_LAYER_INDEX

    # Do we have to load the tflite or protobuf model?
    if cfg.MODEL_PATH.endswith(".tflite"):
        # Load TFLite model and allocate tensors.
        INTERPRETER = tflite.Interpreter(model_path=cfg.MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
        INTERPRETER.allocate_tensors()

        # Get input and output tensors.
        input_details = INTERPRETER.get_input_details()
        output_details = INTERPRETER.get_output_details()

        # Get input tensor index
        INPUT_LAYER_INDEX = input_details[0]["index"]

        # Get classification output or feature embeddings
        if class_output:
            OUTPUT_LAYER_INDEX = output_details[0]["index"]
        else:
            OUTPUT_LAYER_INDEX = output_details[0]["index"] - 1
    else:
        # Load protobuf model
        # Note: This will throw a bunch of warnings about custom gradients
        # which we will ignore until TF lets us block them
        PBMODEL = keras.models.load_model(cfg.MODEL_PATH, compile=False)


def loadCustomClassifier():
    """Loads the custom classifier."""
    global C_INTERPRETER
    global C_INPUT_LAYER_INDEX
    global C_OUTPUT_LAYER_INDEX

    # Load TFLite model and allocate tensors.
    C_INTERPRETER = tflite.Interpreter(model_path=cfg.CUSTOM_CLASSIFIER, num_threads=cfg.TFLITE_THREADS)
    C_INTERPRETER.allocate_tensors()

    # Get input and output tensors.
    input_details = C_INTERPRETER.get_input_details()
    output_details = C_INTERPRETER.get_output_details()

    # Get input tensor index
    C_INPUT_LAYER_INDEX = input_details[0]["index"]

    # Get classification output
    C_OUTPUT_LAYER_INDEX = output_details[0]["index"]


def loadMetaModel():
    """Loads the model for species prediction.

    Initializes the model used to predict the species list
    based on coordinates and week of year.
    """
    global M_INTERPRETER
    global M_INPUT_LAYER_INDEX
    global M_OUTPUT_LAYER_INDEX

    # Load TFLite model and allocate tensors.
    M_INTERPRETER = tflite.Interpreter(model_path=cfg.MDATA_MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
    M_INTERPRETER.allocate_tensors()

    # Get input and output tensors.
    input_details = M_INTERPRETER.get_input_details()
    output_details = M_INTERPRETER.get_output_details()

    # Get input and output tensor indices
    M_INPUT_LAYER_INDEX = input_details[0]["index"]
    M_OUTPUT_LAYER_INDEX = output_details[0]["index"]
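
# The metadata model takes a single [lat, lon, week] float32 vector as input
# (see predictFilter below) and returns one occurrence score per label in cfg.LABELS.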


def buildLinearClassifier(num_labels, input_size, hidden_units=0):
    """Builds a classifier.

    Args:
        num_labels: Output size.
        input_size: Size of the input.
        hidden_units: If > 0, creates another hidden layer with the given number of units.

    Returns:
        A new classifier.
    """
    # import keras
    from tensorflow import keras

    # Build a simple one- or two-layer linear classifier
    model = keras.Sequential()

    # Input layer
    model.add(keras.layers.InputLayer(input_shape=(input_size,)))

    # Hidden layer
    if hidden_units > 0:
        model.add(keras.layers.Dense(hidden_units, activation="relu"))

    # Classification layer
    model.add(keras.layers.Dense(num_labels))

    # Activation layer
    model.add(keras.layers.Activation("sigmoid"))

    return model
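
# Usage sketch (hypothetical values; input_size must match the length of the
# feature vectors returned by embeddings()):
#
#   classifier = buildLinearClassifier(num_labels=len(labels), input_size=1024, hidden_units=64)
#   # 1024 and 64 are placeholder values, not prescribed by this module.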


def trainLinearClassifier(classifier, x_train, y_train, epochs, batch_size, learning_rate, on_epoch_end=None):
    """Trains a custom classifier.

    Trains a new classifier for BirdNET based on the given data.

    Args:
        classifier: The classifier to be trained.
        x_train: Samples.
        y_train: Labels.
        epochs: Number of epochs to train.
        batch_size: Batch size.
        learning_rate: The learning rate during training.
        on_epoch_end: Optional callback `function(epoch, logs)`.

    Returns:
        (classifier, history)
    """
    # import keras
    from tensorflow import keras

    class FunctionCallback(keras.callbacks.Callback):
        def __init__(self, on_epoch_end=None) -> None:
            super().__init__()
            self.on_epoch_end_fn = on_epoch_end

        def on_epoch_end(self, epoch, logs=None):
            if self.on_epoch_end_fn:
                self.on_epoch_end_fn(epoch, logs)

    # Set random seed
    np.random.seed(cfg.RANDOM_SEED)

    # Shuffle data
    idx = np.arange(x_train.shape[0])
    np.random.shuffle(idx)
    x_train = x_train[idx]
    y_train = y_train[idx]

    # Random train/val split (80/20) so early stopping monitors held-out data
    split = int(0.8 * x_train.shape[0])
    x_train, x_val = x_train[:split], x_train[split:]
    y_train, y_val = y_train[:split], y_train[split:]

    # Early stopping
    callbacks = [
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
        FunctionCallback(on_epoch_end=on_epoch_end),
    ]

    # Cosine annealing learning rate schedule
    lr_schedule = keras.experimental.CosineDecay(learning_rate, epochs * x_train.shape[0] / batch_size)

    # Compile model
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
        loss="binary_crossentropy",
        metrics=[keras.metrics.Precision(top_k=1, name="prec")],
    )

    # Train model
    history = classifier.fit(
        x_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val, y_val), callbacks=callbacks
    )

    return classifier, history
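
# Usage sketch (hypothetical names and values): after collecting feature vectors into x
# (shape [n_samples, input_size]) and multi-hot labels into y (shape [n_samples, num_labels]):
#
#   classifier, history = trainLinearClassifier(classifier, x, y, epochs=50,
#                                               batch_size=32, learning_rate=0.001)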


def saveLinearClassifier(classifier, model_path, labels):
    """Saves a custom classifier on the hard drive.

    Saves the classifier as a tflite model, as well as the used labels in a .txt file.

    Args:
        classifier: The custom classifier.
        model_path: Path the model will be saved at.
        labels: List of labels used for the classifier.
    """
    # Make folders
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    # Remove activation layer
    classifier.pop()

    # Save model as tflite
    # Note: the converter is only available in the full TensorFlow package
    # (tensorflow.lite), not in tflite_runtime.
    converter = tflite.TFLiteConverter.from_keras_model(classifier)
    tflite_model = converter.convert()

    with open(model_path, "wb") as f:
        f.write(tflite_model)

    # Save labels
    with open(model_path.replace(".tflite", "_Labels.txt"), "w") as f:
        for label in labels:
            f.write(label + "\n")
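
# The labels file is written next to the model: e.g. a (hypothetical) "MyClassifier.tflite"
# is accompanied by "MyClassifier_Labels.txt" with one label per line.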


def predictFilter(lat, lon, week):
    """Predicts the probability for each species.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A list of probabilities for all species.
    """
    global M_INTERPRETER

    # Does interpreter exist?
    if M_INTERPRETER is None:
        loadMetaModel()

    # Prepare mdata as sample
    sample = np.expand_dims(np.array([lat, lon, week], dtype="float32"), 0)

    # Run inference
    M_INTERPRETER.set_tensor(M_INPUT_LAYER_INDEX, sample)
    M_INTERPRETER.invoke()

    return M_INTERPRETER.get_tensor(M_OUTPUT_LAYER_INDEX)[0]


def explore(lat: float, lon: float, week: int):
    """Predicts the species list.

    Predicts the species list based on the coordinates and week of year.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A sorted list of tuples with the score and the species.
    """
    # Make filter prediction
    l_filter = predictFilter(lat, lon, week)

    # Apply threshold
    l_filter = np.where(l_filter >= cfg.LOCATION_FILTER_THRESHOLD, l_filter, 0)

    # Zip with labels
    l_filter = list(zip(l_filter, cfg.LABELS))

    # Sort by filter value
    l_filter = sorted(l_filter, key=lambda x: x[0], reverse=True)

    return l_filter
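
# Usage sketch (hypothetical coordinates): species expected near 42.44 N, 76.50 W in week 20.
# Scores below cfg.LOCATION_FILTER_THRESHOLD come back as 0, so they can be filtered out:
#
#   species = explore(42.44, -76.50, 20)
#   present = [(score, label) for score, label in species if score > 0]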


def flat_sigmoid(x, sensitivity=-1):
    return 1 / (1.0 + np.exp(sensitivity * np.clip(x, -15, 15)))
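
# With the default sensitivity of -1, flat_sigmoid is the standard logistic function
# 1 / (1 + e^(-x)), so flat_sigmoid(0) == 0.5. Clipping x to [-15, 15] avoids overflow
# and keeps the output away from exactly 0 or 1 (roughly within [3e-7, 1 - 3e-7]).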


def predict(sample):
    """Uses the main net to predict a sample.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    # Has custom classifier?
    if cfg.CUSTOM_CLASSIFIER is not None:
        return predictWithCustomClassifier(sample)

    global INTERPRETER

    # Does interpreter or keras model exist?
    if INTERPRETER is None and PBMODEL is None:
        loadModel()

    if PBMODEL is None:
        # Reshape input tensor
        INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
        INTERPRETER.allocate_tensors()

        # Make a prediction (Audio only for now)
        INTERPRETER.set_tensor(INPUT_LAYER_INDEX, np.array(sample, dtype="float32"))
        INTERPRETER.invoke()
        prediction = INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)

        return prediction
    else:
        # Make a prediction (Audio only for now)
        prediction = PBMODEL.predict(sample)

        return prediction
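
# Usage sketch (hypothetical shapes): `sample` is a batch of audio chunks, e.g. a list of
# equally sized 1-D float32 numpy arrays, and the result has one row of raw scores per chunk.
# flat_sigmoid() above can be used to squash those raw scores into (0, 1):
#
#   scores = predict(chunks)
#   confidences = flat_sigmoid(scores)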


def predictWithCustomClassifier(sample):
    """Uses the custom classifier to make a prediction.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    global C_INTERPRETER

    # Does interpreter exist?
    if C_INTERPRETER is None:
        loadCustomClassifier()

    # Get embeddings
    feature_vector = embeddings(sample)

    # Reshape input tensor
    C_INTERPRETER.resize_tensor_input(C_INPUT_LAYER_INDEX, [len(feature_vector), *feature_vector[0].shape])
    C_INTERPRETER.allocate_tensors()

    # Make a prediction
    C_INTERPRETER.set_tensor(C_INPUT_LAYER_INDEX, np.array(feature_vector, dtype="float32"))
    C_INTERPRETER.invoke()
    prediction = C_INTERPRETER.get_tensor(C_OUTPUT_LAYER_INDEX)

    return prediction


def embeddings(sample):
    """Extracts the embeddings for a sample.

    Args:
        sample: Audio samples.

    Returns:
        The embeddings.
    """
    global INTERPRETER

    # Does interpreter exist?
    if INTERPRETER is None:
        loadModel(False)

    # Reshape input tensor
    INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
    INTERPRETER.allocate_tensors()

    # Extract feature embeddings
    INTERPRETER.set_tensor(INPUT_LAYER_INDEX, np.array(sample, dtype="float32"))
    INTERPRETER.invoke()
    features = INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)

    return features
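
# Usage sketch (hypothetical names): embeddings() accepts the same batched audio input as
# predict() and returns one feature vector per chunk; these vectors are what a classifier
# built with buildLinearClassifier() consumes (see predictWithCustomClassifier above):
#
#   feature_vectors = embeddings(chunks)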