import json | |
from typing import Any, Dict, List | |
import tensorflow as tf | |
import base64 | |
import io | |
import os | |
import numpy as np | |
from PIL import Image | |
# most of this code has been obtained from Datature's prediction script | |
# https://github.com/datature/resources/blob/main/scripts/bounding_box/prediction.py | |
class PreTrainedPipeline(): | |
def __init__(self, path: str): | |
# load the model | |
self.model = tf.saved_model.load(os.path.join(path, "saved_model")) | |
def __call__(self, inputs: "Image.Image")-> List[Dict[str, Any]]: | |
# convert img to numpy array, resize and normalize to make the prediction | |
img = np.array(inputs) | |
im = tf.image.resize(img, (128, 128)) | |
im = tf.cast(im, tf.float32) / 255.0 | |
pred_mask = self.model.predict(im[tf.newaxis, ...]) | |
# take the best performing class for each pixel | |
# the output of argmax looks like this [[1, 2, 0], ...] | |
pred_mask_arg = tf.argmax(pred_mask, axis=-1) | |
labels = [] | |
# convert the prediction mask into binary masks for each class | |
binary_masks = {} | |
mask_codes = {} | |
# when we take tf.argmax() over pred_mask, it becomes a tensor object | |
# the shape becomes TensorShape object, looking like this TensorShape([128]) | |
# we need to take get shape, convert to list and take the best one | |
rows = pred_mask_arg[0][1].get_shape().as_list()[0] | |
cols = pred_mask_arg[0][2].get_shape().as_list()[0] | |
for cls in range(pred_mask.shape[-1]): | |
binary_masks[f"mask_{cls}"] = np.zeros(shape = (pred_mask.shape[1], pred_mask.shape[2])) #create masks for each class | |
for row in range(rows): | |
for col in range(cols): | |
if pred_mask_arg[0][row][col] == cls: | |
binary_masks[f"mask_{cls}"][row][col] = 1 | |
else: | |
binary_masks[f"mask_{cls}"][row][col] = 0 | |
mask = binary_masks[f"mask_{cls}"] | |
mask *= 255 | |
img = Image.fromarray(mask.astype(np.int8), mode="L") | |
# we need to make it readable for the widget | |
with io.BytesIO() as out: | |
img.save(out, format="PNG") | |
png_string = out.getvalue() | |
mask = base64.b64encode(png_string).decode("utf-8") | |
mask_codes[f"mask_{cls}"] = mask | |
# widget needs the below format, for each class we return label and mask string | |
labels.append({ | |
"label": f"LABEL_{cls}", | |
"mask": mask_codes[f"mask_{cls}"], | |
"score": 1.0, | |
}) | |
labels = [{"score":0.9509243965148926,"label":"car","box":{"xmin":142,"ymin":106,"xmax":376,"ymax":229}}, | |
{"score":0.9981777667999268,"label":"car","box":{"xmin":405,"ymin":146,"xmax":640,"ymax":297}}, | |
{"score":0.9963648915290833,"label":"car","box":{"xmin":0,"ymin":115,"xmax":61,"ymax":167}}, | |
{"score":0.974663257598877,"label":"car","box":{"xmin":155,"ymin":104,"xmax":290,"ymax":141}}, | |
{"score":0.9986898303031921,"label":"car","box":{"xmin":39,"ymin":117,"xmax":169,"ymax":188}}, | |
{"score":0.9998276233673096,"label":"person","box":{"xmin":172,"ymin":60,"xmax":482,"ymax":396}}, | |
{"score":0.9996274709701538,"label":"skateboard","box":{"xmin":265,"ymin":348,"xmax":440,"ymax":413}}] | |
return labels | |
# class PreTrainedPipeline(): | |
# def __init__(self, path: str): | |
# # load the model | |
# self.model = tf.saved_model.load('./saved_model') | |
# def __call__(self, inputs: "Image.Image")-> List[Dict[str, Any]]: | |
# image = np.array(inputs) | |
# image = tf.cast(image, tf.float32) | |
# image = tf.image.resize(image, [150, 150]) | |
# image = np.expand_dims(image, axis = 0) | |
# predictions = self.model.predict(image) | |
# labels = [] | |
# labels = [{"score":0.9509243965148926,"label":"car","box":{"xmin":142,"ymin":106,"xmax":376,"ymax":229}},{"score":0.9981777667999268,"label":"car","box":{"xmin":405,"ymin":146,"xmax":640,"ymax":297}},{"score":0.9963648915290833,"label":"car","box":{"xmin":0,"ymin":115,"xmax":61,"ymax":167}},{"score":0.974663257598877,"label":"car","box":{"xmin":155,"ymin":104,"xmax":290,"ymax":141}},{"score":0.9986898303031921,"label":"car","box":{"xmin":39,"ymin":117,"xmax":169,"ymax":188}},{"score":0.9998276233673096,"label":"person","box":{"xmin":172,"ymin":60,"xmax":482,"ymax":396}},{"score":0.9996274709701538,"label":"skateboard","box":{"xmin":265,"ymin":348,"xmax":440,"ymax":413}}] | |
# return labels | |
# # ----------------- | |
# def load_model(): | |
# return tf.saved_model.load('./saved_model') | |
# def load_label_map(label_map_path): | |
# """ | |
# Reads label map in the format of .pbtxt and parse into dictionary | |
# Args: | |
# label_map_path: the file path to the label_map | |
# Returns: | |
# dictionary with the format of {label_index: {'id': label_index, 'name': label_name}} | |
# """ | |
# label_map = {} | |
# with open(label_map_path, "r") as label_file: | |
# for line in label_file: | |
# if "id" in line: | |
# label_index = int(line.split(":")[-1]) | |
# label_name = next(label_file).split(":")[-1].strip().strip('"') | |
# label_map[label_index] = {"id": label_index, "name": label_name} | |
# return label_map | |
# def predict_class(image, model): | |
# image = tf.cast(image, tf.float32) | |
# image = tf.image.resize(image, [150, 150]) | |
# image = np.expand_dims(image, axis = 0) | |
# return model.predict(image) | |
# def plot_boxes_on_img(color_map, classes, bboxes, image_origi, origi_shape): | |
# for idx, each_bbox in enumerate(bboxes): | |
# color = color_map[classes[idx]] | |
# ## Draw bounding box | |
# cv2.rectangle( | |
# image_origi, | |
# (int(each_bbox[1] * origi_shape[1]), | |
# int(each_bbox[0] * origi_shape[0]),), | |
# (int(each_bbox[3] * origi_shape[1]), | |
# int(each_bbox[2] * origi_shape[0]),), | |
# color, | |
# 2, | |
# ) | |
# ## Draw label background | |
# cv2.rectangle( | |
# image_origi, | |
# (int(each_bbox[1] * origi_shape[1]), | |
# int(each_bbox[2] * origi_shape[0]),), | |
# (int(each_bbox[3] * origi_shape[1]), | |
# int(each_bbox[2] * origi_shape[0] + 15),), | |
# color, | |
# -1, | |
# ) | |
# ## Insert label class & score | |
# cv2.putText( | |
# image_origi, | |
# "Class: {}, Score: {}".format( | |
# str(category_index[classes[idx]]["name"]), | |
# str(round(scores[idx], 2)), | |
# ), | |
# (int(each_bbox[1] * origi_shape[1]), | |
# int(each_bbox[2] * origi_shape[0] + 10),), | |
# cv2.FONT_HERSHEY_SIMPLEX, | |
# 0.3, | |
# (0, 0, 0), | |
# 1, | |
# cv2.LINE_AA, | |
# ) | |
# return image_origi | |
# # Webpage code starts here | |
# #TODO change this | |
# st.title('Distribution Grid - Belgium - Equipment detection') | |
# st.text('made by LabelFlow') | |
# st.markdown('## Description about your project') | |
# with st.spinner('Model is being loaded...'): | |
# model = load_model() | |
# # ask user to upload an image | |
# file = st.file_uploader("Upload image", type=["jpg", "png"]) | |
# if file is None: | |
# st.text('Waiting for upload...') | |
# else: | |
# st.text('Running inference...') | |
# # open image | |
# test_image = Image.open(file).convert("RGB") | |
# origi_shape = np.asarray(test_image).shape | |
# # resize image to default shape | |
# default_shape = 320 | |
# image_resized = np.array(test_image.resize((default_shape, default_shape))) | |
# ## Load color map | |
# category_index = load_label_map("./label_map.pbtxt") | |
# # TODO Add more colors if there are more classes | |
# # color of each label. check label_map.pbtxt to check the index for each class | |
# color_map = { | |
# 1: [69, 109, 42], | |
# 2: [107, 46, 186], | |
# 3: [9, 35, 183], | |
# 4: [27, 1, 30], | |
# 5: [0, 0, 0], | |
# 6: [5, 6, 7], | |
# 7: [11, 5, 12], | |
# 8: [209, 205, 211], | |
# 9: [17, 17, 17], | |
# 10: [101, 242, 50], | |
# 11: [51, 204, 170], | |
# 12: [106, 0, 132], | |
# 13: [7, 111, 153], | |
# 14: [8, 10, 9], | |
# 15: [234, 250, 252], | |
# 16: [58, 68, 30], | |
# 17: [24, 178, 117], | |
# 18: [21, 22, 21], | |
# 19: [53, 104, 83], | |
# 20: [12, 5, 10], | |
# 21: [223, 192, 249], | |
# 22: [234, 234, 234], | |
# 23: [119, 68, 221], | |
# 24: [224, 174, 94], | |
# 25: [140, 74, 116], | |
# 26: [90, 102, 1], | |
# 27: [216, 143, 208] | |
# } | |
# ## The model input needs to be a tensor | |
# input_tensor = tf.convert_to_tensor(image_resized) | |
# ## The model expects a batch of images, so add an axis with `tf.newaxis`. | |
# input_tensor = input_tensor[tf.newaxis, ...] | |
# ## Feed image into model and obtain output | |
# detections_output = model(input_tensor) | |
# num_detections = int(detections_output.pop("num_detections")) | |
# detections = {key: value[0, :num_detections].numpy() for key, value in detections_output.items()} | |
# detections["num_detections"] = num_detections | |
# ## Filter out predictions below threshold | |
# # if threshold is higher, there will be fewer predictions | |
# # TODO change this number to see how the predictions change | |
# confidence_threshold = 0.6 | |
# indexes = np.where(detections["detection_scores"] > confidence_threshold) | |
# ## Extract predicted bounding boxes | |
# bboxes = detections["detection_boxes"][indexes] | |
# # there are no predicted boxes | |
# if len(bboxes) == 0: | |
# st.error('No boxes predicted') | |
# # there are predicted boxes | |
# else: | |
# st.success('Boxes predicted') | |
# classes = detections["detection_classes"][indexes].astype(np.int64) | |
# scores = detections["detection_scores"][indexes] | |
# # plot boxes and labels on image | |
# image_origi = np.array(Image.fromarray(image_resized).resize((origi_shape[1], origi_shape[0]))) | |
# image_origi = plot_boxes_on_img(color_map, classes, bboxes, image_origi, origi_shape) | |
# # show image in web page | |
# st.image(Image.fromarray(image_origi), caption="Image with predictions", width=400) | |
# st.markdown("### Predicted boxes") | |
# for idx in range(len((bboxes))): | |
# st.markdown(f"* Class: {str(category_index[classes[idx]]['name'])}, confidence score: {str(round(scores[idx], 2))}") | |