# percept-map / app.py
import streamlit as st
import cv2
from ultralytics import YOLO # For street detection
import folium
from streamlit_folium import st_folium
import requests
from PIL import Image
from io import BytesIO
import numpy as np
import torch
from sklearn.utils.extmath import softmax
import open_clip
import os
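# Precomputed k-NN reference data: for each category, the .npz file is expected
# to hold L2-normalized CLIP image vectors ('<category>_vecs') and matching
# human ratings ('<category>_scores'); see knn_get_score() below.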
knnpath = '20241204-ams-no-env-open_clip_ViT-H-14-378-quickgelu.npz'
clip_model_name = 'ViT-H-14-378-quickgelu'
pretrained_name = 'dfn5b'
categories = ['walkability', 'bikeability', 'pleasantness', 'greenness', 'safety']
debug = False
# Set page config
st.set_page_config(
page_title="Percept",
layout="wide"
)
# Securely get the token from environment variables
MAPILLARY_ACCESS_TOKEN = os.environ.get('MAPILLARY_ACCESS_TOKEN')
# Verify token exists
if not MAPILLARY_ACCESS_TOKEN:
st.error("Mapillary access token not found. Please configure it in the Space secrets.")
st.stop()
def detect_and_crop_street(panorama_url, use_yolo=True):
"""
    Detect streets in a panoramic image and return a cropped, standard-sized image
Args:
panorama_url: URL of the panoramic image
use_yolo: Whether to use YOLOv8 (True) or simple edge detection (False)
Returns:
cropped_image: PIL Image containing the cropped street view
"""
# Download and convert image to CV2 format
    response = requests.get(panorama_url)
    response.raise_for_status()  # surface HTTP errors early
img = Image.open(BytesIO(response.content))
cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
if use_yolo:
# Load YOLOv8 model
model = YOLO('yolov8n.pt')
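        # NOTE: this reloads the weights on every call; wrapping the load in a
        # cached helper (e.g. with @st.cache_resource) would avoid the repeated cost.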
# Detect objects
results = model(cv_img)
        # Look for detections of class 0. NOTE: in the COCO label set used by
        # YOLOv8, index 0 is 'person' and there is no road/street class, so this
        # branch usually finds nothing and falls through to edge detection.
        street_boxes = []
        for result in results:
            for box, cls in zip(result.boxes.xyxy, result.boxes.cls):
                if cls == 0:
                    street_boxes.append(box.cpu().numpy())
        if street_boxes:
            # Take the largest detection by box area
            largest_box = max(street_boxes, key=lambda box: (box[2]-box[0])*(box[3]-box[1]))
            x1, y1, x2, y2 = map(int, largest_box)
            # Add some padding, clamped to the image bounds
            padding = 200
            height, width = cv_img.shape[:2]
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(width, x2 + padding)
            y2 = min(height, y2 + padding)
            cropped = cv_img[y1:y2, x1:x2]
else:
# Fallback to edge detection if no streets found
cropped = edge_based_crop(cv_img)
else:
cropped = edge_based_crop(cv_img)
# Convert back to PIL Image
cropped_pil = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
# Resize to standard dimensions while maintaining aspect ratio
target_width = 1024
aspect_ratio = cropped.shape[1] / cropped.shape[0]
target_height = int(target_width / aspect_ratio)
cropped_pil = cropped_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)
return cropped_pil
def edge_based_crop(cv_img):
"""
Use edge detection to find and crop around street areas
"""
# Convert to grayscale
gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY)
# Apply Gaussian blur
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# Detect edges
edges = cv2.Canny(blurred, 50, 150)
# Find contours
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
# Find the largest contour
largest_contour = max(contours, key=cv2.contourArea)
x, y, w, h = cv2.boundingRect(largest_contour)
# Add padding
padding = 200
height, width = cv_img.shape[:2]
x = max(0, x - padding)
y = max(0, y - padding)
w = min(width - x, w + 2*padding)
h = min(height - y, h + 2*padding)
return cv_img[y:y+h, x:x+w]
else:
# If no contours found, return center crop
height, width = cv_img.shape[:2]
center_x = width // 2
center_y = height // 2
crop_width = width // 3
crop_height = height // 3
return cv_img[center_y-crop_height//2:center_y+crop_height//2,
center_x-crop_width//2:center_x+crop_width//2]
# Example usage in your Streamlit app:
def process_panorama(panorama_url):
"""
Process a panoramic image to get a street-centered crop
"""
try:
cropped_image = detect_and_crop_street(panorama_url)
return cropped_image
except Exception as e:
st.error(f"Error processing panorama: {str(e)}")
return None
def get_bounding_box(lat, lon):
"""
Create a bounding box around a point that extends roughly 25 meters in each direction
at Amsterdam's latitude (52.37°N):
- 0.000224 degrees latitude = 25 meters N/S
- 0.000368 degrees longitude = 25 meters E/W
"""
lat_offset = 0.000224 # 25 meters in latitude
lon_offset = 0.000368 # 25 meters in longitude
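    # Derivation: 1 degree of latitude is ~111,320 m everywhere, so 25 m is
    # about 25/111320 = 0.000224 degrees. A degree of longitude shrinks by
    # cos(latitude): 111,320 * cos(52.37) is ~67,960 m, so 25/67960 = 0.000368.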
return [
lon - lon_offset, # min longitude
lat - lat_offset, # min latitude
lon + lon_offset, # max longitude
lat + lat_offset # max latitude
]
def get_nearest_image(lat, lon):
"""
Get the nearest Mapillary image to given coordinates
"""
bbox = get_bounding_box(lat, lon)
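    # Query the Mapillary Graph API for images whose location falls inside the
    # bbox (ordered min_lon, min_lat, max_lon, max_lat, matching the order
    # produced by get_bounding_box above)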
params = {
'fields': 'id,thumb_1024_url,is_pano',
'limit': 1,
'bbox': f'{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}'
}
    header = {'Authorization': f'OAuth {MAPILLARY_ACCESS_TOKEN}'}
try:
response = requests.get(
"https://graph.mapillary.com/images",
params=params,
headers=header
)
response.raise_for_status()
data = response.json()
if 'data' in data and len(data['data']) > 0:
return data['data'][0]
return None
except requests.exceptions.RequestException as e:
st.error(f"Error fetching Mapillary data: {str(e)}")
return None
@st.cache_resource
def load_model():
"""Load the OpenCLIP model and return model and processor"""
model, _, preprocess = open_clip.create_model_and_transforms(
clip_model_name, pretrained=pretrained_name
)
tokenizer = open_clip.get_tokenizer(clip_model_name)
return model, preprocess, tokenizer
def process_image(image, preprocess):
"""Process image and return tensor"""
    if isinstance(image, str):
        # If image is a URL, download it first
        response = requests.get(image)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
# Ensure image is in RGB mode
if image.mode != 'RGB':
image = image.convert('RGB')
processed_image = preprocess(image).unsqueeze(0)
return processed_image
def knn_get_score(knn, k, cat, vec):
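    """
    Estimate a rating for category `cat` by weighted k-nearest-neighbour
    regression: find the k reference vectors most similar to `vec` (by cosine
    similarity) and average their scores, weighted by softmax(10**similarity).
    """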
allvecs = knn[f'{cat}_vecs']
if debug: st.write('allvecs.shape', allvecs.shape)
scores = knn[f'{cat}_scores']
if debug: st.write('scores.shape', scores.shape)
    # Compute cosine similarity of vec against allvecs
# (both are already normalized)
cos_sim_table = vec @ allvecs.T
if debug: st.write('cos_sim_table.shape', cos_sim_table.shape)
    # Get sorted array indices by similarity in descending order
sortinds = np.flip(np.argsort(cos_sim_table, axis=1), axis=1)
if debug: st.write('sortinds.shape', sortinds.shape)
# Get corresponding scores for the sorted vectors
kscores = scores[sortinds][:,:k]
if debug: st.write('kscores.shape', kscores.shape)
    # Get actual sorted similarity scores
# (line copied from clip_retrieval_knn.py even though sortinds.shape[0] == 1 here)
ksims = cos_sim_table[np.expand_dims(np.arange(sortinds.shape[0]), axis=1), sortinds]
ksims = ksims[:,:k]
if debug: st.write('ksims.shape', ksims.shape)
    # Turn similarities into weights: exponentiate (base 10) to sharpen the
    # distribution, then softmax-normalize so the weights sum to 1
ksims = softmax(10**ksims)
# Weighted sum
kweightedscore = np.sum(kscores * ksims)
return kweightedscore
@st.cache_resource
def load_knn():
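    """Load the precomputed k-NN vectors and scores from the .npz file"""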
return np.load(knnpath)
def main():
st.title("Percept: Map Explorer")
try:
with st.spinner('Loading CLIP model... This may take a moment.'):
model, preprocess, tokenizer = load_model()
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        st.info("Please make sure you have enough memory and the correct dependencies installed.")
        st.stop()  # don't continue without a model
with st.spinner('Loading KNN model... This may take a moment.'):
knn = load_knn()
# Initialize the map centered on Amsterdam
amsterdam_coords = [52.3676, 4.9041]
m = folium.Map(location=amsterdam_coords, zoom_start=13)
# Create a LayerGroup for the marker
marker_group = folium.FeatureGroup(name="Marker")
m.add_child(marker_group)
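    # NOTE: the map is rebuilt on every Streamlit rerun, and markers added after
    # st_folium() has already rendered are not displayed; persisting clicks
    # (e.g. in st.session_state) and adding markers before rendering would fix this.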
# Display the map and get clicked coordinates
map_data = st_folium(m, height=400, width=700)
# Check if a location was clicked
if map_data['last_clicked']:
lat = map_data['last_clicked']['lat']
lng = map_data['last_clicked']['lng']
# Add a marker
marker_group.add_child(folium.Marker(
[lat, lng],
popup=f"Selected Location\n{lat:.4f}, {lng:.4f}",
icon=folium.Icon(color="red", icon="info-sign")
))
st.write(f"Selected coordinates: {lat:.4f}, {lng:.4f}")
# Get nearest Mapillary image
with st.spinner('Fetching street view image...'):
image_data = get_nearest_image(lat, lng)
if image_data:
# Display the image
try:
                if image_data['is_pano']:
                    st.write('Processing panoramic image')
                    image = process_panorama(image_data['thumb_1024_url'])
                    if image is None:
                        return  # error already shown by process_panorama
                    # Re-encode the cropped panorama as JPEG so it can be shown
                    # and offered for download
                    buf = BytesIO()
                    image.save(buf, format='JPEG')
                    image = Image.open(buf)
                    image_bytes = buf.getvalue()
                else:
                    response = requests.get(image_data['thumb_1024_url'])
                    response.raise_for_status()
                    image = Image.open(BytesIO(response.content))
                    image_bytes = response.content
st.image(image, caption="Street View", width=400, output_format='JPEG')
# Add download button
st.download_button(
label="Download Image",
data=image_bytes,
file_name=f"streetview_{lat}_{lng}.jpg",
mime="image/jpeg"
)
# Process image
with st.spinner('Processing image...'):
processed_image = process_image(image, preprocess)
processed_image = processed_image.to(device)
# Encode into CLIP vector
with torch.no_grad():
vec = model.encode_image(processed_image)
# Normalize vector
                vec /= vec.norm(dim=-1, keepdim=True)
                if debug: st.write(vec.shape)
                vec = vec.cpu().numpy()  # move to CPU before converting to NumPy
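                # Estimate a rating for each perceptual category via weighted k-NN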
k = 40
for cat in categories:
st.write(cat, f'rating = {knn_get_score(knn, k, cat, vec):.1f}')
except Exception as e:
st.error(f"Error displaying image: {str(e)}")
else:
st.warning("No street view images found at this location. Try a different spot.")
if __name__ == "__main__":
main()