|
import cv2 |
|
from .plot import * |
|
from abcli import file |
|
from abcli import path |
|
from abcli import string |
|
from abcli.plugins import graphics |
|
from abcli.tasks import host |
|
from abcli.tasks import objects |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
import tensorflow as tf |
|
from tqdm import * |
|
import time |
|
from abcli.logging import crash_report |
|
import abcli.logging |
|
import logging |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
default_window_size = 28 |
|
|
|
|
|
class Image_Classifier(object): |
|
def __init__(self): |
|
self.class_names = [] |
|
self.model = None |
|
|
|
self.params = { |
|
"convnet": False, |
|
"object_name": "", |
|
"model_size": "", |
|
"window_size": default_window_size, |
|
} |
|
|
|
def load(self, model_path): |
|
success, self.class_names = file.load_json( |
|
f"{model_path}/image_classifier/model/class_names.json" |
|
) |
|
if not success: |
|
return False |
|
|
|
success, self.params = file.load_json( |
|
f"{model_path}/image_classifier/model/params.json" |
|
) |
|
if not success: |
|
return False |
|
|
|
self.params["object_name"] = path.name(model_path) |
|
|
|
self.params["model_size"] = file.size(f"{model_path}/image_classifier/model") |
|
|
|
try: |
|
self.model = tf.keras.models.load_model( |
|
f"{model_path}/image_classifier/model" |
|
) |
|
except: |
|
from abcli.logging import crash_report |
|
|
|
crash_report("image_classifier.load({}) failed".format(model_path)) |
|
return False |
|
|
|
logger.info( |
|
"{}.load({}x{}:{}): {}{} class(es): {}".format( |
|
self.__class__.__name__, |
|
self.params["window_size"], |
|
self.params["window_size"], |
|
path.name(model_path), |
|
"convnet - " if self.params["convnet"] else "", |
|
len(self.class_names), |
|
",".join(self.class_names), |
|
) |
|
) |
|
self.model.summary() |
|
|
|
return True |
|
|
|
def predict(self, test_images, test_labels, output_path="", page_count=-1): |
|
logger.info( |
|
"image_classifier.predict({},{}){}".format( |
|
string.pretty_shape_of_matrix(test_images), |
|
string.pretty_shape_of_matrix(test_labels), |
|
"-> {}".format(output_path) if output_path else "", |
|
) |
|
) |
|
|
|
prediction_time = time.time() |
|
predictions = self.model.predict(test_images) |
|
prediction_time = (time.time() - prediction_time) / test_images.shape[0] |
|
logger.info( |
|
"image_classifier.predict(): {} / frame".format( |
|
string.pretty_duration( |
|
prediction_time, |
|
include_ms=True, |
|
) |
|
) |
|
) |
|
|
|
if not output_path: |
|
return True |
|
|
|
if not file.save( |
|
f"{output_path}/image_classifier/predictions.pyndarray", predictions |
|
): |
|
return False |
|
|
|
if test_labels is not None: |
|
from sklearn.metrics import confusion_matrix |
|
|
|
logger.info("image_classifier.predict(): rendering confusion_matrix...") |
|
|
|
cm = confusion_matrix( |
|
test_labels, |
|
np.argmax(predictions, axis=1), |
|
labels=range(len(self.class_names)), |
|
|
|
) |
|
cm = cm / np.sum(cm, axis=1)[:, np.newaxis] |
|
logger.debug("confusion_matrix: {}".format(cm)) |
|
|
|
if not file.save( |
|
f"{output_path}/image_classifier/model/confusion_matrix.pyndarray", cm |
|
): |
|
return False |
|
|
|
if not graphics.render_confusion_matrix( |
|
cm, |
|
self.class_names, |
|
f"{output_path}/image_classifier/model/confusion_matrix.jpg", |
|
header=[ |
|
" | ".join(host.signature()), |
|
" | ".join(objects.signature()), |
|
], |
|
footer=self.signature(prediction_time), |
|
): |
|
return False |
|
|
|
if test_labels is not None: |
|
logger.info( |
|
"image_classifier.predict(): rendering test_labels distribution..." |
|
) |
|
|
|
|
|
|
|
distribution = np.bincount(test_labels) |
|
distribution = distribution / np.sum(distribution) |
|
|
|
if not graphics.render_distribution( |
|
distribution, |
|
self.class_names, |
|
f"{output_path}/image_classifier/model/label_distribution.jpg", |
|
header=[ |
|
" | ".join(host.signature()), |
|
" | ".join(objects.signature()), |
|
], |
|
footer=self.signature(prediction_time), |
|
title="distribution of test_labels", |
|
): |
|
return False |
|
|
|
max_index = test_images.shape[0] |
|
if page_count != -1: |
|
max_index = min(24 * page_count, max_index) |
|
logger.info( |
|
f"image_classifier.predict(): rendering {max_index / 24:.0f} sheet(s)..." |
|
) |
|
for index in tqdm(range(0, max_index, 24)): |
|
self.render( |
|
predictions[index : index + 24], |
|
None if test_labels is None else test_labels[index : index + 24], |
|
test_images[index : index + 24], |
|
"{}/image_classifier/prediction/{:05d}.jpg".format( |
|
output_path, |
|
int(index / 24), |
|
), |
|
prediction_time, |
|
) |
|
|
|
return True |
|
|
|
def predict_frame(self, frame): |
|
prediction_time = time.time() |
|
try: |
|
prediction = self.model.predict( |
|
np.expand_dims( |
|
cv2.resize( |
|
frame, (self.params["window_size"], self.params["window_size"]) |
|
) |
|
/ 255.0, |
|
axis=0, |
|
) |
|
) |
|
except: |
|
from abcli.logging import crash_report |
|
|
|
crash_report("image_classifier.predict_frame() crashed.") |
|
return False, -1 |
|
|
|
prediction_time = time.time() - prediction_time |
|
|
|
output = np.argmax(prediction) |
|
|
|
logger.info( |
|
"image_classifier.prediction: [{}] -> {} - took {}".format( |
|
",".join( |
|
[ |
|
"{}:{:.2f}".format(class_name, value) |
|
for class_name, value in zip(self.class_names, prediction[0]) |
|
] |
|
), |
|
self.class_names[output], |
|
string.pretty_duration( |
|
prediction_time, |
|
include_ms=True, |
|
short=True, |
|
), |
|
) |
|
) |
|
|
|
return True, output |
|
|
|
def render( |
|
self, |
|
predictions, |
|
test_labels, |
|
test_images, |
|
output_filename="", |
|
prediction_time=0, |
|
): |
|
num_rows = 4 |
|
num_cols = 6 |
|
num_images = num_rows * num_cols |
|
plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows)) |
|
for i in range(min(num_images, len(predictions))): |
|
plt.subplot(num_rows, 2 * num_cols, 2 * i + 1) |
|
plot_image(i, predictions[i], test_labels, test_images, self.class_names) |
|
plt.subplot(num_rows, 2 * num_cols, 2 * i + 2) |
|
plot_value_array(i, predictions[i], test_labels) |
|
plt.tight_layout() |
|
|
|
if output_filename: |
|
filename_ = file.auxiliary("prediction", "png") |
|
plt.savefig(filename_) |
|
plt.close() |
|
|
|
success, image = file.load_image(filename_) |
|
if success: |
|
image = graphics.add_signature( |
|
image, |
|
[" | ".join(host.signature()), " | ".join(objects.signature())], |
|
self.signature(prediction_time), |
|
) |
|
file.save_image(output_filename, image) |
|
|
|
def save(self, model_path): |
|
model_filename = f"{model_path}/image_classifier/model" |
|
|
|
if not file.prepare_for_saving(model_filename): |
|
return False |
|
|
|
try: |
|
self.model.save(model_filename) |
|
logger.info(f"image_classifier.model -> {model_filename}") |
|
except: |
|
crash_report(f"-image_classifier: save({model_path}): failed.") |
|
return False |
|
|
|
self.params["object_name"] = path.name(model_path) |
|
|
|
self.params["model_size"] = file.size(f"{model_path}/image_classifier/model") |
|
|
|
if not file.save_json( |
|
f"{model_path}/image_classifier/model/class_names.json", |
|
self.class_names, |
|
): |
|
return False |
|
|
|
if not file.save_json( |
|
f"{model_path}/image_classifier/model/params.json", |
|
self.params, |
|
): |
|
return False |
|
|
|
return True |
|
|
|
def signature(self, prediction_time): |
|
return [ |
|
" | ".join( |
|
[ |
|
"image_classifier", |
|
self.params["object_name"], |
|
string.pretty_bytes(self.params["model_size"]) |
|
if self.params["model_size"] |
|
else "", |
|
string.pretty_shape(self.input_shape), |
|
"/".join(string.shorten(self.class_names)), |
|
"took {} / frame".format( |
|
string.pretty_duration( |
|
prediction_time, |
|
include_ms=True, |
|
largest=True, |
|
short=True, |
|
) |
|
), |
|
] |
|
) |
|
] |
|
|
|
@staticmethod |
|
def train(data_path, model_path, color=False, convnet=True, epochs=10): |
|
classifier = Image_Classifier() |
|
classifier.params["convnet"] = convnet |
|
|
|
logger.info( |
|
"image_classifier.train({}) -{}> {}".format( |
|
data_path, |
|
"convnet-" if classifier.params["convnet"] else "", |
|
model_path, |
|
) |
|
) |
|
|
|
success, train_images = file.load(f"{data_path}/train_images.pyndarray") |
|
if success: |
|
success, train_labels = file.load(f"{data_path}/train_labels.pyndarray") |
|
if success: |
|
success, test_images = file.load(f"{data_path}/test_images.pyndarray") |
|
if success: |
|
success, test_labels = file.load(f"{data_path}/test_labels.pyndarray") |
|
if success: |
|
success, classifier.class_names = file.load_json( |
|
f"{data_path}/class_names.json" |
|
) |
|
if not success: |
|
return False |
|
|
|
from tensorflow.keras.utils import to_categorical |
|
|
|
train_labels = to_categorical(train_labels) |
|
test_labels = to_categorical(test_labels) |
|
|
|
window_size = train_images.shape[1] |
|
input_shape = ( |
|
(window_size, window_size, 3) |
|
if color |
|
else (window_size, window_size, 1) |
|
if convnet |
|
else (window_size, window_size) |
|
) |
|
logger.info(f"input:{string.pretty_shape(input_shape)}") |
|
|
|
if convnet and not color: |
|
train_images = np.expand_dims(train_images, axis=3) |
|
test_images = np.expand_dims(test_images, axis=3) |
|
|
|
for name, thing in zip( |
|
"train_images,train_labels,test_images,test_labels".split(","), |
|
[train_images, train_labels, test_images, test_labels], |
|
): |
|
logger.info("{}: {}".format(name, string.pretty_shape_of_matrix(thing))) |
|
logger.info( |
|
f"{len(classifier.class_names)} class(es): {', '.join(classifier.class_names)}" |
|
) |
|
|
|
train_images = train_images / 255.0 |
|
test_images = test_images / 255.0 |
|
|
|
if convnet: |
|
|
|
classifier.model = tf.keras.Sequential( |
|
[ |
|
tf.keras.layers.Conv2D( |
|
filters=48, |
|
kernel_size=3, |
|
activation="relu", |
|
input_shape=input_shape, |
|
), |
|
tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
|
tf.keras.layers.Conv2D( |
|
filters=48, kernel_size=3, activation="relu" |
|
), |
|
tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
|
tf.keras.layers.Conv2D( |
|
filters=32, kernel_size=3, activation="relu" |
|
), |
|
tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
|
tf.keras.layers.Flatten(), |
|
tf.keras.layers.Dense(128, activation="relu"), |
|
tf.keras.layers.Dense(64, activation="relu"), |
|
tf.keras.layers.Dense(len(classifier.class_names)), |
|
tf.keras.layers.Activation("softmax"), |
|
] |
|
) |
|
else: |
|
|
|
classifier.model = tf.keras.Sequential( |
|
[ |
|
tf.keras.layers.Flatten(input_shape=input_shape), |
|
tf.keras.layers.Dense(128, activation="relu"), |
|
tf.keras.layers.Dense(len(classifier.class_names)), |
|
tf.keras.layers.Activation("softmax"), |
|
] |
|
) |
|
|
|
classifier.model.summary() |
|
|
|
classifier.model.compile( |
|
optimizer="adam", |
|
loss=tf.keras.losses.categorical_crossentropy, |
|
metrics=["accuracy"], |
|
) |
|
|
|
classifier.model.fit(train_images, train_labels, epochs=epochs) |
|
|
|
test_accuracy = float( |
|
classifier.model.evaluate(test_images, test_labels, verbose=2)[1] |
|
) |
|
logger.info("test accuracy: {:.4f}".format(test_accuracy)) |
|
|
|
if not file.save_json( |
|
f"{model_path}/eval.json", |
|
{"metrics": {"test_accuracy": test_accuracy}}, |
|
): |
|
return False |
|
|
|
if not classifier.save(model_path): |
|
return False |
|
|
|
return classifier.predict( |
|
test_images, |
|
np.argmax(test_labels, axis=1), |
|
model_path, |
|
page_count=10, |
|
) |
|
|
|
@property |
|
def input_shape(self): |
|
return self.model.layers[0].input_shape[1:] if self.model.layers else [] |
|
|