from flask import Flask, request, jsonify
import torch
import shutil
import os
import sys
from argparse import ArgumentParser
from time import strftime
from argparse import Namespace
from src.utils.preprocess import CropAndExtract
from src.test_audio2coeff import Audio2Coeff
from src.facerender.animate import AnimateFromCoeff
from src.generate_batch import get_data
from src.generate_facerender_batch import get_facerender_data
# from src.utils.init_path import init_path
import tempfile
from openai import OpenAI
import threading
import elevenlabs
from elevenlabs import set_api_key, generate, play, clone
from flask_cors import CORS, cross_origin
# from flask_swagger_ui import get_swaggerui_blueprint
import uuid
import time
from PIL import Image
import moviepy.editor as mp
import requests
import json
import pickle
# from videoretalking import inference_function
# import base64
# import gfpgan_enhancer
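

# Flask service that turns a text prompt into a talking-head video: the prompt
# is answered by OpenAI or the Ryze chat backend, converted to speech with
# ElevenLabs or OpenAI TTS, and animated over a source image with the
# SadTalker pipeline (crop/extract -> audio-to-coefficients -> face render).
# Generation runs in a background thread; clients poll a status check for the
# finished base64-encoded video.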

class AnimationConfig:
    def __init__(self, driven_audio_path, source_image_path, result_folder, pose_style, expression_scale, enhancer, still, preprocess, ref_pose_video_path, image_hardcoded):
        self.driven_audio = driven_audio_path
        self.source_image = source_image_path
        self.ref_eyeblink = None
        self.ref_pose = ref_pose_video_path
        self.checkpoint_dir = './checkpoints'
        self.result_dir = result_folder
        self.pose_style = pose_style
        self.batch_size = 2
        self.expression_scale = expression_scale
        self.input_yaw = None
        self.input_pitch = None
        self.input_roll = None
        self.enhancer = enhancer
        self.background_enhancer = None
        self.cpu = False
        self.face3dvis = False
        self.still = still
        self.preprocess = preprocess
        self.verbose = False
        self.old_version = False
        self.net_recon = 'resnet50'
        self.init_path = None
        self.use_last_fc = False
        self.bfm_folder = './checkpoints/BFM_Fitting/'
        self.bfm_model = 'BFM_model_front.mat'
        self.focal = 1015.
        self.center = 112.
        self.camera_d = 10.
        self.z_near = 5.
        self.z_far = 15.
        self.device = 'cpu'
        self.image_hardcoded = image_hardcoded
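
# NOTE: AnimationConfig above is a stand-in for the argparse Namespace that
# SadTalker's command-line script builds, which lets main() be invoked
# programmatically from the Flask handlers below.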

app = Flask(__name__)
CORS(app)

TEMP_DIR = None
start_time = None

app.config['temp_response'] = None
app.config['generation_thread'] = None
app.config['text_prompt'] = None
app.config['final_video_path'] = None
app.config['final_video_duration'] = None
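
# The app.config entries above are filled in by the background generation
# thread (main) and read back by the status check; TEMP_DIR and start_time
# are module-level globals that are reset on each new generation request.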


def main(args):
    pic_path = args.source_image
    audio_path = args.driven_audio
    save_dir = args.result_dir
    pose_style = args.pose_style
    device = args.device
    batch_size = args.batch_size
    input_yaw_list = args.input_yaw
    input_pitch_list = args.input_pitch
    input_roll_list = args.input_roll
    ref_eyeblink = args.ref_eyeblink
    ref_pose = args.ref_pose
    preprocess = args.preprocess
    image_hardcoded = args.image_hardcoded

    dir_path = os.path.dirname(os.path.realpath(__file__))
    current_root_path = dir_path
    print('current_root_path ', current_root_path)

    # sadtalker_paths = init_path(args.checkpoint_dir, os.path.join(current_root_path, 'src/config'), args.size, args.old_version, args.preprocess)
    path_of_lm_croper = os.path.join(current_root_path, args.checkpoint_dir, 'shape_predictor_68_face_landmarks.dat')
    path_of_net_recon_model = os.path.join(current_root_path, args.checkpoint_dir, 'epoch_20.pth')
    dir_of_BFM_fitting = os.path.join(current_root_path, args.checkpoint_dir, 'BFM_Fitting')
    wav2lip_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'wav2lip.pth')

    audio2pose_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'auido2pose_00140-model.pth')
    audio2pose_yaml_path = os.path.join(current_root_path, 'src', 'config', 'auido2pose.yaml')
    audio2exp_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'auido2exp_00300-model.pth')
    audio2exp_yaml_path = os.path.join(current_root_path, 'src', 'config', 'auido2exp.yaml')

    free_view_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'facevid2vid_00189-model.pth.tar')

    if preprocess == 'full':
        mapping_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'mapping_00109-model.pth.tar')
        facerender_yaml_path = os.path.join(current_root_path, 'src', 'config', 'facerender_still.yaml')
    else:
        mapping_checkpoint = os.path.join(current_root_path, args.checkpoint_dir, 'mapping_00229-model.pth.tar')
        facerender_yaml_path = os.path.join(current_root_path, 'src', 'config', 'facerender.yaml')

    # init models
    # preprocess_model = CropAndExtract(sadtalker_paths, device)
    print(path_of_net_recon_model)
    preprocess_model = CropAndExtract(path_of_lm_croper, path_of_net_recon_model, dir_of_BFM_fitting, device)

    # audio_to_coeff = Audio2Coeff(sadtalker_paths, device)
    audio_to_coeff = Audio2Coeff(audio2pose_checkpoint, audio2pose_yaml_path,
                                 audio2exp_checkpoint, audio2exp_yaml_path,
                                 wav2lip_checkpoint, device)

    # animate_from_coeff = AnimateFromCoeff(sadtalker_paths, device)
    animate_from_coeff = AnimateFromCoeff(free_view_checkpoint, mapping_checkpoint,
                                          facerender_yaml_path, device)
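
    # The pipeline below runs in three stages: (1) crop the source image and
    # extract 3DMM coefficients, (2) map the driven audio to motion
    # coefficients, (3) render the talking-head video from those coefficients.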
    first_frame_dir = os.path.join(save_dir, 'first_frame_dir')
    os.makedirs(first_frame_dir, exist_ok=True)

    # first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(pic_path, first_frame_dir, args.preprocess,
    #                                                                        source_image_flag=True, pic_size=args.size)
    fixed_temp_dir = "/tmp/preprocess_data"
    os.makedirs(fixed_temp_dir, exist_ok=True)
    preprocessed_data_path = os.path.join(fixed_temp_dir, "preprocessed_data.pkl")

    if os.path.exists(preprocessed_data_path) and image_hardcoded == "yes":
        print("Loading preprocessed data...")
        with open(preprocessed_data_path, "rb") as f:
            preprocessed_data = pickle.load(f)
        first_coeff_new_path = preprocessed_data["first_coeff_path"]
        crop_pic_new_path = preprocessed_data["crop_pic_path"]
        crop_info_path = preprocessed_data["crop_info_path"]
        with open(crop_info_path, "rb") as f:
            crop_info = pickle.load(f)
        print(f"Loaded existing preprocessed data from: {preprocessed_data_path}")
    else:
        print("Running preprocessing...")
        first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(pic_path, first_frame_dir, args.preprocess, source_image_flag=True)
        first_coeff_new_path = os.path.join(fixed_temp_dir, os.path.basename(first_coeff_path))
        crop_pic_new_path = os.path.join(fixed_temp_dir, os.path.basename(crop_pic_path))
        crop_info_new_path = os.path.join(fixed_temp_dir, "crop_info.pkl")
        shutil.move(first_coeff_path, first_coeff_new_path)
        shutil.move(crop_pic_path, crop_pic_new_path)
        with open(crop_info_new_path, "wb") as f:
            pickle.dump(crop_info, f)

        preprocessed_data = {"first_coeff_path": first_coeff_new_path,
                             "crop_pic_path": crop_pic_new_path,
                             "crop_info_path": crop_info_new_path}
        with open(preprocessed_data_path, "wb") as f:
            pickle.dump(preprocessed_data, f)
        print(f"Preprocessed data saved to: {preprocessed_data_path}")

    print('first_coeff_path ', first_coeff_new_path)
    print('crop_pic_path ', crop_pic_new_path)
    print('crop_info ', crop_info)

    if first_coeff_new_path is None:
        print("Can't get the coeffs of the input")
        return
    if ref_eyeblink is not None:
        ref_eyeblink_videoname = os.path.splitext(os.path.split(ref_eyeblink)[-1])[0]
        ref_eyeblink_frame_dir = os.path.join(save_dir, ref_eyeblink_videoname)
        os.makedirs(ref_eyeblink_frame_dir, exist_ok=True)
        # ref_eyeblink_coeff_path, _, _ = preprocess_model.generate(ref_eyeblink, ref_eyeblink_frame_dir, args.preprocess, source_image_flag=False)
        ref_eyeblink_coeff_path, _, _ = preprocess_model.generate(ref_eyeblink, ref_eyeblink_frame_dir)
    else:
        ref_eyeblink_coeff_path = None
    print('ref_eyeblink_coeff_path', ref_eyeblink_coeff_path)

    if ref_pose is not None:
        if ref_pose == ref_eyeblink:
            ref_pose_coeff_path = ref_eyeblink_coeff_path
        else:
            ref_pose_videoname = os.path.splitext(os.path.split(ref_pose)[-1])[0]
            ref_pose_frame_dir = os.path.join(save_dir, ref_pose_videoname)
            os.makedirs(ref_pose_frame_dir, exist_ok=True)
            # ref_pose_coeff_path, _, _ = preprocess_model.generate(ref_pose, ref_pose_frame_dir, args.preprocess, source_image_flag=False)
            ref_pose_coeff_path, _, _ = preprocess_model.generate(ref_pose, ref_pose_frame_dir)
    else:
        ref_pose_coeff_path = None
    print('ref_pose_coeff_path', ref_pose_coeff_path)
    batch = get_data(first_coeff_new_path, audio_path, device, ref_eyeblink_coeff_path, still=args.still)
    coeff_path = audio_to_coeff.generate(batch, save_dir, pose_style, ref_pose_coeff_path)

    if args.face3dvis:
        from src.face3d.visualize import gen_composed_video
        gen_composed_video(args, device, first_coeff_new_path, coeff_path, audio_path, os.path.join(save_dir, '3dface.mp4'))

    # data = get_facerender_data(coeff_path, crop_pic_path, first_coeff_path, audio_path,
    #                            batch_size, input_yaw_list, input_pitch_list, input_roll_list,
    #                            expression_scale=args.expression_scale, still_mode=args.still, preprocess=args.preprocess, size=args.size)
    data = get_facerender_data(coeff_path, crop_pic_new_path, first_coeff_new_path, audio_path,
                               batch_size, input_yaw_list, input_pitch_list, input_roll_list,
                               expression_scale=args.expression_scale, still_mode=args.still, preprocess=args.preprocess)

    # result, base64_video, temp_file_path = animate_from_coeff.generate(data, save_dir, pic_path, crop_info,
    #                                                                    enhancer=args.enhancer, background_enhancer=args.background_enhancer, preprocess=args.preprocess, img_size=args.size)
    result, base64_video, temp_file_path, new_audio_path = animate_from_coeff.generate(data, save_dir, pic_path, crop_info,
                                                                                       enhancer=args.enhancer, background_enhancer=args.background_enhancer, preprocess=args.preprocess)
    # Disabled video-retalking lip-sync / GFPGAN post-processing path:
    # face_path = temp_file_path
    # audio_path = new_audio_path
    # temp_file = tempfile.NamedTemporaryFile(delete=False, dir=TEMP_DIR.name, suffix='.mp4')
    # video_lipsync_file_path = temp_file.name
    # output_path = video_lipsync_file_path
    # # Call the function
    # inference_function.video_lipsync_correctness(
    #     face=face_path,
    #     audio_path=audio_path,
    #     face3d_net_path=path_of_net_recon_model,
    #     outfile=output_path,
    #     tmp_dir="temp",
    #     crop=[0, -1, 0, -1],
    #     re_preprocess=True,   # Set to True if you want to reprocess; False otherwise
    #     exp_img="neutral",    # Can be 'smile', 'neutral', or a path to an expression image
    #     one_shot=False,
    #     up_face="original",   # Options: 'original', 'sad', 'angry', 'surprise'
    #     LNet_batch_size=16,
    #     without_rl1=False
    # )
    # print('The video with lip sync is generated')
    # print("GFPGAN Activated")
    # gfpgan_enhancer.process_video_with_gfpgan(output_path, output_path)
    # audio_clip = mp.AudioFileClip(new_audio_path)
    # video_clip = mp.VideoFileClip(output_path)
    # # Combine audio and video
    # final_clip = video_clip.set_audio(audio_clip)
    # temp_file = tempfile.NamedTemporaryFile(suffix='.mp4', dir=TEMP_DIR.name, delete=False)
    # temp_file.close()
    # final_video_path = temp_file.name
    # final_clip.write_videofile(final_video_path)
    # with open(final_video_path, 'rb') as f:
    #     video_content = f.read()
    # base64_lipsync_video = base64.b64encode(video_content).decode('utf-8')
    video_clip = mp.VideoFileClip(temp_file_path)
    duration = video_clip.duration

    app.config['temp_response'] = base64_video
    app.config['final_video_path'] = temp_file_path
    app.config['final_video_duration'] = duration

    return base64_video, temp_file_path, duration

    # NOTE: the cleanup below never runs because main() returns above.
    # shutil.move(result, save_dir + '.mp4')
    if not args.verbose:
        shutil.rmtree(save_dir)


def create_temp_dir():
    return tempfile.TemporaryDirectory()
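

# save_uploaded_file prefixes a UUID so repeated uploads with the same
# filename do not collide inside the per-request temp dir. It only relies on
# the object's .save(path) method, so it works both for Flask file uploads
# and for the PIL Image used in generate_video().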
def save_uploaded_file(file, filename, TEMP_DIR):
    unique_filename = str(uuid.uuid4()) + "_" + filename
    file_path = os.path.join(TEMP_DIR.name, unique_filename)
    file.save(file_path)
    return file_path


client = OpenAI(api_key="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")


def translate_text(text_prompt, target_language):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": "You are a helpful language translator assistant."},
                  {"role": "user", "content": f"Translate completely without hallucination, end to end. The output should be only the translation of the text prompt and nothing else. Translate the following text to the {target_language} language: {text_prompt}"},
                  ],
        max_tokens=len(text_prompt) + 200  # scale the limit with the length of the input text
        # temperature=0.3,
        # stop=["Translate:", "Text:"]
    )
    return response


def openai_chat_avatar(text_prompt):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": "Always answer in Portuguese, using the minimum number of words possible."},
                  {"role": "user", "content": f"Hi! I need help with something. Can you assist me with the following: {text_prompt}"},
                  ],
        max_tokens=len(text_prompt) + 300  # scale the limit with the length of the input text
        # temperature=0.3,
        # stop=["Translate:", "Text:"]
    )
    return response


def ryzedb_chat_avatar(question):
    url = "https://inference.dev.ryzeai.ai/chat/stream"
    question = question + ". Summarize and answer using the minimum words you can ever use."

    payload = json.dumps({
        "input": {
            "chat_history": [],
            "app_id": "af6b2bc719a14478adcff1d71c19dc00",
            "question": question
        },
        "config": {}
    })
    headers = {
        'Content-Type': 'application/json'
    }

    try:
        # Send the POST request
        response = requests.request("POST", url, headers=headers, data=payload)
        # Check for a successful request
        response.raise_for_status()
        # Return the raw response body (a streamed event string)
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return None
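
# ryzedb_chat_avatar returns the raw streamed body. generate_video() below
# parses it as blank-line-separated events of the form
# "event: data\r\ndata: {...json...}" (format inferred from that parsing).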

def custom_cleanup(temp_dir, exclude_dir):
    # Iterate over the files and directories in TEMP_DIR
    for filename in os.listdir(temp_dir):
        file_path = os.path.join(temp_dir, filename)
        # Skip the directory we want to exclude
        if file_path != exclude_dir:
            try:
                if os.path.isdir(file_path):
                    shutil.rmtree(file_path)
                else:
                    os.remove(file_path)
                print(f"Deleted: {file_path}")
            except Exception as e:
                print(f"Failed to delete {file_path}. Reason: {e}")


# Route path assumed (the decorator is missing from the extracted source);
# adjust it to match the original deployment.
@app.route('/generate', methods=['POST'])
def generate_video():
    global start_time
    start_time = time.time()
    global TEMP_DIR
    TEMP_DIR = create_temp_dir()
    print('request:', request.method)
    try:
        if request.method == 'POST':
            # source_image = request.files['source_image']
            image_path = '/home/user/app/images/screen.jpg'
            source_image = Image.open(image_path)
            text_prompt = request.form['text_prompt']
            print('Input text prompt: ', text_prompt)
            text_prompt = text_prompt.strip()
            if not text_prompt:
                return jsonify({'error': 'Input text prompt cannot be blank'}), 400

            voice_cloning = request.form.get('voice_cloning', 'no')
            image_hardcoded = request.form.get('image_hardcoded', 'yes')
            chat_model_used = request.form.get('chat_model_used', 'openai')
            target_language = request.form.get('target_language', 'original_text')
            print('target_language', target_language)
            pose_style = int(request.form.get('pose_style', 1))
            expression_scale = float(request.form.get('expression_scale', 1))
            enhancer = request.form.get('enhancer', None)
            voice_gender = request.form.get('voice_gender', 'male')
            still_str = request.form.get('still', 'False')
            still = still_str.lower() == 'true'  # parse the form value as a boolean
            print('still', still)
            preprocess = request.form.get('preprocess', 'crop')
            print('preprocess selected: ', preprocess)
            ref_pose_video = request.files.get('ref_pose', None)

            # if target_language != 'original_text':
            #     response = translate_text(text_prompt, target_language)
            #     # response = await translate_text_async(text_prompt, target_language)
            #     text_prompt = response.choices[0].message.content.strip()

            if chat_model_used == 'ryzedb':
                response = ryzedb_chat_avatar(text_prompt)
                events = response.split('\r\n\r\n')
                content = None
                for event in events:
                    # Split each event block by "\r\n" to get the lines
                    lines = event.split('\r\n')
                    if len(lines) > 1 and lines[0] == 'event: data':
                        # Extract the JSON part from the second line and parse it
                        json_data = lines[1].replace('data: ', '')
                        try:
                            data = json.loads(json_data)
                            text_prompt = data.get('content')
                            app.config['text_prompt'] = text_prompt
                            print('Final output text prompt using ryzedb: ', text_prompt)
                            break  # Exit the loop once content is found
                        except json.JSONDecodeError:
                            continue
            else:
                response = openai_chat_avatar(text_prompt)
                text_prompt = response.choices[0].message.content.strip()
                app.config['text_prompt'] = text_prompt
                print('Final output text prompt using openai: ', text_prompt)

            source_image_path = save_uploaded_file(source_image, 'source_image.png', TEMP_DIR)
            print(source_image_path)

            # driven_audio_path = await voice_cloning_async(voice_cloning, voice_gender, text_prompt, user_voice)
            if voice_cloning == 'no':
                if voice_gender == 'male':
                    voice = 'echo'
                    print('Entering Audio creation using elevenlabs')
                    set_api_key("92e149985ea2732b4359c74346c3daee")

                    audio = generate(text=text_prompt, voice="Daniel", model="eleven_multilingual_v2", stream=True, latency=4)
                    with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="text_to_speech_", dir=TEMP_DIR.name, delete=False) as temp_file:
                        for chunk in audio:
                            temp_file.write(chunk)
                        driven_audio_path = temp_file.name
                    print('driven_audio_path', driven_audio_path)
                    print('Audio file saved using elevenlabs')
                else:
                    voice = 'nova'
                    print('Entering Audio creation using whisper')
                    response = client.audio.speech.create(model="tts-1-hd",
                                                          voice=voice,
                                                          input=text_prompt)
                    print('Audio created using whisper')
                    with tempfile.NamedTemporaryFile(suffix=".wav", prefix="text_to_speech_", dir=TEMP_DIR.name, delete=False) as temp_file:
                        driven_audio_path = temp_file.name
                    response.write_to_file(driven_audio_path)
                    print('Audio file saved using whisper')

            elif voice_cloning == 'yes':
                # user_voice = request.files['user_voice']
                user_voice = '/home/user/app/images/Recording.m4a'

                with tempfile.NamedTemporaryFile(suffix=".wav", prefix="user_voice_", dir=TEMP_DIR.name, delete=False) as temp_file:
                    with open(user_voice, 'rb') as source_file:
                        file_contents = source_file.read()
                        temp_file.write(file_contents)
                        temp_file.flush()
                    user_voice_path = temp_file.name
                    # user_voice.save(user_voice_path)
                    print('user_voice_path', user_voice_path)

                set_api_key("92e149985ea2732b4359c74346c3daee")
                voice = clone(name="User Cloned Voice",
                              files=[user_voice_path])
                audio = generate(text=text_prompt, voice=voice, model="eleven_multilingual_v2", stream=True, latency=4)
                with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="cloned_audio_", dir=TEMP_DIR.name, delete=False) as temp_file:
                    for chunk in audio:
                        temp_file.write(chunk)
                    driven_audio_path = temp_file.name
                print('driven_audio_path', driven_audio_path)
                # elevenlabs.save(audio, driven_audio_path)

            save_dir = tempfile.mkdtemp(dir=TEMP_DIR.name)
            result_folder = os.path.join(save_dir, "results")
            os.makedirs(result_folder, exist_ok=True)

            ref_pose_video_path = None
            if ref_pose_video:
                with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="ref_pose_", dir=TEMP_DIR.name, delete=False) as temp_file:
                    ref_pose_video_path = temp_file.name
                    ref_pose_video.save(ref_pose_video_path)
                print('ref_pose_video_path', ref_pose_video_path)

    except Exception as e:
        app.logger.error(f"An error occurred: {e}")
        return "An error occurred", 500

    # Build the SadTalker-style args object and launch generation in a background thread.
    args = AnimationConfig(driven_audio_path=driven_audio_path, source_image_path=source_image_path, result_folder=result_folder, pose_style=pose_style, expression_scale=expression_scale, enhancer=enhancer, still=still, preprocess=preprocess, ref_pose_video_path=ref_pose_video_path, image_hardcoded=image_hardcoded)

    if torch.cuda.is_available() and not args.cpu:
        args.device = "cuda"
    else:
        args.device = "cpu"

    generation_thread = threading.Thread(target=main, args=(args,))
    app.config['generation_thread'] = generation_thread
    generation_thread.start()
    response_data = {"message": "Video generation started",
                     "process_id": generation_thread.ident}

    return jsonify(response_data)
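
# Clients poll the status check below with the process_id returned by
# generate_video() until "status" comes back as "completed".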

# Route path assumed (the decorator is missing from the extracted source);
# adjust it to match the original deployment.
@app.route('/status', methods=['GET'])
def check_generation_status():
    global TEMP_DIR
    global start_time
    response = {"base64_video": "", "text_prompt": "", "status": ""}
    process_id = request.args.get('process_id', None)

    # process_id is required to check the status for that specific process
    if process_id:
        generation_thread = app.config.get('generation_thread')
        if generation_thread and generation_thread.ident == int(process_id) and generation_thread.is_alive():
            return jsonify({"status": "in_progress"}), 200
        elif app.config.get('temp_response'):
            # app.config['temp_response']['status'] = 'completed'
            final_response = app.config['temp_response']
            response["base64_video"] = final_response
            response["text_prompt"] = app.config.get('text_prompt')
            response["duration"] = app.config.get('final_video_duration')
            response["status"] = "completed"

            final_video_path = app.config['final_video_path']
            print('final_video_path', final_video_path)

            if final_video_path and os.path.exists(final_video_path):
                os.remove(final_video_path)
                print("Deleted video file:", final_video_path)

            # TEMP_DIR.cleanup()
            preprocess_dir = os.path.join("/tmp", "preprocess_data")
            custom_cleanup(TEMP_DIR.name, preprocess_dir)
            print("Temporary files cleaned up, but preprocess_data is retained.")

            end_time = time.time()
            total_time = round(end_time - start_time, 2)
            print("Total time taken for execution:", total_time, " seconds")
            response["time_taken"] = total_time
            return jsonify(response)
    return jsonify({"error": "No process id provided"})


# Route path assumed (the decorator is missing from the extracted source).
@app.route('/health', methods=['GET'])
def health_status():
    response = {"online": "true"}
    return jsonify(response)


if __name__ == '__main__':
    app.run(debug=True)
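
# Example client flow (a sketch; the endpoint paths above are assumed):
#
#   import requests, time
#   base = "http://localhost:5000"
#   r = requests.post(f"{base}/generate", data={"text_prompt": "Hello there"})
#   process_id = r.json()["process_id"]
#   while True:
#       status = requests.get(f"{base}/status", params={"process_id": process_id}).json()
#       if status.get("status") == "completed":
#           break
#       time.sleep(5)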