# This app explores integrating `nrtk` with `Gradio` to create an interactive interface for applying perturbations.

from pathlib import Path

import gradio as gr  # type: ignore
import numpy as np
import yaml
from nrtk.impls.perturb_image.pybsm.perturber import PybsmPerturber
from nrtk.impls.perturb_image.pybsm.scenario import PybsmScenario
from nrtk.impls.perturb_image.pybsm.sensor import PybsmSensor

asset_dir = Path(__file__).parent / "assets"

# Define default values for fields for initialization and button clicks
#
# Note, for this application we'll be focusing on the use case of UAS imagery, so we'll try to pick default
# values that work with images similar to those in the VisDrone dataset. Perturbing images from other datasets or other
# operational tasks may not be successful without modification to these values; defining broad defaults is extremely
# difficult, if not impossible due to the physics-based nature of these perturbations.
default_values = {
    "gsd": 0.105,
    "scenario": {
        "aircraftSpeed": 0,
        "altitude": 75,
        "backgroundReflectance": 0.07,
        "backgroundTemperature": 293.,
        "cn2at1m": 0,
        "groundRange": 0,
        "haWindspeed": 21.,
        "ihaze": 2,
        "name": "",
        "targetReflectance": 0.15,
        "targetTemperature": 295.,
    },
    "sensor": {
        "D": 0.004,
        "bitdepth": 11.9,
        "darkCurrent": 0.,
        "dax": 0.0001,
        "day": 0.0001,
        "eta": 0.4,
        "f": 0.01429,
        "intTime": 0.03,
        "maxN": 96000,
        "maxWellFill": 0.005,
        "name": "",
        "optTransWavelengths": [3.8e-07, 7.0e-07],
        "opticsTransmission": [],
        "px": 2.0e-05,
        "qe": [0.05, 0.6, 0.75, 0.85, 0.85, 0.75, 0.5, 0.2, 0],
        "qewavelengths": [3.0e-7, 4.0e-7, 5.0e-7, 6.0e-7, 7.0e-7, 8.0e-7, 9.0e-7, 1.0e-6, 1.1e-6],
        "readNoise": 25.,
        "sx": 0.,
        "sy": 0.,
    },
}


def _join_values(values):
    """
    Render a sequence of numbers as comma separated text; empty or missing sequences become ""
    """
    return ", ".join(str(v) for v in values) if values else ""


def _parse_float_list(text, field_label):
    """
    Parse comma separated text into a list of floats, skipping blank entries.

    Raises gr.Error (naming ``field_label``) when an entry is not a number, so the user sees
    which field is malformed instead of a bare ValueError.
    """
    tokens = (part.strip() for part in (text or "").split(","))
    try:
        return [float(tok) for tok in tokens if tok]
    except ValueError as err:
        raise gr.Error(f"{field_label} must be a comma separated list of numbers!") from err


def _stretch_contrast_convert_8bit(img, perc=(0.1, 99.9)):
    """
    Linearly stretch ``img`` between the two given percentiles and convert it to uint8.

    The lower percentile maps to 0 and the upper percentile to 255; values outside are clipped.
    """
    img = img.astype(float)
    img = img - np.percentile(img.ravel(), perc[0])
    upper = np.percentile(img.ravel(), perc[1])
    # A flat image has no range to stretch; skip the scaling rather than divide by zero
    if upper > 0:
        img = img / (upper / 255)
    img = np.clip(img, 0, 255)
    return np.round(img).astype(np.uint8)


def load_config(config):
    """
    Loads configuration from config dictionary to UI elements

    Any section ("scenario"/"sensor") or scalar field missing from ``config`` falls back to the
    matching entry in ``default_values``. List-valued sensor fields are shown as comma separated
    text and are left blank when missing or empty.

    Returns a dict mapping each Gradio component to its new value; it also makes the parameter
    group visible.
    """
    scenario_config = config.get("scenario") or {}
    sensor_config = config.get("sensor") or {}
    default_scenario = default_values["scenario"]
    default_sensor = default_values["sensor"]

    def scenario_value(key):
        return scenario_config.get(key, default_scenario[key])

    def sensor_value(key):
        return sensor_config.get(key, default_sensor[key])

    def sensor_list_text(key):
        # Convert lists to comma separated text
        return _join_values(sensor_config.get(key))

    return {
        input_group: gr.Group(visible=True),
        img_gsd: config.get("gsd", default_values["gsd"]),
        scenario_name: scenario_value("name"),
        ihaze: scenario_value("ihaze"),
        altitude_m: scenario_value("altitude"),
        ground_range_m: scenario_value("groundRange"),
        aircraft_speed_m_Per_s: scenario_value("aircraftSpeed"),
        target_reflectance: scenario_value("targetReflectance"),
        target_temperature_K: scenario_value("targetTemperature"),
        bkgd_reflectance: scenario_value("backgroundReflectance"),
        bkgd_temperature_K: scenario_value("backgroundTemperature"),
        ha_windspeed_m_Per_s: scenario_value("haWindspeed"),
        cn2at1m: scenario_value("cn2at1m"),
        sensor_name: sensor_value("name"),
        D_m: sensor_value("D"),
        f_m: sensor_value("f"),
        px_m: sensor_value("px"),
        optTransWavelengths_str: sensor_list_text("optTransWavelengths"),
        opticsTransmission_str: sensor_list_text("opticsTransmission"),
        eta: sensor_value("eta"),
        int_time_s: sensor_value("intTime"),
        dark_current: sensor_value("darkCurrent"),
        read_noise: sensor_value("readNoise"),
        max_N: sensor_value("maxN"),
        bit_depth: sensor_value("bitdepth"),
        max_well_fill: sensor_value("maxWellFill"),
        sx: sensor_value("sx"),
        sy: sensor_value("sy"),
        dax: sensor_value("dax"),
        day: sensor_value("day"),
        qe_str: sensor_list_text("qe"),
        qewavelengths_str: sensor_list_text("qewavelengths"),
    }


def generate_config(data):
    """
    Generate dictionary that can easily be transformed into sensor and scenario objects

    ``data`` maps Gradio components to their current values. The result has the keys
    "scenario", "sensor" and "gsd" and holds only plain Python values (lists, not arrays),
    so it can be dumped to YAML as-is.

    Raises gr.Error when a value fails validation.
    """
    # Input validation
    if data[ihaze] not in PybsmScenario.ihaze_values:
        raise gr.Error("Invalid ihaze value!")
    if data[altitude_m] not in PybsmScenario.altitude_values:
        raise gr.Error("Invalid altitude value!")
    if data[ground_range_m] not in PybsmScenario.groundRange_values:
        raise gr.Error("Invalid ground range value!")

    scenario_config = {
        "name": data[scenario_name],
        "ihaze": data[ihaze],
        "altitude": data[altitude_m],
        "groundRange": data[ground_range_m],
        "aircraftSpeed": data[aircraft_speed_m_Per_s],
        "targetReflectance": data[target_reflectance],
        "targetTemperature": data[target_temperature_K],
        "backgroundReflectance": data[bkgd_reflectance],
        "backgroundTemperature": data[bkgd_temperature_K],
        "haWindspeed": data[ha_windspeed_m_Per_s],
        "cn2at1m": data[cn2at1m],
    }

    # Convert text fields into lists of floats; optional lists become None when left blank
    optTransWavelengths = _parse_float_list(data[optTransWavelengths_str], "Spectral Bandpass")
    opticsTransmission = _parse_float_list(data[opticsTransmission_str], "Optical Transmission") or None
    qe = _parse_float_list(data[qe_str], "Quantum Efficiency") or None
    qewavelengths = _parse_float_list(data[qewavelengths_str], "Quantum Efficiency Wavelengths") or None

    # More input validation
    if len(optTransWavelengths) < 2:
        raise gr.Error("At least 2 optical transmission wavelengths required!")
    if optTransWavelengths[0] >= optTransWavelengths[-1]:
        raise gr.Error("Optical transmission wavelengths should be entered least to greatest!")
    if opticsTransmission is not None and len(opticsTransmission) != len(optTransWavelengths):
        raise gr.Error("If provided, Optical Transmission must have the same number of values as Spectral Bandpass!")
    if qe is not None and qewavelengths is not None and len(qe) != len(qewavelengths):
        raise gr.Error("Quantum Efficiency must have the same number of values as its wavelengths!")

    sensor_config = {
        "name": data[sensor_name],
        "D": data[D_m],
        "f": data[f_m],
        "px": data[px_m],
        "optTransWavelengths": optTransWavelengths,
        "opticsTransmission": opticsTransmission,
        "eta": data[eta],
        "intTime": data[int_time_s],
        "darkCurrent": data[dark_current],
        "readNoise": data[read_noise],
        "maxN": data[max_N],
        "bitdepth": data[bit_depth],
        "maxWellFill": data[max_well_fill],
        "sx": data[sx],
        "sy": data[sy],
        "dax": data[dax],
        "day": data[day],
        "qe": qe,
        "qewavelengths": qewavelengths,
    }

    return {
        "scenario": scenario_config,
        "sensor": sensor_config,
        "gsd": data[img_gsd],
    }


def gen_new_config():
    """
    Resets all fields to default values
    """
    return load_config(default_values)


def load_config_from_file(data):
    """
    Loads configuration from given file to UI elements

    Raises gr.Error when no file was uploaded, or when the file is not valid YAML or does not
    contain a mapping at its top level.
    """
    if not data[config_file]:
        raise gr.Error("A file must be uploaded to load existing configuration!")

    with open(data[config_file], encoding="utf-8") as file:
        try:
            config = yaml.safe_load(file)
        except yaml.YAMLError as err:
            raise gr.Error("Configuration file is not valid YAML!") from err

    # An empty file loads as None and a bare list/scalar is not a usable configuration
    if not isinstance(config, dict):
        raise gr.Error("Configuration file must contain a mapping of configuration values!")

    return load_config(config)


def submit(data):
    """
    Apply the perturbation and hide/show relevant UI elements as needed

    Builds the sensor/scenario from the current field values, writes the configuration used to
    ``assets/generated_config.yml``, perturbs the input image and returns the contrast-stretched
    result together with the path of the written configuration.
    """
    if data[input_img] is None:
        raise gr.Error("An input image is required!")

    config = generate_config(data)

    # Sensor expects numpy arrays, but plain lists serialize better, so convert a copy here
    # and leave `config` untouched for the YAML dump below
    sensor_kwargs = dict(config["sensor"])
    for key in ("optTransWavelengths", "opticsTransmission", "qe", "qewavelengths"):
        if sensor_kwargs[key]:
            sensor_kwargs[key] = np.asarray(sensor_kwargs[key])

    sensor = PybsmSensor(**sensor_kwargs)
    scenario = PybsmScenario(**config["scenario"])
    perturber = PybsmPerturber(sensor=sensor, scenario=scenario)

    generated_config_path = str(asset_dir / "generated_config.yml")
    with open(generated_config_path, "w", encoding="utf-8") as f:
        yaml.dump(config, f)

    # Apply the perturbation and display
    perturbed_img = perturber(image=data[input_img], additional_params={"img_gsd": config["gsd"]})
    # NOTE(review): the upper percentile of 90.9 differs from the helper's 99.9 default -- confirm it is intended
    perturbed_img = _stretch_contrast_convert_8bit(perturbed_img, (0.1, 90.9))

    return {
        out_img: perturbed_img,
        out_config: generated_config_path,
    }


# Lastly, we define the layout of the application and register the button click listener functions:
# NOTE(review): confirm the container nesting below against the intended layout, in particular the
# placement of the "Perturb Image" and GitHub buttons.
with gr.Blocks() as demo:
    gr.Markdown(
        "# Apply pyBSM Perturbations with NRTK\n"
        "\n"
        "Note: Default configuration options are tailored to UAS imagery (specifically VisDrone). "
        "Perturbing other datasets or other operational tasks may not be successful without modification "
        "to some of these configuration options; defining broad defaults is extremely difficult, "
        "if not impossible due to the physics-based nature of these perturbations.\n"
    )

    with gr.Row():
        with gr.Column():
            sample_img_path = str(asset_dir / "0000161_01584_d_0000158.jpg")
            input_img = gr.Image(
                label="Input Image",
                value=sample_img_path
            )
            gr.Examples(
                examples=[
                    sample_img_path,
                    str(asset_dir / "0000006_01111_d_0000003.jpg"),
                    str(asset_dir / "0000006_01659_d_0000004.jpg"),
                    str(asset_dir / "0000006_04309_d_0000011.jpg")
                ],
                inputs=input_img
            )
        with gr.Column() as output_col:
            out_img = gr.Image(label="Perturbed Image")
            out_config = gr.File(label="Generated Configuration")

    with gr.Row():
        with gr.Column() as input_col:
            with gr.Row():
                gen_config_btn = gr.Button("Generate New Configuration")
                with gr.Column():
                    # Accept both YAML extensions: the app itself writes "generated_config.yml"
                    config_file = gr.File(label="Configuration File", file_types=[".yaml", ".yml"])
                    load_config_btn = gr.Button("Load Configuration from File")

            with gr.Group(visible=False) as input_group:
                with gr.Accordion("Image Parameters", open=False):
                    img_gsd = gr.Number(
                        label="Image Ground Sample Distance (GSD) (m)",
                        info="The size of one pixel on the ground",
                        value=default_values["gsd"]
                    )

                with gr.Accordion("Scenario Parameters") as scenario_params:
                    altitude_m = gr.Number(
                        label="Altitude (m)",
                        info="Sensor height above ground level in meters. The database includes the following "
                             "altitude options: 2m 32.55m 75m 150m 225m 500m, 1000m to 12000m in 1000m steps, "
                             "14000m to 20000m in 2000m steps, and 24500m",
                        value=default_values["scenario"]["altitude"]
                    )
                    ground_range_m = gr.Number(
                        label="Ground Range (m)",
                        info="Distance on the ground between the target and sensor in meters. The following "
                             "ground ranges are included in the database at each altitude until the ground "
                             "range exceeds the distance to the spherical earth horizon: 0m 100m 500m, 1000m to "
                             "20000m in 1000m steps, 22000m to 80000m in 2000m steps, and 85000m to "
                             "300000m in 5000m steps.",
                        value=default_values["scenario"]["groundRange"]
                    )

                    with gr.Accordion("Additional Scenario Parameters", open=False) as opt_scenario_params:
                        scenario_name = gr.Textbox(label="Scenario Name", value=default_values["scenario"]["name"])
                        ihaze = gr.Dropdown(
                            label="IHAZE",
                            info="MODTRAN code for visibility",
                            choices=[1, 2],
                            value=default_values["scenario"]["ihaze"]
                        )
                        aircraft_speed_m_Per_s = gr.Number(
                            label="Aircraft Speed (m/s)",
                            info="Ground speed of the aircraft",
                            value=default_values["scenario"]["aircraftSpeed"]
                        )
                        with gr.Row():
                            target_reflectance = gr.Number(
                                label="Target Reflectance",
                                info="Object reflectance",
                                value=default_values["scenario"]["targetReflectance"]
                            )
                            target_temperature_K = gr.Number(
                                label="Target Temperature (K)",
                                info="Object temperature (Kelvin)",
                                value=default_values["scenario"]["targetTemperature"]
                            )
                        with gr.Row():
                            bkgd_reflectance = gr.Number(
                                label="Background Reflectance",
                                info="Background reflectance",
                                value=default_values["scenario"]["backgroundReflectance"]
                            )
                            bkgd_temperature_K = gr.Number(
                                label="Background Temperature (K)",
                                info="Background temperature (Kelvin)",
                                value=default_values["scenario"]["backgroundTemperature"]
                            )
                        ha_windspeed_m_Per_s = gr.Number(
                            label="High Altitude Windspeed (m/s)",
                            info="Used to calculate the turbulence profile",
                            value=default_values["scenario"]["haWindspeed"]
                        )
                        cn2at1m = gr.Number(
                            label="Refractive Index Structure Parameter",
                            info='The refractive index structure parameter "near the ground" (e.g. '
                                 'at h = 1m). Used to calculate the turbulence profile',
                            value=default_values["scenario"]["cn2at1m"]
                        )

                with gr.Accordion("Sensor Parameters") as sensor_params:
                    D_m = gr.Number(label="Effective Aperture Diameter (m)", value=default_values["sensor"]["D"])
                    f_m = gr.Number(label="Focal Length (m)", value=default_values["sensor"]["f"])

                    with gr.Accordion("Additional Sensor Parameters", open=False) as opt_sensor_parameters:
                        sensor_name = gr.Textbox(label="Sensor Name", value=default_values["sensor"]["name"])
                        px_m = gr.Number(
                            label="Detector Center-to-Center Spacing (Pitch) (m)",
                            value=default_values["sensor"]["px"]
                        )
                        optTransWavelengths_str = gr.Textbox(
                            label="Spectral Bandpass of the Camera (m)",
                            info="Enter a comma separated list. At minimum, a start and end wavelength should be "
                                 "specified",
                            value=_join_values(default_values["sensor"]["optTransWavelengths"])
                        )
                        opticsTransmission_str = gr.Textbox(
                            label="Full System In-Band Optical Transmission",
                            info="Enter a comma separated list. Loss due to any telescope obscuration should not "
                                 "be included",
                            value=_join_values(default_values["sensor"]["opticsTransmission"])
                        )
                        eta = gr.Number(label="Relative Linear Obscuration", value=default_values["sensor"]["eta"])
                        int_time_s = gr.Number(
                            label="Integration Time (s)",
                            info="Maximum integration time",
                            value=default_values["sensor"]["intTime"]
                        )
                        dark_current = gr.Number(
                            label="Detector Dark Current (e-/s)",
                            value=default_values["sensor"]["darkCurrent"]
                        )
                        read_noise = gr.Number(
                            label="RMS Read Noise (RMS e-)",
                            value=default_values["sensor"]["readNoise"]
                        )
                        max_N = gr.Number(label="Maximum ADC Level (e-)", value=default_values["sensor"]["maxN"])
                        bit_depth = gr.Number(
                            label="Bit Depth (bits)",
                            info="Resolution of the detector ADC",
                            value=default_values["sensor"]["bitdepth"]
                        )
                        max_well_fill = gr.Number(
                            label="Max Well Fill",
                            info="Desired well fill. i.e. maximum well size x desired fill fraction",
                            value=default_values["sensor"]["maxWellFill"]
                        )
                        with gr.Row():
                            sx = gr.Number(
                                label="RMS Jitter Amplitude, X Direction (rad)",
                                value=default_values["sensor"]["sx"]
                            )
                            sy = gr.Number(
                                label="RMS Jitter Amplitude, Y Direction (rad)",
                                value=default_values["sensor"]["sy"]
                            )
                        with gr.Row():
                            dax = gr.Number(
                                label="Line of Sight Angular Drift Rate, X Direction (rad/s)",
                                info="Drift rate during one integration time",
                                value=default_values["sensor"]["dax"]
                            )
                            day = gr.Number(
                                label="Line of Sight Angular Drift Rate, Y Direction (rad/s)",
                                info="Drift rate during one integration time",
                                value=default_values["sensor"]["day"]
                            )
                        qe_str = gr.Textbox(
                            label="Quantum Efficiency as a function of Wavelength (e-/photon)",
                            info="Enter a comma separated list",
                            value=_join_values(default_values["sensor"]["qe"])
                        )
                        qewavelengths_str = gr.Textbox(
                            label="Wavelengths Corresponding to the Quantum Efficiency Array (microns)",
                            info="Enter a comma separated list",
                            value=_join_values(default_values["sensor"]["qewavelengths"])
                        )

            submit_btn = gr.Button("Perturb Image")

    github_btn = gr.Button(
        "Check out NRTK on GitHub!",
        icon=str(asset_dir / "github-badge.png"),
        link="https://github.com/Kitware/nrtk"
    )

    # Every configuration field, in the order used by the button listeners below
    config_fields = [
        img_gsd,
        scenario_name,
        ihaze,
        altitude_m,
        ground_range_m,
        aircraft_speed_m_Per_s,
        target_reflectance,
        target_temperature_K,
        bkgd_reflectance,
        bkgd_temperature_K,
        ha_windspeed_m_Per_s,
        cn2at1m,
        sensor_name,
        D_m,
        f_m,
        px_m,
        optTransWavelengths_str,
        opticsTransmission_str,
        eta,
        int_time_s,
        dark_current,
        read_noise,
        max_N,
        bit_depth,
        max_well_fill,
        sx,
        sy,
        dax,
        day,
        qe_str,
        qewavelengths_str,
    ]
    # Loading a configuration also reveals the (initially hidden) parameter group
    config_outputs = [input_group, *config_fields]

    # Button listeners
    # Inputs are passed as sets so each handler receives one dict keyed by component
    gen_config_btn.click(
        fn=gen_new_config,
        inputs=None,
        outputs=config_outputs
    )

    load_config_btn.click(
        fn=load_config_from_file,
        inputs={config_file},
        outputs=config_outputs
    )

    submit_btn.click(
        fn=submit,
        inputs={input_img, *config_fields},
        outputs=[out_img, out_config],
    )

demo.launch(show_error=True)