from flask import Flask, render_template, request, jsonify |
from flask_socketio import SocketIO |
import sys |
import os |
sys.path.append(os.path.dirname(os.path.abspath(__file__))) |
import shutil |
import numpy as np |
from PIL import Image |
from sam2.build_sam import build_sam2 |
from sam2.sam2_image_predictor import SAM2ImagePredictor |
class Predictor: |
def __init__(self, model_cfg, checkpoint, device): |
self.device = device |
self.model = build_sam2(model_cfg, checkpoint, device=device) |
self.predictor = SAM2ImagePredictor(self.model) |
self.image_set = False |
def set_image(self, image): |
"""Set the image for SAM prediction.""" |
self.image = image |
self.predictor.set_image(image) |
self.image_set = True |
def predict(self, point_coords, point_labels, multimask_output=False): |
"""Run SAM prediction.""" |
if not self.image_set: |
raise RuntimeError("An image must be set with .set_image(...) before mask prediction.") |
return self.predictor.predict( |
point_coords=point_coords, |
point_labels=point_labels, |
multimask_output=multimask_output |
) |
from utils.helpers import ( |
blend_mask_with_image, |
save_mask_as_png, |
convert_mask_to_yolo, |
) |
import torch |
from ultralytics import YOLO |
import threading |
from threading import Lock |
import subprocess |
import time |
import logging |
import multiprocessing |
import json |
app = Flask(__name__) |
socketio = SocketIO(app) |
BASE_DIR = os.path.abspath(os.path.dirname(__file__)) |
'input': os.path.join(BASE_DIR, 'static/uploads/input'), |
'segmented_voids': os.path.join(BASE_DIR, 'static/uploads/segmented/voids'), |
'segmented_chips': os.path.join(BASE_DIR, 'static/uploads/segmented/chips'), |
'mask_voids': os.path.join(BASE_DIR, 'static/uploads/mask/voids'), |
'mask_chips': os.path.join(BASE_DIR, 'static/uploads/mask/chips'), |
'automatic_segmented': os.path.join(BASE_DIR, 'static/uploads/segmented/automatic'), |
} |
'images': os.path.join(BASE_DIR, 'static/history/images'), |
'masks_chip': os.path.join(BASE_DIR, 'static/history/masks/chip'), |
'masks_void': os.path.join(BASE_DIR, 'static/history/masks/void'), |
} |
'train_images': os.path.join(BASE_DIR, 'dataset/train/images'), |
'train_labels': os.path.join(BASE_DIR, 'dataset/train/labels'), |
'val_images': os.path.join(BASE_DIR, 'dataset/val/images'), |
'val_labels': os.path.join(BASE_DIR, 'dataset/val/labels'), |
'temp_backup': os.path.join(BASE_DIR, 'temp_backup'), |
'models': os.path.join(BASE_DIR, 'models'), |
'models_old': os.path.join(BASE_DIR, 'models/old'), |
} |
for folder_name, folder_path in {**UPLOAD_FOLDERS, **HISTORY_FOLDERS, **DATASET_FOLDERS}.items(): |
os.makedirs(folder_path, exist_ok=True) |
logging.info(f"Ensured folder exists: {folder_name} -> {folder_path}") |
training_process = None |
def initialize_training_status(): |
"""Initialize global training status.""" |
global training_status |
training_status = {'running': False, 'cancelled': False} |
def persist_training_status(): |
"""Save training status to a file.""" |
with open(os.path.join(BASE_DIR, 'training_status.json'), 'w') as status_file: |
json.dump(training_status, status_file) |
def load_training_status(): |
"""Load training status from a file.""" |
global training_status |
status_path = os.path.join(BASE_DIR, 'training_status.json') |
if os.path.exists(status_path): |
with open(status_path, 'r') as status_file: |
training_status = json.load(status_file) |
else: |
training_status = {'running': False, 'cancelled': False} |
load_training_status() |
os.environ["TORCH_CUDNN_SDPA_ENABLED"] = "0" |
MODEL_CFG = r"project/sam2/sam2_hiera_l.yaml" |
CHECKPOINT = r"project/sam2/sam2.1_hiera_large.pt" |
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
predictor = Predictor(MODEL_CFG, CHECKPOINT, DEVICE) |
YOLO_CFG = os.path.join(DATASET_FOLDERS['models'], "best.pt") |
yolo_model = YOLO(YOLO_CFG) |
logging.basicConfig( |
level=logging.INFO, |
format='%(asctime)s [%(levelname)s] %(message)s', |
handlers=[ |
logging.StreamHandler(), |
logging.FileHandler(os.path.join(BASE_DIR, "app.log")) |
] |
) |
@app.route('/') |
def index(): |
"""Serve the main UI.""" |
return render_template('index.html') |
@app.route('/upload', methods=['POST']) |
def upload_image(): |
"""Handle image uploads.""" |
if 'file' not in request.files: |
return jsonify({'error': 'No file uploaded'}), 400 |
file = request.files['file'] |
if file.filename == '': |
return jsonify({'error': 'No file selected'}), 400 |
input_path = os.path.join(UPLOAD_FOLDERS['input'], file.filename) |
file.save(input_path) |
image = np.array(Image.open(input_path).convert("RGB")) |
predictor.set_image(image) |
web_accessible_url = f"/static/uploads/input/{file.filename}" |
print(f"Image uploaded and set for prediction: {input_path}") |
return jsonify({'image_url': web_accessible_url}) |
@app.route('/segment', methods=['POST']) |
def segment(): |
""" |
Perform segmentation and return the blended image URL. |
""" |
try: |
data = request.json |
points = np.array(data.get('points', [])) |
labels = np.array(data.get('labels', [])) |
current_class = data.get('class', 'voids') |
if not predictor.image_set: |
raise ValueError("No image set for prediction.") |
masks, _, _ = predictor.predict( |
point_coords=points, |
point_labels=labels, |
multimask_output=False |
) |
if masks is None or masks.size == 0: |
raise RuntimeError("No masks were generated by the predictor.") |
mask_folder = UPLOAD_FOLDERS.get(f'mask_{current_class}') |
segmented_folder = UPLOAD_FOLDERS.get(f'segmented_{current_class}') |
if not mask_folder or not segmented_folder: |
raise ValueError(f"Invalid class '{current_class}' provided.") |
os.makedirs(mask_folder, exist_ok=True) |
os.makedirs(segmented_folder, exist_ok=True) |
mask_path = os.path.join(mask_folder, 'raw_mask.png') |
save_mask_as_png(masks[0], mask_path) |
blend_color = [34, 139, 34] if current_class == 'voids' else [30, 144, 255] |
blended_image = blend_mask_with_image(predictor.image, masks[0], blend_color) |
blended_filename = f"blended_{current_class}.png" |
blended_path = os.path.join(segmented_folder, blended_filename) |
Image.fromarray(blended_image).save(blended_path) |
segmented_url = f"/static/uploads/segmented/{current_class}/{blended_filename}" |
logging.info(f"Segmentation completed for {current_class}. Points: {points}, Labels: {labels}") |
return jsonify({'segmented_url': segmented_url}) |
except ValueError as ve: |
logging.error(f"Value error during segmentation: {ve}") |
return jsonify({'error': str(ve)}), 400 |
except Exception as e: |
logging.error(f"Unexpected error during segmentation: {e}") |
return jsonify({'error': 'Segmentation failed', 'details': str(e)}), 500 |
@app.route('/automatic_segment', methods=['POST']) |
def automatic_segment(): |
"""Perform automatic segmentation using YOLO.""" |
if 'file' not in request.files: |
return jsonify({'error': 'No file uploaded'}), 400 |
file = request.files['file'] |
if file.filename == '': |
return jsonify({'error': 'No file selected'}), 400 |
input_path = os.path.join(UPLOAD_FOLDERS['input'], file.filename) |
file.save(input_path) |
try: |
results = yolo_model.predict(input_path, save=False, save_txt=False) |
output_folder = UPLOAD_FOLDERS['automatic_segmented'] |
os.makedirs(output_folder, exist_ok=True) |
chips_data = [] |
chips = [] |
voids = [] |
for result in results: |
annotated_image = result.plot() |
result_filename = f"{file.filename.rsplit('.', 1)[0]}_pred.jpg" |
result_path = os.path.join(output_folder, result_filename) |
Image.fromarray(annotated_image).save(result_path) |
for i, label in enumerate(result.boxes.cls): |
label_name = result.names[int(label)] |
box = result.boxes.xyxy[i].cpu().numpy() |
area = float((box[2] - box[0]) * (box[3] - box[1])) |
if label_name == 'chip': |
chips.append({'box': box, 'area': area, 'voids': []}) |
elif label_name == 'void': |
voids.append({'box': box, 'area': area}) |
for void in voids: |
void_centroid = [ |
(void['box'][0] + void['box'][2]) / 2, |
(void['box'][1] + void['box'][3]) / 2 |
] |
for chip in chips: |
if (chip['box'][0] <= void_centroid[0] <= chip['box'][2] and |
chip['box'][1] <= void_centroid[1] <= chip['box'][3]): |
chip['voids'].append(void) |
break |
for idx, chip in enumerate(chips): |
chip_area = chip['area'] |
total_void_area = sum([float(void['area']) for void in chip['voids']]) |
max_void_area = max([float(void['area']) for void in chip['voids']], default=0) |
void_percentage = (total_void_area / chip_area) * 100 if chip_area > 0 else 0 |
max_void_percentage = (max_void_area / chip_area) * 100 if chip_area > 0 else 0 |
chips_data.append({ |
"chip_number": int(idx + 1), |
"chip_area": round(chip_area, 2), |
"void_percentage": round(void_percentage, 2), |
"max_void_percentage": round(max_void_percentage, 2) |
}) |
segmented_url = f"/static/uploads/segmented/automatic/{result_filename}" |
return jsonify({ |
"segmented_url": segmented_url, |
"table_data": { |
"image_name": file.filename, |
"chips": chips_data |
} |
}) |
except Exception as e: |
print(f"Error in automatic segmentation: {e}") |
return jsonify({'error': 'Segmentation failed.'}), 500 |
@app.route('/save_both', methods=['POST']) |
def save_both(): |
"""Save both the image and masks into the history folders.""" |
data = request.json |
image_name = data.get('image_name') |
if not image_name: |
return jsonify({'error': 'Image name not provided'}), 400 |
try: |
image_name = os.path.basename(image_name) |
print(f"Sanitized Image Name: {image_name}") |
input_image_path = os.path.join(UPLOAD_FOLDERS['input'], image_name) |
if not os.path.exists(input_image_path): |
print(f"Input image does not exist: {input_image_path}") |
return jsonify({'error': f'Input image not found: {input_image_path}'}), 404 |
image_history_path = os.path.join(HISTORY_FOLDERS['images'], image_name) |
os.makedirs(os.path.dirname(image_history_path), exist_ok=True) |
shutil.copy(input_image_path, image_history_path) |
print(f"Image saved to history: {image_history_path}") |
void_mask_path = os.path.join(UPLOAD_FOLDERS['mask_voids'], 'raw_mask.png') |
if os.path.exists(void_mask_path): |
void_mask_history_path = os.path.join(HISTORY_FOLDERS['masks_void'], f"{os.path.splitext(image_name)[0]}.png") |
os.makedirs(os.path.dirname(void_mask_history_path), exist_ok=True) |
shutil.copy(void_mask_path, void_mask_history_path) |
print(f"Voids mask saved to history: {void_mask_history_path}") |
else: |
print(f"Voids mask not found: {void_mask_path}") |
chip_mask_path = os.path.join(UPLOAD_FOLDERS['mask_chips'], 'raw_mask.png') |
if os.path.exists(chip_mask_path): |
chip_mask_history_path = os.path.join(HISTORY_FOLDERS['masks_chip'], f"{os.path.splitext(image_name)[0]}.png") |
os.makedirs(os.path.dirname(chip_mask_history_path), exist_ok=True) |
shutil.copy(chip_mask_path, chip_mask_history_path) |
print(f"Chips mask saved to history: {chip_mask_history_path}") |
else: |
print(f"Chips mask not found: {chip_mask_path}") |
return jsonify({'message': 'Image and masks saved successfully!'}), 200 |
except Exception as e: |
print(f"Error saving files: {e}") |
return jsonify({'error': 'Failed to save files.', 'details': str(e)}), 500 |
@app.route('/get_history', methods=['GET']) |
def get_history(): |
try: |
saved_images = os.listdir(HISTORY_FOLDERS['images']) |
return jsonify({'status': 'success', 'images': saved_images}), 200 |
except Exception as e: |
return jsonify({'status': 'error', 'message': f'Failed to fetch history: {e}'}), 500 |
@app.route('/delete_history_item', methods=['POST']) |
def delete_history_item(): |
data = request.json |
image_name = data.get('image_name') |
if not image_name: |
return jsonify({'error': 'Image name not provided'}), 400 |
try: |
image_path = os.path.join(HISTORY_FOLDERS['images'], image_name) |
if os.path.exists(image_path): |
os.remove(image_path) |
void_mask_path = os.path.join(HISTORY_FOLDERS['masks_void'], f"{os.path.splitext(image_name)[0]}.png") |
if os.path.exists(void_mask_path): |
os.remove(void_mask_path) |
chip_mask_path = os.path.join(HISTORY_FOLDERS['masks_chip'], f"{os.path.splitext(image_name)[0]}.png") |
if os.path.exists(chip_mask_path): |
os.remove(chip_mask_path) |
return jsonify({'message': f'{image_name} and associated masks deleted successfully.'}), 200 |
except Exception as e: |
return jsonify({'error': f'Failed to delete files: {e}'}), 500 |
status_lock = Lock() |
def update_training_status(key, value): |
"""Thread-safe update for training status.""" |
with status_lock: |
training_status[key] = value |
@app.route('/retrain_model', methods=['POST']) |
def retrain_model(): |
"""Handle retrain model workflow.""" |
global training_status |
if training_status.get('running', False): |
return jsonify({'error': 'Training is already in progress'}), 400 |
try: |
update_training_status('running', True) |
update_training_status('cancelled', False) |
logging.info("Training status updated. Starting training workflow.") |
backup_masks_and_images() |
logging.info("Backup completed successfully.") |
prepare_yolo_labels() |
logging.info("YOLO labels prepared successfully.") |
threading.Thread(target=run_yolo_training).start() |
return jsonify({'message': 'Training started successfully!'}), 200 |
except Exception as e: |
logging.error(f"Error during training preparation: {e}") |
update_training_status('running', False) |
return jsonify({'error': f"Failed to start training: {e}"}), 500 |
def prepare_yolo_labels(): |
"""Convert all masks into YOLO-compatible labels and copy images to the dataset folder.""" |
images_folder = HISTORY_FOLDERS['images'] |
train_labels_folder = DATASET_FOLDERS['train_labels'] |
train_images_folder = DATASET_FOLDERS['train_images'] |
val_labels_folder = DATASET_FOLDERS['val_labels'] |
val_images_folder = DATASET_FOLDERS['val_images'] |
os.makedirs(train_labels_folder, exist_ok=True) |
os.makedirs(train_images_folder, exist_ok=True) |
os.makedirs(val_labels_folder, exist_ok=True) |
os.makedirs(val_images_folder, exist_ok=True) |
try: |
all_images = [img for img in os.listdir(images_folder) if img.endswith(('.jpg', '.png'))] |
random.shuffle(all_images) |
split_idx = int(len(all_images) * 0.8) |
train_images = all_images[:split_idx] |
val_images = all_images[split_idx:] |
for image_name in train_images: |
process_image_and_mask( |
image_name, |
source_images_folder=images_folder, |
dest_images_folder=train_images_folder, |
dest_labels_folder=train_labels_folder |
) |
for image_name in val_images: |
process_image_and_mask( |
image_name, |
source_images_folder=images_folder, |
dest_images_folder=val_images_folder, |
dest_labels_folder=val_labels_folder |
) |
logging.info("YOLO labels prepared, and images split into train and validation successfully.") |
except Exception as e: |
logging.error(f"Error in preparing YOLO labels: {e}") |
raise |
import random |
def prepare_yolo_labels(): |
"""Convert all masks into YOLO-compatible labels and copy images to the dataset folder.""" |
images_folder = HISTORY_FOLDERS['images'] |
train_labels_folder = DATASET_FOLDERS['train_labels'] |
train_images_folder = DATASET_FOLDERS['train_images'] |
val_labels_folder = DATASET_FOLDERS['val_labels'] |
val_images_folder = DATASET_FOLDERS['val_images'] |
os.makedirs(train_labels_folder, exist_ok=True) |
os.makedirs(train_images_folder, exist_ok=True) |
os.makedirs(val_labels_folder, exist_ok=True) |
os.makedirs(val_images_folder, exist_ok=True) |
try: |
all_images = [img for img in os.listdir(images_folder) if img.endswith(('.jpg', '.png'))] |
random.shuffle(all_images) |
split_idx = int(len(all_images) * 0.8) |
train_images = all_images[:split_idx] |
val_images = all_images[split_idx:] |
for image_name in train_images: |
process_image_and_mask( |
image_name, |
source_images_folder=images_folder, |
dest_images_folder=train_images_folder, |
dest_labels_folder=train_labels_folder |
) |
for image_name in val_images: |
process_image_and_mask( |
image_name, |
source_images_folder=images_folder, |
dest_images_folder=val_images_folder, |
dest_labels_folder=val_labels_folder |
) |
logging.info("YOLO labels prepared, and images split into train and validation successfully.") |
except Exception as e: |
logging.error(f"Error in preparing YOLO labels: {e}") |
raise |
def process_image_and_mask(image_name, source_images_folder, dest_images_folder, dest_labels_folder): |
""" |
Process a single image and its masks, saving them in the appropriate YOLO format. |
""" |
try: |
image_path = os.path.join(source_images_folder, image_name) |
label_file_path = os.path.join(dest_labels_folder, f"{os.path.splitext(image_name)[0]}.txt") |
shutil.copy(image_path, os.path.join(dest_images_folder, image_name)) |
if os.path.exists(label_file_path): |
os.remove(label_file_path) |
void_mask_path = os.path.join(HISTORY_FOLDERS['masks_void'], f"{os.path.splitext(image_name)[0]}.png") |
if os.path.exists(void_mask_path): |
convert_mask_to_yolo( |
mask_path=void_mask_path, |
image_path=image_path, |
class_id=0, |
output_path=label_file_path |
) |
chip_mask_path = os.path.join(HISTORY_FOLDERS['masks_chip'], f"{os.path.splitext(image_name)[0]}.png") |
if os.path.exists(chip_mask_path): |
convert_mask_to_yolo( |
mask_path=chip_mask_path, |
image_path=image_path, |
class_id=1, |
output_path=label_file_path, |
append=True |
) |
logging.info(f"Processed {image_name} into YOLO format.") |
except Exception as e: |
logging.error(f"Error processing {image_name}: {e}") |
raise |
def backup_masks_and_images(): |
"""Backup current masks and images from history folders.""" |
temp_backup_paths = { |
'voids': os.path.join(DATASET_FOLDERS['temp_backup'], 'masks/voids'), |
'chips': os.path.join(DATASET_FOLDERS['temp_backup'], 'masks/chips'), |
'images': os.path.join(DATASET_FOLDERS['temp_backup'], 'images') |
} |
for path in temp_backup_paths.values(): |
if os.path.exists(path): |
shutil.rmtree(path) |
os.makedirs(path, exist_ok=True) |
try: |
for file in os.listdir(HISTORY_FOLDERS['images']): |
src_image_path = os.path.join(HISTORY_FOLDERS['images'], file) |
dst_image_path = os.path.join(temp_backup_paths['images'], file) |
shutil.copy(src_image_path, dst_image_path) |
for file in os.listdir(HISTORY_FOLDERS['masks_void']): |
src_void_path = os.path.join(HISTORY_FOLDERS['masks_void'], file) |
dst_void_path = os.path.join(temp_backup_paths['voids'], file) |
shutil.copy(src_void_path, dst_void_path) |
for file in os.listdir(HISTORY_FOLDERS['masks_chip']): |
src_chip_path = os.path.join(HISTORY_FOLDERS['masks_chip'], file) |
dst_chip_path = os.path.join(temp_backup_paths['chips'], file) |
shutil.copy(src_chip_path, dst_chip_path) |
logging.info("Masks and images backed up successfully from history.") |
except Exception as e: |
logging.error(f"Error during backup: {e}") |
raise RuntimeError("Backup process failed.") |
def run_yolo_training(num_epochs=10): |
"""Run YOLO training process.""" |
global training_process |
try: |
device = "cuda" if torch.cuda.is_available() else "cpu" |
data_cfg_path = os.path.join(BASE_DIR, "models/data.yaml") |
logging.info(f"Starting YOLO training on {device} with {num_epochs} epochs.") |
logging.info(f"Using dataset configuration: {data_cfg_path}") |
training_command = [ |
"yolo", |
"train", |
f"data={data_cfg_path}", |
f"model={os.path.join(DATASET_FOLDERS['models'], 'best.pt')}", |
f"device={device}", |
f"epochs={num_epochs}", |
"project=runs", |
"name=train" |
] |
training_process = subprocess.Popen( |
training_command, |
stdout=subprocess.PIPE, |
stderr=subprocess.STDOUT, |
text=True, |
env=os.environ.copy(), |
) |
for line in iter(training_process.stdout.readline, ''): |
print(line.strip()) |
logging.info(line.strip()) |
socketio.emit('training_update', {'message': line.strip()}) |
training_process.wait() |
if training_process.returncode == 0: |
finalize_training() |
else: |
raise RuntimeError("YOLO training process failed. Check logs for details.") |
except Exception as e: |
logging.error(f"Training error: {e}") |
restore_backup() |
socketio.emit('training_status', {'status': 'error', 'message': f"Training failed: {str(e)}"}) |
finally: |
update_training_status('running', False) |
training_process = None |
@socketio.on('cancel_training') |
def handle_cancel_training(): |
"""Cancel the YOLO training process.""" |
global training_process, training_status |
if not training_status.get('running', False): |
socketio.emit('button_update', {'action': 'retrain'}) |
return |
try: |
training_process.terminate() |
training_process.wait() |
training_status['running'] = False |
training_status['cancelled'] = True |
restore_backup() |
cleanup_train_val_directories() |
socketio.emit('button_update', {'action': 'retrain'}) |
socketio.emit('training_status', {'status': 'cancelled', 'message': 'Training was canceled by the user.'}) |
except Exception as e: |
logging.error(f"Error cancelling training: {e}") |
socketio.emit('training_status', {'status': 'error', 'message': str(e)}) |
def finalize_training(): |
"""Finalize training by promoting the new model and cleaning up.""" |
try: |
runs_dir = os.path.join(BASE_DIR, 'runs') |
if not os.path.exists(runs_dir): |
raise FileNotFoundError("Training runs directory does not exist.") |
latest_run = max( |
[os.path.join(runs_dir, d) for d in os.listdir(runs_dir)], |
key=os.path.getmtime |
) |
weights_dir = os.path.join(latest_run, 'weights') |
best_model_path = os.path.join(weights_dir, 'best.pt') |
if not os.path.exists(best_model_path): |
raise FileNotFoundError(f"'best.pt' not found in {weights_dir}.") |
old_model_folder = DATASET_FOLDERS['models_old'] |
os.makedirs(old_model_folder, exist_ok=True) |
existing_best_model = os.path.join(DATASET_FOLDERS['models'], 'best.pt') |
if os.path.exists(existing_best_model): |
timestamp = time.strftime("%Y%m%d_%H%M%S") |
shutil.move(existing_best_model, os.path.join(old_model_folder, f"old_{timestamp}.pt")) |
logging.info(f"Old model backed up to {old_model_folder}.") |
new_model_dest = os.path.join(DATASET_FOLDERS['models'], 'best.pt') |
shutil.move(best_model_path, new_model_dest) |
logging.info(f"New model saved to {new_model_dest}.") |
socketio.emit('training_status', { |
'status': 'completed', |
'message': 'Training completed successfully! Model saved as best.pt.' |
}) |
cleanup_train_val_directories() |
logging.info("Train and validation directories cleaned up successfully.") |
except Exception as e: |
logging.error(f"Error finalizing training: {e}") |
socketio.emit('training_status', {'status': 'error', 'message': f"Error finalizing training: {str(e)}"}) |
def restore_backup(): |
"""Restore the dataset and masks from the backup.""" |
try: |
temp_backup = DATASET_FOLDERS['temp_backup'] |
shutil.copytree(os.path.join(temp_backup, 'masks/voids'), UPLOAD_FOLDERS['mask_voids'], dirs_exist_ok=True) |
shutil.copytree(os.path.join(temp_backup, 'masks/chips'), UPLOAD_FOLDERS['mask_chips'], dirs_exist_ok=True) |
shutil.copytree(os.path.join(temp_backup, 'images'), UPLOAD_FOLDERS['input'], dirs_exist_ok=True) |
logging.info("Backup restored successfully.") |
except Exception as e: |
logging.error(f"Error restoring backup: {e}") |
@app.route('/cancel_training', methods=['POST']) |
def cancel_training(): |
global training_process |
if training_process is None: |
logging.error("No active training process to terminate.") |
return jsonify({'error': 'No active training process to cancel.'}), 400 |
try: |
training_process.terminate() |
training_process.wait() |
training_process = None |
update_training_status('running', False) |
update_training_status('cancelled', True) |
best_model_path = os.path.join(DATASET_FOLDERS['models'], 'best.pt') |
if os.path.exists(best_model_path): |
logging.info(f"Model already saved as best.pt at {best_model_path}.") |
socketio.emit('button_update', {'action': 'revert'}) |
else: |
logging.info("Training canceled, but no new model was saved.") |
restore_backup() |
cleanup_train_val_directories() |
socketio.emit('training_status', {'status': 'cancelled', 'message': 'Training was canceled by the user.'}) |
return jsonify({'message': 'Training canceled and data restored successfully.'}), 200 |
except Exception as e: |
logging.error(f"Error cancelling training: {e}") |
return jsonify({'error': f"Failed to cancel training: {e}"}), 500 |
@app.route('/clear_history', methods=['POST']) |
def clear_history(): |
try: |
for folder in [HISTORY_FOLDERS['images'], HISTORY_FOLDERS['masks_chip'], HISTORY_FOLDERS['masks_void']]: |
shutil.rmtree(folder, ignore_errors=True) |
os.makedirs(folder, exist_ok=True) |
return jsonify({'message': 'History cleared successfully!'}), 200 |
except Exception as e: |
return jsonify({'error': f'Failed to clear history: {e}'}), 500 |
@app.route('/training_status', methods=['GET']) |
def get_training_status(): |
"""Return the current training status.""" |
if training_status.get('running', False): |
return jsonify({'status': 'running', 'message': 'Training in progress.'}), 200 |
elif training_status.get('cancelled', False): |
return jsonify({'status': 'cancelled', 'message': 'Training was cancelled.'}), 200 |
return jsonify({'status': 'idle', 'message': 'No training is currently running.'}), 200 |
def cleanup_train_val_directories(): |
"""Clear the train and validation directories.""" |
try: |
for folder in [DATASET_FOLDERS['train_images'], DATASET_FOLDERS['train_labels'], |
DATASET_FOLDERS['val_images'], DATASET_FOLDERS['val_labels']]: |
shutil.rmtree(folder, ignore_errors=True) |
os.makedirs(folder, exist_ok=True) |
logging.info("Train and validation directories cleaned up successfully.") |
except Exception as e: |
logging.error(f"Error cleaning up train/val directories: {e}") |
if __name__ == '__main__': |
multiprocessing.set_start_method('spawn') |
app.run(debug=True, use_reloader=False) |