|
import json |
|
import time |
|
from http import HTTPStatus |
|
from typing import Dict, List |
|
|
|
from aws_lambda_powertools.event_handler import content_types |
|
from aws_lambda_powertools.utilities.typing import LambdaContext |
|
from aws_lambda_powertools.utilities.parser import BaseModel |
|
|
|
from src import app_logger |
|
from src.io.coordinates_pixel_conversion import get_latlng_to_pixel_coordinates |
|
from src.prediction_api.predictors import samexporter_predict |
|
from src.utilities.constants import CUSTOM_RESPONSE_MESSAGES, DEFAULT_LOG_LEVEL |
|
from src.utilities.utilities import base64_decode |
|
|
|
|
|
list_float = List[float] |
|
llist_float = List[list_float] |
|
|
|
|
|
class LatLngDict(BaseModel): |
|
lat: float |
|
lng: float |
|
|
|
|
|
class RawBBox(BaseModel): |
|
ne: LatLngDict |
|
sw: LatLngDict |
|
|
|
|
|
class RawPrompt(BaseModel): |
|
type: str |
|
data: LatLngDict |
|
label: int = 0 |
|
|
|
|
|
class RawRequestInput(BaseModel): |
|
bbox: RawBBox |
|
prompt: RawPrompt |
|
zoom: int | float |
|
source_type: str = "Satellite" |
|
|
|
|
|
class ParsedPrompt(BaseModel): |
|
type: str |
|
data: llist_float |
|
label: int = 0 |
|
|
|
|
|
class ParsedRequestInput(BaseModel): |
|
bbox: llist_float |
|
prompt: ParsedPrompt |
|
zoom: int | float |
|
|
|
|
|
def get_response(status: int, start_time: float, request_id: str, response_body: Dict = None) -> str: |
|
""" |
|
Return a response for frontend clients. |
|
|
|
Args: |
|
status: status response |
|
start_time: request start time (float) |
|
request_id: str |
|
response_body: dict we embed into our response |
|
|
|
Returns: |
|
str: json response |
|
|
|
""" |
|
app_logger.debug(f"response_body:{response_body}.") |
|
response_body["duration_run"] = time.time() - start_time |
|
response_body["message"] = CUSTOM_RESPONSE_MESSAGES[status] |
|
response_body["request_id"] = request_id |
|
|
|
response = { |
|
"statusCode": status, |
|
"header": {"Content-Type": content_types.APPLICATION_JSON}, |
|
"body": json.dumps(response_body), |
|
"isBase64Encoded": False |
|
} |
|
app_logger.debug(f"response type:{type(response)} => {response}.") |
|
return json.dumps(response) |
|
|
|
|
|
def get_parsed_bbox_points(request_input: RawRequestInput) -> Dict: |
|
app_logger.info(f"try to parsing input request {request_input}...") |
|
bbox = request_input["bbox"] |
|
app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.") |
|
ne = bbox["ne"] |
|
sw = bbox["sw"] |
|
app_logger.debug(f"request ne: {type(ne)}, value:{ne}.") |
|
app_logger.debug(f"request sw: {type(sw)}, value:{sw}.") |
|
ne_latlng = [float(ne["lat"]), float(ne["lng"])] |
|
sw_latlng = [float(sw["lat"]), float(sw["lng"])] |
|
bbox = [ne_latlng, sw_latlng] |
|
zoom = int(request_input["zoom"]) |
|
for prompt in request_input["prompt"]: |
|
app_logger.debug(f"current prompt: {type(prompt)}, value:{prompt}.") |
|
data = prompt["data"] |
|
if prompt["type"] == "point": |
|
current_point = get_latlng_to_pixel_coordinates(ne, sw, data, zoom, "point") |
|
app_logger.debug(f"current prompt: {type(current_point)}, value:{current_point}.") |
|
new_prompt_data = [current_point['x'], current_point['y']] |
|
app_logger.debug(f"new_prompt_data: {type(new_prompt_data)}, value:{new_prompt_data}.") |
|
prompt["data"] = new_prompt_data |
|
else: |
|
raise ValueError("valid prompt type is only 'point'") |
|
|
|
app_logger.debug(f"bbox => {bbox}.") |
|
app_logger.debug(f'request_input-prompt updated => {request_input["prompt"]}.') |
|
|
|
app_logger.info(f"unpacking elaborated {request_input}...") |
|
return { |
|
"bbox": bbox, |
|
"prompt": request_input["prompt"], |
|
"zoom": zoom |
|
} |
|
|
|
|
|
def lambda_handler(event: dict, context: LambdaContext): |
|
app_logger.info(f"start with aws_request_id:{context.aws_request_id}.") |
|
start_time = time.time() |
|
|
|
if "version" in event: |
|
app_logger.info(f"event version: {event['version']}.") |
|
|
|
try: |
|
body = get_parsed_request_body(context, event) |
|
|
|
try: |
|
prompt_latlng = body["prompt"] |
|
app_logger.debug(f"prompt_latlng:{prompt_latlng}.") |
|
body_request = get_parsed_bbox_points(body) |
|
app_logger.info(f"body_request=> {type(body_request)}, {body_request}.") |
|
body_response = samexporter_predict(body_request["bbox"], body_request["prompt"], body_request["zoom"]) |
|
app_logger.info(f"output body_response:{body_response}.") |
|
response = get_response(HTTPStatus.OK.value, start_time, context.aws_request_id, body_response) |
|
except Exception as ex2: |
|
app_logger.error(f"exception2:{ex2}.") |
|
response = get_response(HTTPStatus.UNPROCESSABLE_ENTITY.value, start_time, context.aws_request_id, {}) |
|
except Exception as ex1: |
|
app_logger.error(f"exception1:{ex1}.") |
|
response = get_response(HTTPStatus.INTERNAL_SERVER_ERROR.value, start_time, context.aws_request_id, {}) |
|
|
|
app_logger.info(f"response_dumped:{response}...") |
|
return response |
|
|
|
|
|
def get_parsed_request_body(context, event): |
|
app_logger.info(f"event:{json.dumps(event)}...") |
|
app_logger.info(f"context:{context}...") |
|
try: |
|
body = event["body"] |
|
except Exception as e_constants1: |
|
app_logger.error(f"e_constants1:{e_constants1}.") |
|
body = event |
|
app_logger.debug(f"body, #1: {type(body)}, {body}...") |
|
if isinstance(body, str): |
|
body_decoded_str = base64_decode(body) |
|
app_logger.debug(f"body_decoded_str: {type(body_decoded_str)}, {body_decoded_str}...") |
|
body = json.loads(body_decoded_str) |
|
app_logger.info(f"body, #2: {type(body)}, {body}...") |
|
try: |
|
log_level = 'DEBUG' if body['debug'] else DEFAULT_LOG_LEVEL |
|
app_logger.warning(f"set logger level to DEBUG") |
|
app_logger.setLevel(log_level) |
|
except KeyError: |
|
app_logger.warning(f"can't set log level, reset it...") |
|
app_logger.setLevel(DEFAULT_LOG_LEVEL) |
|
app_logger.warning(f"logger level is {app_logger.log_level}.") |
|
return body |
|
|