samgis / src /app.py
aletrn's picture
[feat] add first working use of pydantic input validation
dcc25fd
raw
history blame
5.98 kB
import json
import time
from http import HTTPStatus
from typing import Dict, List
from aws_lambda_powertools.event_handler import content_types
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.parser import BaseModel
from src import app_logger
from src.io.coordinates_pixel_conversion import get_latlng_to_pixel_coordinates
from src.prediction_api.predictors import samexporter_predict
from src.utilities.constants import CUSTOM_RESPONSE_MESSAGES, DEFAULT_LOG_LEVEL
from src.utilities.utilities import base64_decode
list_float = List[float]
llist_float = List[list_float]
class LatLngDict(BaseModel):
lat: float
lng: float
class RawBBox(BaseModel):
ne: LatLngDict
sw: LatLngDict
class RawPrompt(BaseModel):
type: str
data: LatLngDict
label: int = 0
class RawRequestInput(BaseModel):
bbox: RawBBox
prompt: RawPrompt
zoom: int | float
source_type: str = "Satellite"
class ParsedPrompt(BaseModel):
type: str
data: llist_float
label: int = 0
class ParsedRequestInput(BaseModel):
bbox: llist_float
prompt: ParsedPrompt
zoom: int | float
def get_response(status: int, start_time: float, request_id: str, response_body: Dict = None) -> str:
"""
Return a response for frontend clients.
Args:
status: status response
start_time: request start time (float)
request_id: str
response_body: dict we embed into our response
Returns:
str: json response
"""
app_logger.debug(f"response_body:{response_body}.")
response_body["duration_run"] = time.time() - start_time
response_body["message"] = CUSTOM_RESPONSE_MESSAGES[status]
response_body["request_id"] = request_id
response = {
"statusCode": status,
"header": {"Content-Type": content_types.APPLICATION_JSON},
"body": json.dumps(response_body),
"isBase64Encoded": False
}
app_logger.debug(f"response type:{type(response)} => {response}.")
return json.dumps(response)
def get_parsed_bbox_points(request_input: RawRequestInput) -> Dict:
app_logger.info(f"try to parsing input request {request_input}...")
bbox = request_input["bbox"]
app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
ne = bbox["ne"]
sw = bbox["sw"]
app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
ne_latlng = [float(ne["lat"]), float(ne["lng"])]
sw_latlng = [float(sw["lat"]), float(sw["lng"])]
bbox = [ne_latlng, sw_latlng]
zoom = int(request_input["zoom"])
for prompt in request_input["prompt"]:
app_logger.debug(f"current prompt: {type(prompt)}, value:{prompt}.")
data = prompt["data"]
if prompt["type"] == "point":
current_point = get_latlng_to_pixel_coordinates(ne, sw, data, zoom, "point")
app_logger.debug(f"current prompt: {type(current_point)}, value:{current_point}.")
new_prompt_data = [current_point['x'], current_point['y']]
app_logger.debug(f"new_prompt_data: {type(new_prompt_data)}, value:{new_prompt_data}.")
prompt["data"] = new_prompt_data
else:
raise ValueError("valid prompt type is only 'point'")
app_logger.debug(f"bbox => {bbox}.")
app_logger.debug(f'request_input-prompt updated => {request_input["prompt"]}.')
app_logger.info(f"unpacking elaborated {request_input}...")
return {
"bbox": bbox,
"prompt": request_input["prompt"],
"zoom": zoom
}
def lambda_handler(event: dict, context: LambdaContext):
app_logger.info(f"start with aws_request_id:{context.aws_request_id}.")
start_time = time.time()
if "version" in event:
app_logger.info(f"event version: {event['version']}.")
try:
body = get_parsed_request_body(context, event)
try:
prompt_latlng = body["prompt"]
app_logger.debug(f"prompt_latlng:{prompt_latlng}.")
body_request = get_parsed_bbox_points(body)
app_logger.info(f"body_request=> {type(body_request)}, {body_request}.")
body_response = samexporter_predict(body_request["bbox"], body_request["prompt"], body_request["zoom"])
app_logger.info(f"output body_response:{body_response}.")
response = get_response(HTTPStatus.OK.value, start_time, context.aws_request_id, body_response)
except Exception as ex2:
app_logger.error(f"exception2:{ex2}.")
response = get_response(HTTPStatus.UNPROCESSABLE_ENTITY.value, start_time, context.aws_request_id, {})
except Exception as ex1:
app_logger.error(f"exception1:{ex1}.")
response = get_response(HTTPStatus.INTERNAL_SERVER_ERROR.value, start_time, context.aws_request_id, {})
app_logger.info(f"response_dumped:{response}...")
return response
def get_parsed_request_body(context, event):
app_logger.info(f"event:{json.dumps(event)}...")
app_logger.info(f"context:{context}...")
try:
body = event["body"]
except Exception as e_constants1:
app_logger.error(f"e_constants1:{e_constants1}.")
body = event
app_logger.debug(f"body, #1: {type(body)}, {body}...")
if isinstance(body, str):
body_decoded_str = base64_decode(body)
app_logger.debug(f"body_decoded_str: {type(body_decoded_str)}, {body_decoded_str}...")
body = json.loads(body_decoded_str)
app_logger.info(f"body, #2: {type(body)}, {body}...")
try:
log_level = 'DEBUG' if body['debug'] else DEFAULT_LOG_LEVEL
app_logger.warning(f"set logger level to DEBUG")
app_logger.setLevel(log_level)
except KeyError:
app_logger.warning(f"can't set log level, reset it...")
app_logger.setLevel(DEFAULT_LOG_LEVEL)
app_logger.warning(f"logger level is {app_logger.log_level}.")
return body