Spaces:
Runtime error
Runtime error
| """Contains functions to use the BirdNET models. | |
| """ | |
| import os | |
| import warnings | |
| import numpy as np | |
| import config as cfg | |
| os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
| warnings.filterwarnings("ignore") | |
| # Import TFLite from runtime or Tensorflow; | |
| # import Keras if protobuf model; | |
| # NOTE: we have to use TFLite if we want to use | |
| # the metadata model or want to extract embeddings | |
| try: | |
| import tflite_runtime.interpreter as tflite | |
| except ModuleNotFoundError: | |
| from tensorflow import lite as tflite | |
| if not cfg.MODEL_PATH.endswith(".tflite"): | |
| from tensorflow import keras | |
# Lazily-initialised model handles. Each one stays None until the matching
# loader function in this module has been called.
INTERPRETER: tflite.Interpreter = None  # main BirdNET TFLite model, set by loadModel()
C_INTERPRETER: tflite.Interpreter = None  # custom classifier, set by loadCustomClassifier()
M_INTERPRETER: tflite.Interpreter = None  # metadata model, set by loadMetaModel()
PBMODEL = None  # Keras model, set by loadModel() when cfg.MODEL_PATH is not a .tflite file

# Tensor indices assigned by the loaders via `global`. They are declared here
# so that the names exist at module scope even before a loader has run.
INPUT_LAYER_INDEX = None
OUTPUT_LAYER_INDEX = None
C_INPUT_LAYER_INDEX = None
C_OUTPUT_LAYER_INDEX = None
M_INPUT_LAYER_INDEX = None
M_OUTPUT_LAYER_INDEX = None
def loadModel(class_output=True):
    """Initializes the BirdNET model referenced by `cfg.MODEL_PATH`.

    A path ending in ".tflite" is opened with the TFLite interpreter and the
    input/output tensor indices are stored in module globals. Any other path
    is loaded as a Keras model into `PBMODEL`.

    Args:
        class_output: Omits the last layer when False, by selecting the tensor
            index directly before the classification output (TFLite only).
    """
    global PBMODEL
    global INTERPRETER
    global INPUT_LAYER_INDEX
    global OUTPUT_LAYER_INDEX

    if not cfg.MODEL_PATH.endswith(".tflite"):
        # Protobuf model. Loading emits a number of warnings about custom
        # gradients, which are ignored until TF offers a way to block them.
        PBMODEL = keras.models.load_model(cfg.MODEL_PATH, compile=False)
        return

    # TFLite model: create the interpreter and allocate its tensors.
    INTERPRETER = tflite.Interpreter(model_path=cfg.MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
    INTERPRETER.allocate_tensors()

    in_details = INTERPRETER.get_input_details()
    out_details = INTERPRETER.get_output_details()

    INPUT_LAYER_INDEX = in_details[0]["index"]

    # Classification output, or the tensor right before it for embeddings.
    class_index = out_details[0]["index"]
    OUTPUT_LAYER_INDEX = class_index if class_output else class_index - 1
def loadCustomClassifier():
    """Loads the custom classifier from `cfg.CUSTOM_CLASSIFIER`.

    Stores the TFLite interpreter in `C_INTERPRETER` and the indices of its
    first input and first output tensor in `C_INPUT_LAYER_INDEX` and
    `C_OUTPUT_LAYER_INDEX`.
    """
    global C_INTERPRETER
    global C_INPUT_LAYER_INDEX
    global C_OUTPUT_LAYER_INDEX

    # Create the interpreter and allocate its tensors.
    C_INTERPRETER = tflite.Interpreter(model_path=cfg.CUSTOM_CLASSIFIER, num_threads=cfg.TFLITE_THREADS)
    C_INTERPRETER.allocate_tensors()

    in_details = C_INTERPRETER.get_input_details()
    out_details = C_INTERPRETER.get_output_details()

    # Remember where to feed samples and where to read the classification.
    C_INPUT_LAYER_INDEX = in_details[0]["index"]
    C_OUTPUT_LAYER_INDEX = out_details[0]["index"]
def loadMetaModel():
    """Loads the model for species prediction.

    Initializes the model used to predict the species list based on
    coordinates and week of year. The TFLite interpreter is stored in
    `M_INTERPRETER`, and the indices of its first input and first output
    tensor in `M_INPUT_LAYER_INDEX` and `M_OUTPUT_LAYER_INDEX`.
    """
    global M_INTERPRETER
    global M_INPUT_LAYER_INDEX
    global M_OUTPUT_LAYER_INDEX

    # Create the interpreter and allocate its tensors.
    M_INTERPRETER = tflite.Interpreter(model_path=cfg.MDATA_MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
    M_INTERPRETER.allocate_tensors()

    in_details = M_INTERPRETER.get_input_details()
    out_details = M_INTERPRETER.get_output_details()

    M_INPUT_LAYER_INDEX = in_details[0]["index"]
    M_OUTPUT_LAYER_INDEX = out_details[0]["index"]
def buildLinearClassifier(num_labels, input_size, hidden_units=0):
    """Builds a simple one- or two-layer classifier.

    Args:
        num_labels: Output size.
        input_size: Size of the input.
        hidden_units: If > 0, adds a hidden ReLU layer with this many units.

    Returns:
        A new, uncompiled Keras Sequential model ending in a sigmoid activation.
    """
    # Imported lazily so TensorFlow's Keras is only required when training.
    from tensorflow import keras

    # Assemble the layer stack in order, then hand it to Sequential.
    stack = [keras.layers.InputLayer(input_shape=(input_size,))]

    if hidden_units > 0:
        stack.append(keras.layers.Dense(hidden_units, activation="relu"))

    # The classification layer and its activation are kept as separate layers.
    stack.append(keras.layers.Dense(num_labels))
    stack.append(keras.layers.Activation("sigmoid"))

    return keras.Sequential(stack)
def trainLinearClassifier(classifier, x_train, y_train, epochs, batch_size, learning_rate, on_epoch_end=None):
    """Trains a custom classifier.

    Trains a new classifier for BirdNET based on the given data. The data is
    shuffled with a fixed seed and split 80/20 into a training part and a
    held-out validation part; early stopping monitors the validation loss.

    Args:
        classifier: The classifier to be trained.
        x_train: Samples.
        y_train: Labels.
        epochs: Number of epochs to train.
        batch_size: Batch size.
        learning_rate: The learning rate during training.
        on_epoch_end: Optional callback `function(epoch, logs)`.

    Returns:
        (classifier, history)
    """
    # Imported lazily so TensorFlow's Keras is only required when training.
    from tensorflow import keras

    class FunctionCallback(keras.callbacks.Callback):
        """Forwards the end-of-epoch event to a plain function, if one is given."""

        def __init__(self, on_epoch_end=None) -> None:
            super().__init__()
            self.on_epoch_end_fn = on_epoch_end

        def on_epoch_end(self, epoch, logs=None):
            if self.on_epoch_end_fn:
                self.on_epoch_end_fn(epoch, logs)

    # Seed NumPy so the shuffle, and therefore the split, is reproducible.
    np.random.seed(cfg.RANDOM_SEED)

    # Shuffle samples and labels with the same permutation.
    idx = np.arange(x_train.shape[0])
    np.random.shuffle(idx)
    x_train = x_train[idx]
    y_train = y_train[idx]

    # Random val split: the last 20% is held out and removed from the training
    # data. Leaving it in the training set would make val_loss track the
    # training loss and defeat early stopping.
    split = int(0.8 * x_train.shape[0])
    x_val = x_train[split:]
    y_val = y_train[split:]

    if split > 0:
        x_train = x_train[:split]
        y_train = y_train[:split]
    # With split == 0 there are too few samples to hold any out, so the full
    # data is used for both training and validation.

    # Early stopping plus the optional user callback.
    callbacks = [
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
        FunctionCallback(on_epoch_end=on_epoch_end),
    ]

    # Cosine annealing lr schedule. Prefer the current location of CosineDecay
    # and fall back to the experimental namespace on older TensorFlow releases.
    schedules = getattr(keras.optimizers, "schedules", None)
    cosine_decay = getattr(schedules, "CosineDecay", None) or keras.experimental.CosineDecay
    lr_schedule = cosine_decay(learning_rate, epochs * x_train.shape[0] / batch_size)

    # Compile model
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
        loss="binary_crossentropy",
        metrics=[keras.metrics.Precision(top_k=1, name="prec")],
    )

    # Train model
    history = classifier.fit(
        x_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val, y_val), callbacks=callbacks
    )

    return classifier, history
def saveLinearClassifier(classifier, model_path, labels):
    """Saves a custom classifier on the hard drive.

    Saves the classifier as a tflite model, as well as the used labels in a
    text file next to it (".tflite" in the path is replaced by "_Labels.txt").

    Note that the final layer is removed from `classifier` in place before
    conversion, so the passed model is modified by this call.

    Args:
        classifier: The custom classifier.
        model_path: Path the model will be saved at.
        labels: List of labels used for the classifier.
    """
    # The module-level `tflite` may be bound to tflite_runtime.interpreter,
    # which provides no converter, so take the converter from TensorFlow.
    from tensorflow import lite as tf_lite

    # Make folders. A bare file name has no directory part, and
    # os.makedirs("") would raise.
    target_dir = os.path.dirname(model_path)

    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    # Remove activation layer
    classifier.pop()

    # Save model as tflite
    converter = tf_lite.TFLiteConverter.from_keras_model(classifier)
    tflite_model = converter.convert()

    with open(model_path, "wb") as model_file:
        model_file.write(tflite_model)

    # Derive the labels path. If the model path contains no ".tflite" the
    # replacement is a no-op; append the suffix instead so the labels do not
    # overwrite the model that was just written.
    labels_path = model_path.replace(".tflite", "_Labels.txt")

    if labels_path == model_path:
        labels_path = model_path + "_Labels.txt"

    # Save labels, one per line
    with open(labels_path, "w") as labels_file:
        labels_file.writelines(label + "\n" for label in labels)
def predictFilter(lat, lon, week):
    """Predicts the probability for each species.

    Loads the metadata model on first use.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A list of probabilities for all species.
    """
    # Load the metadata model lazily. `is None` is the identity check for the
    # "not loaded yet" state.
    if M_INTERPRETER is None:
        loadMetaModel()

    # Prepare mdata as a single-row float32 batch: [[lat, lon, week]]
    sample = np.expand_dims(np.array([lat, lon, week], dtype="float32"), 0)

    # Run inference
    M_INTERPRETER.set_tensor(M_INPUT_LAYER_INDEX, sample)
    M_INTERPRETER.invoke()

    # Drop the batch dimension added above.
    return M_INTERPRETER.get_tensor(M_OUTPUT_LAYER_INDEX)[0]
def explore(lat: float, lon: float, week: int):
    """Predicts the species list.

    Predicts the species list based on the coordinates and week of year.
    Scores below `cfg.LOCATION_FILTER_THRESHOLD` are set to 0 rather than
    removed, so every label in `cfg.LABELS` appears in the result.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A sorted list of tuples with the score and the species.
    """
    # Raw scores from the metadata model
    scores = predictFilter(lat, lon, week)

    # Zero out everything below the threshold
    scores = np.where(scores >= cfg.LOCATION_FILTER_THRESHOLD, scores, 0)

    # Pair each score with its label and order by score, highest first
    ranked = list(zip(scores, cfg.LABELS))
    ranked.sort(key=lambda pair: pair[0], reverse=True)

    return ranked
def flat_sigmoid(x, sensitivity=-1):
    """Applies a sigmoid to `x` after clipping it to [-15, 15].

    Computes `1 / (1 + exp(sensitivity * clip(x, -15, 15)))`. With the default
    sensitivity of -1 this is the standard logistic function on the clipped
    input.

    Args:
        x: Scalar or array of raw scores.
        sensitivity: Factor applied to the clipped input inside the exponent.

    Returns:
        The sigmoid values, with the same shape as `x`.
    """
    # Clipping bounds the exponent, which keeps np.exp from overflowing.
    bounded = np.clip(x, -15, 15)
    exponent = sensitivity * bounded

    return 1 / (1.0 + np.exp(exponent))
def predict(sample):
    """Uses the main net to predict a sample.

    If `cfg.CUSTOM_CLASSIFIER` is set, the prediction is delegated to the
    custom classifier. Otherwise the main model is loaded on first use and run
    either through the TFLite interpreter or the Keras model.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    # Has custom classifier?
    if cfg.CUSTOM_CLASSIFIER is not None:
        return predictWithCustomClassifier(sample)

    # Does interpreter or keras model exist? Load on first use.
    if INTERPRETER is None and PBMODEL is None:
        loadModel()

    # Keras (protobuf) model: predict directly (audio only for now).
    if PBMODEL is not None:
        return PBMODEL.predict(sample)

    # TFLite model: resize the input tensor to the batch, then re-allocate.
    INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
    INTERPRETER.allocate_tensors()

    # Make a prediction (audio only for now)
    INTERPRETER.set_tensor(INPUT_LAYER_INDEX, np.array(sample, dtype="float32"))
    INTERPRETER.invoke()

    return INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)
def predictWithCustomClassifier(sample):
    """Uses the custom classifier to make a prediction.

    The sample is first turned into feature embeddings by the main model;
    the custom classifier is then run on those embeddings. The classifier is
    loaded on first use.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    # Does interpreter exist? Load on first use.
    if C_INTERPRETER is None:
        loadCustomClassifier()

    # Get embeddings
    feature_vector = embeddings(sample)

    # Resize the input tensor to the batch of embeddings, then re-allocate.
    C_INTERPRETER.resize_tensor_input(C_INPUT_LAYER_INDEX, [len(feature_vector), *feature_vector[0].shape])
    C_INTERPRETER.allocate_tensors()

    # Make a prediction
    C_INTERPRETER.set_tensor(C_INPUT_LAYER_INDEX, np.array(feature_vector, dtype="float32"))
    C_INTERPRETER.invoke()

    return C_INTERPRETER.get_tensor(C_OUTPUT_LAYER_INDEX)
def embeddings(sample):
    """Extracts the embeddings for a sample.

    Loads the main model with `class_output=False` on first use. Embeddings
    are read through the TFLite interpreter, so a TFLite model is required.

    Args:
        sample: Audio samples.

    Returns:
        The embeddings.

    Raises:
        RuntimeError: If no TFLite interpreter is available after loading,
            i.e. `cfg.MODEL_PATH` does not point to a ".tflite" model.
    """
    # Does interpreter exist? Load on first use, selecting the embeddings tensor.
    if INTERPRETER is None:
        loadModel(False)

    # loadModel() only creates an interpreter for ".tflite" paths. Fail with a
    # clear message instead of an AttributeError on None further down.
    if INTERPRETER is None:
        raise RuntimeError("Extracting embeddings requires a TFLite model (cfg.MODEL_PATH must end with '.tflite').")

    # NOTE(review): if the interpreter was already loaded by predict(), which
    # calls loadModel() with class_output=True, OUTPUT_LAYER_INDEX still points
    # at the classification output rather than the embeddings. Confirm callers
    # never mix both code paths in one process.

    # Resize the input tensor to the batch, then re-allocate.
    INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
    INTERPRETER.allocate_tensors()

    # Extract feature embeddings
    INTERPRETER.set_tensor(INPUT_LAYER_INDEX, np.array(sample, dtype="float32"))
    INTERPRETER.invoke()

    return INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)