import base64
import io
import json
import os
from typing import Any, Dict, List

import numpy as np
import tensorflow as tf
from PIL import Image

# most of this code has been obtained from Datature's prediction script
# https://github.com/datature/resources/blob/main/scripts/bounding_box/prediction.py


class PreTrainedPipeline():
    """Callable inference wrapper around the SavedModel in ``<path>/saved_model``.

    Calling an instance with a PIL image resizes it to ``INPUT_SIZE``, scales
    the pixels to ``[0, 1]``, runs the model and turns the per-pixel argmax of
    the model output into one base64-encoded PNG mask per class.

    NOTE(review): the original implementation computed those masks and then
    unconditionally returned a hard-coded list of example detections
    ("car", "person", "skateboard" boxes). That behaviour is kept as the
    default so existing callers see no change, but it is now explicit and can
    be switched off with ``return_placeholder=False``. Confirm which output
    the consuming widget really expects and remove the placeholder.
    """

    # Height and width every input image is resized to before prediction.
    INPUT_SIZE = (128, 128)

    def __init__(self, path: str, return_placeholder: bool = True):
        """Load the model.

        Args:
            path: directory that contains the ``saved_model`` folder.
            return_placeholder: when True (default, the historical behaviour)
                ``__call__`` returns the hard-coded example detections; when
                False it returns the masks computed from the model output.
        """
        # load the model
        self.model = tf.saved_model.load(os.path.join(path, "saved_model"))
        self.return_placeholder = return_placeholder

    def __call__(self, inputs: "Image.Image") -> List[Dict[str, Any]]:
        """Run the pipeline on one image.

        Args:
            inputs: the image to process.

        Returns:
            A list of dictionaries. With ``return_placeholder`` enabled these
            are the hard-coded ``score``/``label``/``box`` entries; otherwise
            one ``label``/``mask``/``score`` entry per class.

        Raises:
            ValueError: if the model output is not a 4-D score tensor.
        """
        # The prediction always runs (as it did originally), so a broken model
        # or input still surfaces as an error even in placeholder mode.
        labels = self._segment(inputs)
        # getattr keeps instances created without __init__ on the old behaviour.
        if getattr(self, "return_placeholder", True):
            return self._placeholder_detections()
        return labels

    def _segment(self, inputs: "Image.Image") -> List[Dict[str, Any]]:
        """Return one ``{"label", "mask", "score"}`` entry per predicted class."""
        scores = np.asarray(self._predict(self._preprocess(inputs)))
        if scores.ndim != 4:
            raise ValueError(
                "expected a model output of shape (batch, height, width, classes), "
                f"got an array with {scores.ndim} dimension(s)"
            )
        # take the best performing class for each pixel of the first (only)
        # image in the batch; the result looks like [[1, 2, 0], ...]
        class_map = np.argmax(scores, axis=-1)[0]
        # widget needs the below format, for each class we return label and mask string
        return [
            {
                "label": f"LABEL_{cls}",
                "mask": self._encode_mask(class_map == cls),
                "score": 1.0,
            }
            for cls in range(scores.shape[-1])
        ]

    def _preprocess(self, inputs: "Image.Image") -> Any:
        """Convert the image to a float32 batch of one, scaled to ``[0, 1]``."""
        # tf.image.resize needs a channel axis, which a grayscale image lacks.
        # The commented-out reference app at the bottom of this file also
        # converts to RGB before inference.
        # NOTE(review): assumes the model takes 3-channel input - confirm.
        if getattr(inputs, "mode", "RGB") != "RGB":
            inputs = inputs.convert("RGB")
        pixels = np.array(inputs)
        resized = tf.image.resize(pixels, self.INPUT_SIZE)
        scaled = tf.cast(resized, tf.float32) / 255.0
        return scaled[tf.newaxis, ...]

    def _predict(self, batch: Any) -> Any:
        """Run the model, using ``predict`` when it exists, else calling it."""
        predict = getattr(self.model, "predict", None)
        if predict is not None:
            return predict(batch)
        # Objects restored by tf.saved_model.load are not Keras models and do
        # not normally offer predict(); the reference app at the bottom of
        # this file invokes the loaded model by calling it directly.
        return self.model(batch)

    @staticmethod
    def _encode_mask(selected: np.ndarray) -> str:
        """Encode a boolean pixel mask as a base64 PNG (255 = selected, 0 = not)."""
        # uint8 is required here: 255 is outside the int8 range, so the former
        # astype(np.int8) only produced the right bytes by accident.
        pixels = np.where(selected, 255, 0).astype(np.uint8)
        image = Image.fromarray(pixels)  # 2-D uint8 -> 8-bit grayscale image
        # we need to make it readable for the widget
        with io.BytesIO() as out:
            image.save(out, format="PNG")
            return base64.b64encode(out.getvalue()).decode("utf-8")

    @staticmethod
    def _placeholder_detections() -> List[Dict[str, Any]]:
        """Return a fresh copy of the hard-coded example detections."""
        return [
            {"score": 0.9509243965148926, "label": "car",
             "box": {"xmin": 142, "ymin": 106, "xmax": 376, "ymax": 229}},
            {"score": 0.9981777667999268, "label": "car",
             "box": {"xmin": 405, "ymin": 146, "xmax": 640, "ymax": 297}},
            {"score": 0.9963648915290833, "label": "car",
             "box": {"xmin": 0, "ymin": 115, "xmax": 61, "ymax": 167}},
            {"score": 0.974663257598877, "label": "car",
             "box": {"xmin": 155, "ymin": 104, "xmax": 290, "ymax": 141}},
            {"score": 0.9986898303031921, "label": "car",
             "box": {"xmin": 39, "ymin": 117, "xmax": 169, "ymax": 188}},
            {"score": 0.9998276233673096, "label": "person",
             "box": {"xmin": 172, "ymin": 60, "xmax": 482, "ymax": 396}},
            {"score": 0.9996274709701538, "label": "skateboard",
             "box": {"xmin": 265, "ymin": 348, "xmax": 440, "ymax": 413}},
        ]


# ---------------------------------------------------------------------------
# Legacy code, kept commented out for reference: an earlier version of the
# pipeline class followed by the Streamlit object-detection app.
# ---------------------------------------------------------------------------

# class PreTrainedPipeline():
#     def __init__(self, path: str):
#         # load the model
#         self.model = tf.saved_model.load('./saved_model')
#
#     def __call__(self, inputs: "Image.Image")-> List[Dict[str, Any]]:
#         image = np.array(inputs)
#         image = tf.cast(image, tf.float32)
#         image = tf.image.resize(image, [150, 150])
#         image = np.expand_dims(image, axis = 0)
#         predictions = self.model.predict(image)
#
#         labels = []
#         labels = [{"score":0.9509243965148926,"label":"car","box":{"xmin":142,"ymin":106,"xmax":376,"ymax":229}},{"score":0.9981777667999268,"label":"car","box":{"xmin":405,"ymin":146,"xmax":640,"ymax":297}},{"score":0.9963648915290833,"label":"car","box":{"xmin":0,"ymin":115,"xmax":61,"ymax":167}},{"score":0.974663257598877,"label":"car","box":{"xmin":155,"ymin":104,"xmax":290,"ymax":141}},{"score":0.9986898303031921,"label":"car","box":{"xmin":39,"ymin":117,"xmax":169,"ymax":188}},{"score":0.9998276233673096,"label":"person","box":{"xmin":172,"ymin":60,"xmax":482,"ymax":396}},{"score":0.9996274709701538,"label":"skateboard","box":{"xmin":265,"ymin":348,"xmax":440,"ymax":413}}]
#         return labels
#
# # -----------------
# def load_model():
#     return tf.saved_model.load('./saved_model')
#
# def load_label_map(label_map_path):
#     """
#     Reads label map in the format of .pbtxt and parse into dictionary
#     Args:
#         label_map_path: the file path to the label_map
#     Returns:
#         dictionary with the format of {label_index: {'id': label_index, 'name': label_name}}
#     """
#     label_map = {}
#     with open(label_map_path, "r") as label_file:
#         for line in label_file:
#             if "id" in line:
#                 label_index = int(line.split(":")[-1])
#                 label_name = next(label_file).split(":")[-1].strip().strip('"')
#                 label_map[label_index] = {"id": label_index, "name": label_name}
#     return label_map
#
# def predict_class(image, model):
#     image = tf.cast(image, tf.float32)
#     image = tf.image.resize(image, [150, 150])
#     image = np.expand_dims(image, axis = 0)
#     return model.predict(image)
#
# def plot_boxes_on_img(color_map, classes, bboxes, image_origi, origi_shape):
#     for idx, each_bbox in enumerate(bboxes):
#         color = color_map[classes[idx]]
#
#         ## Draw bounding box
#         cv2.rectangle(
#             image_origi,
#             (int(each_bbox[1] * origi_shape[1]),
#              int(each_bbox[0] * origi_shape[0]),),
#             (int(each_bbox[3] * origi_shape[1]),
#              int(each_bbox[2] * origi_shape[0]),),
#             color,
#             2,
#         )
#         ## Draw label background
#         cv2.rectangle(
#             image_origi,
#             (int(each_bbox[1] * origi_shape[1]),
#              int(each_bbox[2] * origi_shape[0]),),
#             (int(each_bbox[3] * origi_shape[1]),
#              int(each_bbox[2] * origi_shape[0] + 15),),
#             color,
#             -1,
#         )
#         ## Insert label class & score
#         cv2.putText(
#             image_origi,
#             "Class: {}, Score: {}".format(
#                 str(category_index[classes[idx]]["name"]),
#                 str(round(scores[idx], 2)),
#             ),
#             (int(each_bbox[1] * origi_shape[1]),
#              int(each_bbox[2] * origi_shape[0] + 10),),
#             cv2.FONT_HERSHEY_SIMPLEX,
#             0.3,
#             (0, 0, 0),
#             1,
#             cv2.LINE_AA,
#         )
#     return image_origi
#
#
# # Webpage code starts here
#
# #TODO change this
# st.title('Distribution Grid - Belgium - Equipment detection')
# st.text('made by LabelFlow')
# st.markdown('## Description about your project')
#
# with st.spinner('Model is being loaded...'):
#     model = load_model()
#
# # ask user to upload an image
# file = st.file_uploader("Upload image", type=["jpg", "png"])
#
# if file is None:
#     st.text('Waiting for upload...')
# else:
#     st.text('Running inference...')
#     # open image
#     test_image = Image.open(file).convert("RGB")
#     origi_shape = np.asarray(test_image).shape
#     # resize image to default shape
#     default_shape = 320
#     image_resized = np.array(test_image.resize((default_shape, default_shape)))
#
#     ## Load color map
#     category_index = load_label_map("./label_map.pbtxt")
#
#     # TODO Add more colors if there are more classes
#     # color of each label. check label_map.pbtxt to check the index for each class
#     color_map = {
#         1: [69, 109, 42],
#         2: [107, 46, 186],
#         3: [9, 35, 183],
#         4: [27, 1, 30],
#         5: [0, 0, 0],
#         6: [5, 6, 7],
#         7: [11, 5, 12],
#         8: [209, 205, 211],
#         9: [17, 17, 17],
#         10: [101, 242, 50],
#         11: [51, 204, 170],
#         12: [106, 0, 132],
#         13: [7, 111, 153],
#         14: [8, 10, 9],
#         15: [234, 250, 252],
#         16: [58, 68, 30],
#         17: [24, 178, 117],
#         18: [21, 22, 21],
#         19: [53, 104, 83],
#         20: [12, 5, 10],
#         21: [223, 192, 249],
#         22: [234, 234, 234],
#         23: [119, 68, 221],
#         24: [224, 174, 94],
#         25: [140, 74, 116],
#         26: [90, 102, 1],
#         27: [216, 143, 208]
#     }
#
#     ## The model input needs to be a tensor
#     input_tensor = tf.convert_to_tensor(image_resized)
#
#     ## The model expects a batch of images, so add an axis with `tf.newaxis`.
#     input_tensor = input_tensor[tf.newaxis, ...]
#
#     ## Feed image into model and obtain output
#     detections_output = model(input_tensor)
#     num_detections = int(detections_output.pop("num_detections"))
#     detections = {key: value[0, :num_detections].numpy() for key, value in detections_output.items()}
#     detections["num_detections"] = num_detections
#
#     ## Filter out predictions below threshold
#     # if threshold is higher, there will be fewer predictions
#     # TODO change this number to see how the predictions change
#     confidence_threshold = 0.6
#     indexes = np.where(detections["detection_scores"] > confidence_threshold)
#
#     ## Extract predicted bounding boxes
#     bboxes = detections["detection_boxes"][indexes]
#     # there are no predicted boxes
#     if len(bboxes) == 0:
#         st.error('No boxes predicted')
#     # there are predicted boxes
#     else:
#         st.success('Boxes predicted')
#         classes = detections["detection_classes"][indexes].astype(np.int64)
#         scores = detections["detection_scores"][indexes]
#
#         # plot boxes and labels on image
#         image_origi = np.array(Image.fromarray(image_resized).resize((origi_shape[1], origi_shape[0])))
#         image_origi = plot_boxes_on_img(color_map, classes, bboxes, image_origi, origi_shape)
#
#         # show image in web page
#         st.image(Image.fromarray(image_origi), caption="Image with predictions", width=400)
#         st.markdown("### Predicted boxes")
#         for idx in range(len((bboxes))):
#             st.markdown(f"* Class: {str(category_index[classes[idx]]['name'])}, confidence score: {str(round(scores[idx], 2))}")