|
import torch |
|
from PIL import Image |
|
from io import BytesIO |
|
from realesrgan import RealESRGANer |
|
from typing import Dict, List, Any |
|
import os |
|
from pathlib import Path |
|
from basicsr.archs.rrdbnet_arch import RRDBNet |
|
import numpy as np |
|
import cv2 |
|
import PIL |
|
|
|
|
|
import torch |
|
import base64 |
|
|
|
|
|
class EndpointHandler:
    """Inference endpoint handler that upscales images with Real-ESRGAN (x4plus).

    Expects a payload of the form::

        {"inputs": {"image": <base64-encoded image bytes>, "outscale": <float>}}

    and returns ``{"out_image": <base64 PNG string>, "error": None}`` on
    success, or ``{"out_image": None, "error": <message>}`` on failure.
    """

    # Largest accepted input area in pixels — guards against running the
    # upsampler out of memory on oversized uploads.
    MAX_PIXELS = 1400 * 1400

    def __init__(self, path: str = ""):
        """Build the Real-ESRGAN upsampler.

        Args:
            path: Model directory passed by the serving runtime. Unused here;
                the weight file location is hard-coded below.
        """
        self.model = RealESRGANer(
            scale=4,  # native upscale factor of the x4plus checkpoint
            model_path="/workspace/real-esrgan/weights/Real-ESRGAN-x4plus.pth",
            model=RRDBNet(
                num_in_ch=3,
                num_out_ch=3,
                num_feat=64,
                num_block=23,
                num_grow_ch=32,
                scale=4,
            ),
            tile=0,       # 0 = process the whole image in one pass (no tiling)
            tile_pad=10,  # padding per tile; only relevant when tile > 0
            # Use fp16 only when CUDA is present: half precision is
            # unsupported on CPU-only hosts and hard-coding half=True
            # crashes there.
            half=torch.cuda.is_available(),
        )

    def __call__(self, data: Any) -> Dict[str, Any]:
        """Decode, validate, upscale, and re-encode one image.

        Args:
            data: Request payload; ``data["inputs"]`` (or ``data`` itself)
                must contain a base64 ``"image"`` and may contain
                ``"outscale"`` (defaults to 3).

        Returns:
            ``{"out_image": <base64 PNG str> | None, "error": <str> | None}``.
            All expected failures are mapped to an error message rather than
            raised, so the endpoint always answers.
        """
        try:
            inputs = data.pop("inputs", data)

            # Requested output scale; may differ from the model's native x4 —
            # RealESRGANer resizes its result accordingly.
            outscale = float(inputs.pop("outscale", 3))

            image = Image.open(BytesIO(base64.b64decode(inputs['image'])))
            in_size, in_mode = image.size, image.mode

            # Explicit raises instead of `assert`: asserts are stripped under
            # `python -O`, which would silently disable all input validation.
            # The messages are kept identical to the original assertions.
            if in_mode not in ("RGB", "RGBA", "L"):
                raise ValueError(f"Unsupported image mode: {in_mode}")
            if in_size[0] * in_size[1] >= self.MAX_PIXELS:
                raise ValueError(
                    f"Image is too large: {in_size}: {in_size[0] * in_size[1]} "
                    f"is greater than {self.MAX_PIXELS}"
                )
            # The original used `outscale > 1`, contradicting this message and
            # rejecting the valid no-op scale 1; 1 is accepted now.
            if not 1 <= outscale <= 10:
                raise ValueError(f"Outscale must be between 1 and 10: {outscale}")

            print(f"image.size: {in_size}, image.mode: {in_mode}, outscale: {outscale}")

            # PIL delivers RGB(A); OpenCV / Real-ESRGAN expect BGR(A) order.
            opencv_image = np.array(image)
            if in_mode == "RGB":
                opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_RGB2BGR)
            elif in_mode == "RGBA":
                opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_RGBA2BGRA)
            else:
                # "L": expand grayscale to 3 channels for the 3-channel model
                # (R == G == B, so BGR-vs-RGB order is irrelevant here).
                opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_GRAY2RGB)

            output, _ = self.model.enhance(opencv_image, outscale=outscale)
            print(f"output.shape: {output.shape}")

            # Convert the upscaled array back to RGB(A) for PIL/PNG encoding.
            out_shape = output.shape
            if len(out_shape) == 3:
                if out_shape[2] == 3:
                    output = cv2.cvtColor(output, cv2.COLOR_BGR2RGB)
                elif out_shape[2] == 4:
                    output = cv2.cvtColor(output, cv2.COLOR_BGRA2RGBA)
            else:
                output = cv2.cvtColor(output, cv2.COLOR_GRAY2RGB)

            img_byte_arr = BytesIO()
            Image.fromarray(output).save(img_byte_arr, format='PNG')
            img_str = base64.b64encode(img_byte_arr.getvalue()).decode()

            return {"out_image": img_str, "error": None}

        except AssertionError as e:
            # Kept for any assertions raised inside library code.
            print(f"AssertionError: {e}")
            return {"out_image": None, "error": str(e)}
        except KeyError as e:
            print(f"KeyError: {e}")
            return {"out_image": None, "error": f"Missing key: {e}"}
        except ValueError as e:
            # Covers our validation raises and binascii.Error from b64decode
            # (a ValueError subclass).
            print(f"ValueError: {e}")
            return {"out_image": None, "error": str(e)}
        except PIL.UnidentifiedImageError as e:
            print(f"PIL.UnidentifiedImageError: {e}")
            return {"out_image": None, "error": "Invalid image format"}
        except Exception as e:
            # Boundary handler: the endpoint must always return a response.
            print(f"Exception: {e}")
            return {"out_image": None, "error": "An unexpected error occurred"}