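# NOTE: the next three lines are page-status residue captured with the file;
# they are not part of the program and are kept only as a comment.
# Spaces:
# Sleeping
# Sleeping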
from flask import Flask, request, jsonify | |
import torch | |
import shutil | |
import os | |
import sys | |
from argparse import ArgumentParser | |
from time import strftime | |
from argparse import Namespace | |
from src.utils.preprocess import CropAndExtract | |
from src.test_audio2coeff import Audio2Coeff | |
from src.facerender.animate import AnimateFromCoeff | |
from src.generate_batch import get_data | |
from src.generate_facerender_batch import get_facerender_data | |
# from src.utils.init_path import init_path | |
import tempfile | |
from openai import OpenAI | |
import threading | |
import elevenlabs | |
from elevenlabs import set_api_key, generate, play, clone, Voice, VoiceSettings | |
from flask_cors import CORS, cross_origin | |
# from flask_swagger_ui import get_swaggerui_blueprint | |
import uuid | |
import time | |
from PIL import Image | |
import moviepy.editor as mp | |
import requests | |
import json | |
import pickle | |
# from videoretalking import inference_function | |
# import base64 | |
# import gfpgan_enhancer | |
class AnimationConfig:
    """Plain settings holder handed to ``main`` for one generation run.

    The ten constructor arguments are stored under the attribute names that
    ``main`` reads (``driven_audio``, ``source_image``, ``result_dir``, ...).
    Every other attribute is set to a fixed value here.
    """

    def __init__(
        self,
        driven_audio_path,
        source_image_path,
        result_folder,
        pose_style,
        expression_scale,
        enhancer,
        still,
        preprocess,
        ref_pose_video_path,
        image_hardcoded,
    ):
        # Inputs and reference material.
        self.driven_audio = driven_audio_path
        self.source_image = source_image_path
        self.ref_eyeblink = None
        self.ref_pose = ref_pose_video_path

        # Locations.
        self.checkpoint_dir = './checkpoints'
        self.result_dir = result_folder

        # Generation controls.
        self.pose_style = pose_style
        self.batch_size = 2
        self.expression_scale = expression_scale
        self.input_yaw = None
        self.input_pitch = None
        self.input_roll = None
        self.enhancer = enhancer
        self.background_enhancer = None

        # Switches.
        self.cpu = False
        self.face3dvis = False
        self.still = still
        self.preprocess = preprocess
        self.verbose = False
        self.old_version = False

        # Fixed reconstruction / rendering constants.
        self.net_recon = 'resnet50'
        self.init_path = None
        self.use_last_fc = False
        self.bfm_folder = './checkpoints/BFM_Fitting/'
        self.bfm_model = 'BFM_model_front.mat'
        self.focal = 1015.
        self.center = 112.
        self.camera_d = 10.
        self.z_near = 5.
        self.z_far = 15.

        # Initial device value; the request handler in this file overwrites it
        # after checking torch.cuda.is_available().
        self.device = 'cuda'
        self.image_hardcoded = image_hardcoded
# Flask application with CORS enabled for it.
app = Flask(__name__)
CORS(app)

# Module-level state assigned by generate_video() on each request.
TEMP_DIR = None
start_time = None

# Placeholders for the most recent generation result; main() fills in
# temp_response, final_video_path and final_video_duration.
app.config.update({
    'temp_response': None,
    'generation_thread': None,
    'text_prompt': None,
    'final_video_path': None,
    'final_video_duration': None,
})
def main(args):
    """Run the talking-head pipeline for one request.

    Steps, in order: build the three models (CropAndExtract, Audio2Coeff,
    AnimateFromCoeff), obtain the source-image coefficients (from the on-disk
    cache or by preprocessing), optionally preprocess the eye-blink / pose
    reference videos, convert the audio to coefficients, and render the video.

    Args:
        args: Settings object exposing the attributes read below
            (``source_image``, ``driven_audio``, ``result_dir``,
            ``pose_style``, ``device``, ``batch_size``, ``input_yaw``,
            ``input_pitch``, ``input_roll``, ``ref_eyeblink``, ``ref_pose``,
            ``preprocess``, ``image_hardcoded``, ``checkpoint_dir``,
            ``still``, ``face3dvis``, ``expression_scale``, ``enhancer``,
            ``background_enhancer``).

    Returns:
        ``(base64_video, temp_file_path, duration)`` on success, or ``None``
        when no coefficients could be obtained for the source image.

    Side effects:
        Writes ``temp_response``, ``final_video_path`` and
        ``final_video_duration`` into ``app.config`` and stores preprocessing
        artefacts under ``/tmp/preprocess_data``.
    """
    pic_path = args.source_image
    audio_path = args.driven_audio
    save_dir = args.result_dir
    pose_style = args.pose_style
    device = args.device
    batch_size = args.batch_size
    input_yaw_list = args.input_yaw
    input_pitch_list = args.input_pitch
    input_roll_list = args.input_roll
    ref_eyeblink = args.ref_eyeblink
    ref_pose = args.ref_pose
    preprocess = args.preprocess
    image_hardcoded = args.image_hardcoded

    current_root_path = os.path.dirname(os.path.realpath(__file__))
    print('current_root_path ', current_root_path)

    # All checkpoints live under <root>/<checkpoint_dir>; all YAML configs
    # under <root>/src/config.
    checkpoint_root = os.path.join(current_root_path, args.checkpoint_dir)
    config_root = os.path.join(current_root_path, 'src', 'config')

    path_of_lm_croper = os.path.join(checkpoint_root, 'shape_predictor_68_face_landmarks.dat')
    path_of_net_recon_model = os.path.join(checkpoint_root, 'epoch_20.pth')
    dir_of_BFM_fitting = os.path.join(checkpoint_root, 'BFM_Fitting')
    wav2lip_checkpoint = os.path.join(checkpoint_root, 'wav2lip.pth')
    audio2pose_checkpoint = os.path.join(checkpoint_root, 'auido2pose_00140-model.pth')
    audio2pose_yaml_path = os.path.join(config_root, 'auido2pose.yaml')
    audio2exp_checkpoint = os.path.join(checkpoint_root, 'auido2exp_00300-model.pth')
    audio2exp_yaml_path = os.path.join(config_root, 'auido2exp.yaml')
    free_view_checkpoint = os.path.join(checkpoint_root, 'facevid2vid_00189-model.pth.tar')

    # The 'full' preprocess mode uses its own mapping checkpoint and config.
    if preprocess == 'full':
        mapping_checkpoint = os.path.join(checkpoint_root, 'mapping_00109-model.pth.tar')
        facerender_yaml_path = os.path.join(config_root, 'facerender_still.yaml')
    else:
        mapping_checkpoint = os.path.join(checkpoint_root, 'mapping_00229-model.pth.tar')
        facerender_yaml_path = os.path.join(config_root, 'facerender.yaml')

    # Model construction.
    print(path_of_net_recon_model)
    preprocess_model = CropAndExtract(path_of_lm_croper, path_of_net_recon_model,
                                      dir_of_BFM_fitting, device)
    audio_to_coeff = Audio2Coeff(audio2pose_checkpoint, audio2pose_yaml_path,
                                 audio2exp_checkpoint, audio2exp_yaml_path,
                                 wav2lip_checkpoint, device)
    animate_from_coeff = AnimateFromCoeff(free_view_checkpoint, mapping_checkpoint,
                                          facerender_yaml_path, device)

    first_frame_dir = os.path.join(save_dir, 'first_frame_dir')
    os.makedirs(first_frame_dir, exist_ok=True)

    # Source-image preprocessing is cached at a fixed location so that it
    # survives the per-request temporary directory.
    fixed_temp_dir = "/tmp/preprocess_data"
    os.makedirs(fixed_temp_dir, exist_ok=True)
    preprocessed_data_path = os.path.join(fixed_temp_dir, "preprocessed_data.pkl")

    if os.path.exists(preprocessed_data_path) and image_hardcoded == "yes":
        print("Loading preprocessed data...")
        # SECURITY NOTE(review): pickle.load executes arbitrary code if the
        # file has been tampered with, and this path is a fixed location
        # under /tmp. Confirm that only this service can write there.
        with open(preprocessed_data_path, "rb") as f:
            preprocessed_data = pickle.load(f)
        first_coeff_new_path = preprocessed_data["first_coeff_path"]
        crop_pic_new_path = preprocessed_data["crop_pic_path"]
        crop_info_path = preprocessed_data["crop_info_path"]
        with open(crop_info_path, "rb") as f:
            crop_info = pickle.load(f)
        print(f"Loaded existing preprocessed data from: {preprocessed_data_path}")
    else:
        print("Running preprocessing...")
        first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(
            pic_path, first_frame_dir, args.preprocess, source_image_flag=True)
        # Check for failure BEFORE touching the paths: os.path.basename(None)
        # would raise and the failure branch could never be reached.
        if first_coeff_path is None:
            print("Can't get the coeffs of the input")
            return None
        first_coeff_new_path = os.path.join(fixed_temp_dir, os.path.basename(first_coeff_path))
        crop_pic_new_path = os.path.join(fixed_temp_dir, os.path.basename(crop_pic_path))
        crop_info_new_path = os.path.join(fixed_temp_dir, "crop_info.pkl")
        shutil.move(first_coeff_path, first_coeff_new_path)
        shutil.move(crop_pic_path, crop_pic_new_path)
        with open(crop_info_new_path, "wb") as f:
            pickle.dump(crop_info, f)
        preprocessed_data = {"first_coeff_path": first_coeff_new_path,
                             "crop_pic_path": crop_pic_new_path,
                             "crop_info_path": crop_info_new_path}
        with open(preprocessed_data_path, "wb") as f:
            pickle.dump(preprocessed_data, f)
        print(f"Preprocessed data saved to: {preprocessed_data_path}")

    print('first_coeff_path ', first_coeff_new_path)
    print('crop_pic_path ', crop_pic_new_path)
    print('crop_info ', crop_info)

    # Also guards the cached branch against a stored None entry.
    if first_coeff_new_path is None:
        print("Can't get the coeffs of the input")
        return None

    # Optional eye-blink reference video.
    if ref_eyeblink is not None:
        ref_eyeblink_videoname = os.path.splitext(os.path.split(ref_eyeblink)[-1])[0]
        ref_eyeblink_frame_dir = os.path.join(save_dir, ref_eyeblink_videoname)
        os.makedirs(ref_eyeblink_frame_dir, exist_ok=True)
        ref_eyeblink_coeff_path, _, _ = preprocess_model.generate(ref_eyeblink, ref_eyeblink_frame_dir)
    else:
        ref_eyeblink_coeff_path = None
    print('ref_eyeblink_coeff_path', ref_eyeblink_coeff_path)

    # Optional pose reference video; reuse the eye-blink result when both
    # references are the same file.
    if ref_pose is not None:
        if ref_pose == ref_eyeblink:
            ref_pose_coeff_path = ref_eyeblink_coeff_path
        else:
            ref_pose_videoname = os.path.splitext(os.path.split(ref_pose)[-1])[0]
            ref_pose_frame_dir = os.path.join(save_dir, ref_pose_videoname)
            os.makedirs(ref_pose_frame_dir, exist_ok=True)
            ref_pose_coeff_path, _, _ = preprocess_model.generate(ref_pose, ref_pose_frame_dir)
    else:
        ref_pose_coeff_path = None
    # Label corrected: this value is the pose coefficient path.
    print('ref_pose_coeff_path', ref_pose_coeff_path)

    # Audio -> coefficients.
    batch = get_data(first_coeff_new_path, audio_path, device, ref_eyeblink_coeff_path, still=args.still)
    coeff_path = audio_to_coeff.generate(batch, save_dir, pose_style, ref_pose_coeff_path)

    if args.face3dvis:
        from src.face3d.visualize import gen_composed_video
        gen_composed_video(args, device, first_coeff_new_path, coeff_path, audio_path,
                           os.path.join(save_dir, '3dface.mp4'))

    # Coefficients -> video.
    data = get_facerender_data(coeff_path, crop_pic_new_path, first_coeff_new_path, audio_path,
                               batch_size, input_yaw_list, input_pitch_list, input_roll_list,
                               expression_scale=args.expression_scale, still_mode=args.still,
                               preprocess=args.preprocess)
    _result, base64_video, temp_file_path, _new_audio_path = animate_from_coeff.generate(
        data, save_dir, pic_path, crop_info,
        enhancer=args.enhancer, background_enhancer=args.background_enhancer,
        preprocess=args.preprocess)

    # NOTE: an optional lip-sync correction / GFPGAN post-processing stage was
    # previously sketched here as commented-out code; it is not active.

    # Only the duration is needed, so release the reader straight away.
    video_clip = mp.VideoFileClip(temp_file_path)
    try:
        duration = video_clip.duration
    finally:
        video_clip.close()

    app.config['temp_response'] = base64_video
    app.config['final_video_path'] = temp_file_path
    app.config['final_video_duration'] = duration

    # The statements that used to follow this return (rmtree of save_dir when
    # not verbose) were unreachable and have been dropped; the request handler
    # cleans the temporary directory after main() returns.
    return base64_video, temp_file_path, duration
def create_temp_dir():
    """Create a fresh ``tempfile.TemporaryDirectory`` and return the object.

    The caller owns the returned object; its path is available as ``.name``.
    """
    scratch_dir = tempfile.TemporaryDirectory()
    return scratch_dir
def save_uploaded_file(file, filename, TEMP_DIR):
    """Save ``file`` inside ``TEMP_DIR`` under a collision-free name.

    Args:
        file: Any object with a ``save(path)`` method.
        filename: Suggested file name. Only its final path component is used,
            so directory parts such as ``../`` cannot place the file outside
            the temporary directory.
        TEMP_DIR: Object whose ``name`` attribute is the target directory
            (for example a ``tempfile.TemporaryDirectory``).

    Returns:
        The full path the file was saved to.
    """
    # Strip any directory components from a caller/client supplied name.
    safe_name = os.path.basename(filename)
    # A random UUID prefix keeps concurrent saves of the same name apart.
    unique_filename = str(uuid.uuid4()) + "_" + safe_name
    file_path = os.path.join(TEMP_DIR.name, unique_filename)
    file.save(file_path)
    return file_path
# Shared OpenAI client used by the chat and text-to-speech calls in this file.
# The API key is read from the OPENAI_API_KEY environment variable instead of
# being embedded in the source: a key committed to a repository is exposed to
# every reader and must be treated as compromised (revoke and rotate it).
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
def translate_text(text_prompt, target_language):
    """Ask the chat model to translate ``text_prompt`` into ``target_language``.

    Returns the raw chat-completion response object; the caller extracts the
    translated text from it. The completion is capped at
    ``len(text_prompt) + 200`` tokens.
    """
    system_message = {
        "role": "system",
        "content": "You are a helpful language translator assistant.",
    }
    user_message = {
        "role": "user",
        "content": (
            "Translate completely without hallucination, end to end and the ouput "
            "should just be the translation of the text prompt and nothing else, "
            f"and give the following text to {target_language} language "
            f"and the text is: {text_prompt}"
        ),
    }
    token_budget = len(text_prompt) + 200
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[system_message, user_message],
        max_tokens=token_budget,
    )
def openai_chat_avatar(text_prompt):
    """Send ``text_prompt`` to the chat model and return the raw response.

    The system message asks for a minimal-length English answer. The
    completion is capped at ``len(text_prompt) + 300`` tokens. The caller
    reads the answer from ``response.choices[0].message.content``.
    """
    conversation = [
        {
            "role": "system",
            "content": "Answer in English language always using the minimum words you can ever use.",
        },
        {
            "role": "user",
            "content": f"Hi! I need help with something. Can you assist me with the following: {text_prompt}",
        },
    ]
    token_budget = len(text_prompt) + 300
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
        max_tokens=token_budget,
    )
def ryzedb_chat_avatar(question, timeout=60):
    """POST ``question`` to the ryzedb chat-stream endpoint.

    Args:
        question: User question. A fixed instruction asking for a short,
            summarised answer is appended before sending.
        timeout: Seconds to wait for the server (passed to ``requests``).
            Without a timeout an unresponsive server would block the calling
            request indefinitely.

    Returns:
        The response body as text, or ``None`` if the request failed
        (connection error, timeout, or non-2xx status).
    """
    url = "https://inference.dev.ryzeai.ai/chat/stream"
    question = question + ". Summarize and Answer using the minimum words you can ever use."
    payload = json.dumps({
        "input": {
            "chat_history": [],
            "app_id": "af6b2bc719a14478adcff1d71c19dc00",
            "question": question
        },
        "config": {}
    })
    headers = {
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(url, headers=headers, data=payload, timeout=timeout)
        # Turn HTTP error statuses into exceptions handled below.
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        # Timeouts are RequestException subclasses, so they land here too.
        print(f"An error occurred: {e}")
        return None
def custom_cleanup(temp_dir, exclude_dir):
    """Delete every direct child of ``temp_dir`` except ``exclude_dir``.

    A child is skipped only when its joined path is exactly equal to
    ``exclude_dir``. Directories are removed recursively, files are unlinked.
    Failures are printed and do not stop the remaining deletions.
    """
    for entry_name in os.listdir(temp_dir):
        entry_path = os.path.join(temp_dir, entry_name)
        if entry_path == exclude_dir:
            # This is the path the caller asked to keep.
            continue
        try:
            remover = shutil.rmtree if os.path.isdir(entry_path) else os.remove
            remover(entry_path)
            print(f"Deleted: {entry_path}")
        except Exception as e:
            print(f"Failed to delete {entry_path}. Reason: {e}")
def _extract_ryzedb_content(raw_stream):
    """Return ``content`` from the first parsable 'event: data' block, else None."""
    for event in raw_stream.split('\r\n\r\n'):
        lines = event.split('\r\n')
        if len(lines) > 1 and lines[0] == 'event: data':
            # The JSON document is on the second line, after the 'data: ' tag.
            json_data = lines[1].replace('data: ', '')
            try:
                data = json.loads(json_data)
            except json.JSONDecodeError:
                continue
            return data.get('content')
    return None


def _stream_elevenlabs_audio(text_prompt, voice, prefix, temp_dir):
    """Stream ElevenLabs speech for ``text_prompt`` into an .mp3 in ``temp_dir``; return its path."""
    # The key comes from the environment rather than the source file.
    # NOTE(review): ELEVEN_API_KEY is presumably the variable the installed
    # elevenlabs package itself consults - confirm against that version.
    eleven_key = os.environ.get("ELEVEN_API_KEY")
    if eleven_key:
        set_api_key(eleven_key)
    audio = generate(text=text_prompt, voice=voice, model="eleven_multilingual_v2", stream=True, latency=4)
    with tempfile.NamedTemporaryFile(suffix=".mp3", prefix=prefix, dir=temp_dir.name, delete=False) as temp_file:
        for chunk in audio:
            temp_file.write(chunk)
        driven_audio_path = temp_file.name
    print('driven_audio_path', driven_audio_path)
    return driven_audio_path


def _synthesize_speech(text_prompt, voice_cloning, voice_gender, temp_dir):
    """Create the driving audio file for ``text_prompt`` and return its path.

    ``voice_cloning == 'yes'`` uses the fixed ElevenLabs voice id below;
    otherwise a male voice uses ElevenLabs "Daniel" and any other gender uses
    the OpenAI speech endpoint with the 'nova' voice.
    """
    if voice_cloning == 'yes':
        voice = Voice(voice_id="DeZH4ash9IU9gUcNjVXh", name="Marc", settings=VoiceSettings(
            stability=0.71, similarity_boost=0.5, style=0.0, use_speaker_boost=True),)
        return _stream_elevenlabs_audio(text_prompt, voice, "cloned_audio_", temp_dir)

    if voice_gender == 'male':
        print('Entering Audio creation using elevenlabs')
        driven_audio_path = _stream_elevenlabs_audio(text_prompt, "Daniel", "text_to_speech_", temp_dir)
        print('Audio file saved using elevenlabs')
        return driven_audio_path

    voice = 'nova'
    print('Entering Audio creation using whisper')
    response = client.audio.speech.create(model="tts-1-hd",
                                          voice=voice,
                                          input=text_prompt)
    print('Audio created using whisper')
    # Reserve a unique file name, then let the response write to that path.
    with tempfile.NamedTemporaryFile(suffix=".wav", prefix="text_to_speech_", dir=temp_dir.name, delete=False) as temp_file:
        driven_audio_path = temp_file.name
    response.write_to_file(driven_audio_path)
    print('Audio file saved using whisper')
    return driven_audio_path


def generate_video():
    """Handle one generation request: question -> answer -> speech -> video.

    Reads the form fields ``text_prompt`` (required), ``voice_cloning``,
    ``image_hardcoded``, ``chat_model_used``, ``target_language``,
    ``pose_style``, ``expression_scale``, ``enhancer``, ``voice_gender``,
    ``still`` and ``preprocess``, plus the optional uploaded file
    ``ref_pose``. The source image is always loaded from a fixed path.

    Returns:
        A JSON response with ``base64_video``, ``text_prompt``, ``duration``,
        ``time_taken`` and ``status`` on success; a 400/405 JSON error for
        invalid input; a 5xx response when a downstream step fails.

    NOTE(review): no route decorator is visible on this function in this
    source; confirm how it is registered with the Flask app.
    """
    global start_time
    global TEMP_DIR
    request_started = time.time()
    start_time = request_started
    TEMP_DIR = create_temp_dir()
    temp_dir = TEMP_DIR
    print('request:', request.method)

    # Anything other than POST used to fall through and crash on unbound
    # variables; reject it explicitly instead.
    if request.method != 'POST':
        return jsonify({'error': 'Only POST requests are supported'}), 405

    try:
        # .get() so that a missing field yields the 400 below, not a 500.
        text_prompt = request.form.get('text_prompt', '')
        print('Input text prompt: ', text_prompt)
        text_prompt = text_prompt.strip()
        if not text_prompt:
            return jsonify({'error': 'Input text prompt cannot be blank'}), 400

        voice_cloning = request.form.get('voice_cloning', 'no')
        if voice_cloning not in ('yes', 'no'):
            # Any other value previously left the audio path undefined.
            return jsonify({'error': "voice_cloning must be 'yes' or 'no'"}), 400
        image_hardcoded = request.form.get('image_hardcoded', 'yes')
        chat_model_used = request.form.get('chat_model_used', 'openai')
        target_language = request.form.get('target_language', 'original_text')
        print('target_language', target_language)

        try:
            pose_style = int(request.form.get('pose_style', 1))
            expression_scale = float(request.form.get('expression_scale', 1))
        except ValueError:
            return jsonify({'error': 'pose_style must be an integer and expression_scale a number'}), 400

        enhancer = request.form.get('enhancer', None)
        voice_gender = request.form.get('voice_gender', 'male')
        still_str = request.form.get('still', 'False')
        # NOTE(review): this comparison yields True when the client sends
        # 'false' (and for the default), which looks inverted. Behaviour is
        # kept as-is because the intent cannot be determined from this file.
        still = still_str.lower() == 'false'
        print('still', still)
        preprocess = request.form.get('preprocess', 'crop')
        print('preprocess selected: ', preprocess)
        ref_pose_video = request.files.get('ref_pose', None)

        # The source image is a fixed file; copy it into the request's
        # temporary directory and close the handle straight away.
        image_path = '/home/user/app/images/out.jpg'
        with Image.open(image_path) as source_image:
            source_image_path = save_uploaded_file(source_image, 'source_image.png', temp_dir)
        print(source_image_path)

        # Turn the user's question into the text the avatar will speak.
        if chat_model_used == 'ryzedb':
            response = ryzedb_chat_avatar(text_prompt)
            answer = _extract_ryzedb_content(response) if response is not None else None
            if not answer:
                # Without this guard the user's own question would be spoken
                # back (or None passed to the speech service).
                return jsonify({'error': 'No answer received from ryzedb'}), 502
            text_prompt = answer
            app.config['text_prompt'] = text_prompt
            print('Final output text prompt using ryzedb: ', text_prompt)
        else:
            response = openai_chat_avatar(text_prompt)
            text_prompt = response.choices[0].message.content.strip()
            app.config['text_prompt'] = text_prompt
            print('Final output text prompt using openai: ', text_prompt)

        driven_audio_path = _synthesize_speech(text_prompt, voice_cloning, voice_gender, temp_dir)

        save_dir = tempfile.mkdtemp(dir=temp_dir.name)
        result_folder = os.path.join(save_dir, "results")
        os.makedirs(result_folder, exist_ok=True)

        ref_pose_video_path = None
        if ref_pose_video:
            with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="ref_pose_", dir=temp_dir.name, delete=False) as temp_file:
                ref_pose_video_path = temp_file.name
            ref_pose_video.save(ref_pose_video_path)
            print('ref_pose_video_path', ref_pose_video_path)
    except Exception as e:
        app.logger.exception(f"An error occurred: {e}")
        return "An error occurred", 500

    args = AnimationConfig(driven_audio_path=driven_audio_path, source_image_path=source_image_path,
                           result_folder=result_folder, pose_style=pose_style,
                           expression_scale=expression_scale, enhancer=enhancer, still=still,
                           preprocess=preprocess, ref_pose_video_path=ref_pose_video_path,
                           image_hardcoded=image_hardcoded)

    if torch.cuda.is_available() and not args.cpu:
        args.device = "cuda"
    else:
        args.device = "cpu"

    try:
        outcome = main(args)
        if outcome is None:
            # main() returns None when no coefficients could be obtained.
            return jsonify({'status': 'error', 'message': "Can't get the coeffs of the input"}), 500
        base64_video, temp_file_path, duration = outcome

        # Use the path returned for THIS request rather than the shared
        # app.config slot.
        final_video_path = temp_file_path
        print('final_video_path', final_video_path)
        if final_video_path and os.path.exists(final_video_path):
            os.remove(final_video_path)
            print("Deleted video file:", final_video_path)

        preprocess_dir = os.path.join("/tmp", "preprocess_data")
        custom_cleanup(temp_dir.name, preprocess_dir)
        print("Temporary files cleaned up, but preprocess_data is retained.")

        end_time = time.time()
        time_taken = end_time - request_started
        print(f"Time taken for endpoint: {time_taken:.2f} seconds")

        return jsonify({
            'base64_video': base64_video,
            'text_prompt': text_prompt,
            'duration': duration,
            'time_taken': time_taken,
            'status': 'completed'
        })
    except Exception as e:
        app.logger.exception(f"Video generation failed: {e}")
        return jsonify({'status': 'error', 'message': str(e)}), 500
# @app.route("/status", methods=["GET"]) | |
# def check_generation_status(): | |
# global TEMP_DIR | |
# global start_time | |
# response = {"base64_video": "","text_prompt":"", "status": ""} | |
# process_id = request.args.get('process_id', None) | |
# # process_id is required to check the status for that specific process | |
# if process_id: | |
# generation_thread = app.config.get('generation_thread') | |
# if generation_thread and generation_thread.ident == int(process_id) and generation_thread.is_alive(): | |
# return jsonify({"status": "in_progress"}), 200 | |
# elif app.config.get('temp_response'): | |
# # app.config['temp_response']['status'] = 'completed' | |
# final_response = app.config['temp_response'] | |
# response["base64_video"] = final_response | |
# response["text_prompt"] = app.config.get('text_prompt') | |
# response["duration"] = app.config.get('final_video_duration') | |
# response["status"] = "completed" | |
# final_video_path = app.config['final_video_path'] | |
# print('final_video_path',final_video_path) | |
# if final_video_path and os.path.exists(final_video_path): | |
# os.remove(final_video_path) | |
# print("Deleted video file:", final_video_path) | |
# # TEMP_DIR.cleanup() | |
# preprocess_dir = os.path.join("/tmp", "preprocess_data") | |
# custom_cleanup(TEMP_DIR.name, preprocess_dir) | |
# print("Temporary files cleaned up, but preprocess_data is retained.") | |
# end_time = time.time() | |
# total_time = round(end_time - start_time, 2) | |
# print("Total time taken for execution:", total_time, " seconds") | |
# response["time_taken"] = total_time | |
# return jsonify(response) | |
# return jsonify({"error":"No process id provided"}) | |
def health_status():
    """Return a JSON body reporting that the service is online."""
    return jsonify({"online": "true"})
if __name__ == '__main__':
    # Flask's debug mode enables the interactive Werkzeug debugger, which
    # lets anyone who can reach the server execute code. Keep it off unless
    # explicitly requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled)