# https://products.aspose.app/audio/voice-recorder/wav
"""Flask service that transcribes uploaded audio files with Whisper.

Endpoints:
    GET  /             -- plain-text greeting.
    POST /transcribe   -- multipart upload (form field ``audio``); returns the
                          transcription, detected language and timing info.
    GET  /healthcheck  -- JSON liveness probe.
"""
import os
import tempfile
import time
from datetime import datetime

import pytz
import whisper
from flask import Flask, request, jsonify

# Directory (relative to the working directory) where uploads are staged
# while they are being transcribed.
UPLOAD_DIR = "temp_audio"

app = Flask(__name__)


@app.route("/")
def hello():
    """Return a short greeting for the root URL."""
    return "Semabox, listens to you!"


# Load the Whisper model once at import time so every request reuses it.
print("Loading Whisper model...\n", flush=True)
model = whisper.load_model("tiny")
print("\nWhisper model loaded.\n", flush=True)


def get_time():
    """Return the current Africa/Nairobi time as display strings.

    Returns:
        A ``(full_date, curr_time)`` tuple where ``full_date`` looks like
        ``"Monday | 2024-01-31 | 13:45:10"`` and ``curr_time`` is the
        ``HH:MM:SS`` part on its own.
    """
    nairobi_timezone = pytz.timezone('Africa/Nairobi')
    current_time_nairobi = datetime.now(nairobi_timezone)

    curr_day = current_time_nairobi.strftime('%A')
    curr_date = current_time_nairobi.strftime('%Y-%m-%d')
    curr_time = current_time_nairobi.strftime('%H:%M:%S')

    full_date = f"{curr_day} | {curr_date} | {curr_time}"
    return full_date, curr_time


def convert_size(bytes):
    """Format a byte count as a human-readable string.

    Args:
        bytes: Size in bytes. (The name shadows the builtin; it is kept so
            existing keyword callers keep working.)

    Returns:
        ``"<n> bytes"`` below 1 KiB, ``"<x.xx> KB"`` below 1 MiB, otherwise
        ``"<x.xx> MB"``.
    """
    if bytes < 1024:
        return f"{bytes} bytes"
    elif bytes < 1024**2:
        return f"{bytes / 1024:.2f} KB"
    else:
        return f"{bytes / 1024**2:.2f} MB"


def transcribe(audio_path):
    """Transcribe the audio file at ``audio_path`` with the loaded model.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        A ``(text, language, result)`` tuple: the decoded text, the language
        code with the highest detection probability, and the raw result
        object returned by ``whisper.decode``.
    """
    # Load audio and pad/trim it to fit 30 seconds.
    audio = whisper.load_audio(audio_path)
    audio = whisper.pad_or_trim(audio)

    # Make log-Mel spectrogram and move it to the same device as the model.
    mel = whisper.log_mel_spectrogram(audio).to(model.device)

    # Detect the spoken language: keep the most probable candidate.
    _, probs = model.detect_language(mel)
    language = max(probs, key=probs.get)

    # Decode the audio.
    options = whisper.DecodingOptions(fp16=False)
    result = whisper.decode(model, mel, options)

    print(" Transcription complete.", flush=True)
    return result.text, language, result


def _remove_temp_file(path):
    """Delete a staged upload, logging (not raising) if removal fails."""
    try:
        os.remove(path)
    except OSError as exc:
        # Best effort: a failed cleanup must not turn a finished request
        # into an error response.
        print(f" Warning: could not remove {path}: {exc}", flush=True)
    else:
        print(f" Audio file removed from: {path}\n", flush=True)


@app.route('/transcribe', methods=['POST'])
def transcribe_audio():
    """Handle an audio upload and respond with its transcription.

    Expects a multipart request with the file in the ``audio`` field.

    Returns:
        ``(json, 200)`` with the transcription, language, timestamps,
        processing duration and upload size on success;
        ``(json, 400)`` when no ``audio`` file was sent;
        ``(json, 500)`` when transcription raises.
    """
    # Record the time when the request was received.
    request_received_time, _ = get_time()
    print(f"Query:- {request_received_time}", flush=True)

    if 'audio' not in request.files:
        print("Error: No audio file provided", flush=True)
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files['audio']

    # Never build the path from the client-supplied filename: it could
    # contain "../", be empty, or collide with another in-flight request.
    # Keep only a plain alphanumeric extension and let mkstemp choose a
    # unique name inside UPLOAD_DIR.
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    extension = os.path.splitext(os.path.basename(audio_file.filename or ""))[1]
    if not extension[1:].isalnum():
        extension = ""
    fd, audio_path = tempfile.mkstemp(
        prefix="upload_", suffix=extension, dir=UPLOAD_DIR
    )

    try:
        # Save the uploaded audio file.
        with os.fdopen(fd, "wb") as destination:
            audio_file.save(destination)

        # Measure the size on disk instead of reading the upload into memory.
        audio_file_size = convert_size(os.path.getsize(audio_path))
        print(f" Audio file saved to: {audio_path} (Size: {audio_file_size})", flush=True)

        # Record the time before starting transcription.
        transcription_start_time = time.time()

        try:
            transcription, language, _ = transcribe(audio_path)
        except Exception as e:
            # Top-level request boundary: report the failure to the client.
            print(f" Error during transcription: {str(e)}", flush=True)
            return jsonify({"error": f"An error occurred: {str(e)}"}), 500

        # Calculate the time taken for transcription.
        transcription_end_time = time.time()
        transcription_duration = round(transcription_end_time - transcription_start_time, 2)
    finally:
        # Clean up the saved file on success and on failure alike.
        _remove_temp_file(audio_path)

    # Record the time when the response is being sent.
    response_sent_time, _ = get_time()

    # Log the outcome in green (ANSI escape codes) so it stands out.
    print(f" \033[92mTranscription: {transcription}, Language: {language}, Processing Time: {transcription_duration}\033[0m\n", flush=True)

    return jsonify({
        "transcription": transcription,
        "language": language,
        "request_received_time": request_received_time,
        "transcription_duration_seconds": transcription_duration,
        "response_sent_time": response_sent_time,
        "audio_file_size": audio_file_size
    }), 200


@app.route('/healthcheck', methods=['GET'])
def healthcheck():
    """Report that the API process is up."""
    print("Received request at /healthcheck\n", flush=True)
    return jsonify({"status": "API is running"}), 200


if __name__ == '__main__':
    print("Starting Flask app...\n", flush=True)
    app.run(host="0.0.0.0", port=5000)