Spaces:
Sleeping
Sleeping
import streamlit as st | |
import cv2 | |
from ultralytics import YOLO # For street detection | |
import folium | |
from streamlit_folium import st_folium | |
import requests | |
from PIL import Image | |
from io import BytesIO | |
import numpy as np | |
import torch | |
from sklearn.utils.extmath import softmax | |
import open_clip | |
import os | |
knnpath = '20241204-ams-no-env-open_clip_ViT-H-14-378-quickgelu.npz' | |
clip_model_name = 'ViT-H-14-378-quickgelu' | |
pretrained_name = 'dfn5b' | |
categories = ['walkability', 'bikeability', 'pleasantness', 'greenness', 'safety'] | |
debug = False | |
# Set page config | |
st.set_page_config( | |
page_title="Percept", | |
layout="wide" | |
) | |
# Securely get the token from environment variables | |
MAPILLARY_ACCESS_TOKEN = os.environ.get('MAPILLARY_ACCESS_TOKEN') | |
# Verify token exists | |
if not MAPILLARY_ACCESS_TOKEN: | |
st.error("Mapillary access token not found. Please configure it in the Space secrets.") | |
st.stop() | |
def detect_and_crop_street(panorama_url, use_yolo=True): | |
""" | |
Detect streets in a panoramic image and return a cropped normal-sized image | |
Args: | |
panorama_url: URL of the panoramic image | |
use_yolo: Whether to use YOLOv8 (True) or simple edge detection (False) | |
Returns: | |
cropped_image: PIL Image containing the cropped street view | |
""" | |
# Download and convert image to CV2 format | |
response = requests.get(panorama_url) | |
img = Image.open(BytesIO(response.content)) | |
cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) | |
if use_yolo: | |
# Load YOLOv8 model | |
model = YOLO('yolov8n.pt') | |
# Detect objects | |
results = model(cv_img) | |
# Look for road/street class (index 0 in COCO dataset) | |
street_boxes = [] | |
for result in results: | |
for box, cls in zip(result.boxes.xyxy, result.boxes.cls): | |
if cls == 0: # road class | |
street_boxes.append(box.cpu().numpy()) | |
if street_boxes: | |
# Take the largest street detection | |
largest_box = max(street_boxes, key=lambda box: (box[2]-box[0])*(box[3]-box[1])) | |
x1, y1, x2, y2 = map(int, largest_box) | |
midx = (x2 - x1) / 2 | |
# Add some padding | |
padding = 200 | |
height, width = cv_img.shape[:2] | |
x1 = max(0, x1 - padding) | |
y1 = max(0, y1 - padding) | |
x2 = min(width, x2 + padding) | |
y2 = min(height, y2 + padding) | |
cropped = cv_img[y1:y2, x1:x2] | |
else: | |
# Fallback to edge detection if no streets found | |
cropped = edge_based_crop(cv_img) | |
else: | |
cropped = edge_based_crop(cv_img) | |
# Convert back to PIL Image | |
cropped_pil = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)) | |
# Resize to standard dimensions while maintaining aspect ratio | |
target_width = 1024 | |
aspect_ratio = cropped.shape[1] / cropped.shape[0] | |
target_height = int(target_width / aspect_ratio) | |
cropped_pil = cropped_pil.resize((target_width, target_height), Image.Resampling.LANCZOS) | |
return cropped_pil | |
def edge_based_crop(cv_img): | |
""" | |
Use edge detection to find and crop around street areas | |
""" | |
# Convert to grayscale | |
gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY) | |
# Apply Gaussian blur | |
blurred = cv2.GaussianBlur(gray, (5, 5), 0) | |
# Detect edges | |
edges = cv2.Canny(blurred, 50, 150) | |
# Find contours | |
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
if contours: | |
# Find the largest contour | |
largest_contour = max(contours, key=cv2.contourArea) | |
x, y, w, h = cv2.boundingRect(largest_contour) | |
# Add padding | |
padding = 200 | |
height, width = cv_img.shape[:2] | |
x = max(0, x - padding) | |
y = max(0, y - padding) | |
w = min(width - x, w + 2*padding) | |
h = min(height - y, h + 2*padding) | |
return cv_img[y:y+h, x:x+w] | |
else: | |
# If no contours found, return center crop | |
height, width = cv_img.shape[:2] | |
center_x = width // 2 | |
center_y = height // 2 | |
crop_width = width // 3 | |
crop_height = height // 3 | |
return cv_img[center_y-crop_height//2:center_y+crop_height//2, | |
center_x-crop_width//2:center_x+crop_width//2] | |
# Example usage in your Streamlit app: | |
def process_panorama(panorama_url): | |
""" | |
Process a panoramic image to get a street-centered crop | |
""" | |
try: | |
cropped_image = detect_and_crop_street(panorama_url) | |
return cropped_image | |
except Exception as e: | |
st.error(f"Error processing panorama: {str(e)}") | |
return None | |
def get_bounding_box(lat, lon): | |
""" | |
Create a bounding box around a point that extends roughly 25 meters in each direction | |
at Amsterdam's latitude (52.37°N): | |
- 0.000224 degrees latitude = 25 meters N/S | |
- 0.000368 degrees longitude = 25 meters E/W | |
""" | |
lat_offset = 0.000224 # 25 meters in latitude | |
lon_offset = 0.000368 # 25 meters in longitude | |
return [ | |
lon - lon_offset, # min longitude | |
lat - lat_offset, # min latitude | |
lon + lon_offset, # max longitude | |
lat + lat_offset # max latitude | |
] | |
def get_nearest_image(lat, lon): | |
""" | |
Get the nearest Mapillary image to given coordinates | |
""" | |
bbox = get_bounding_box(lat, lon) | |
params = { | |
'fields': 'id,thumb_1024_url,is_pano', | |
'limit': 1, | |
'bbox': f'{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}' | |
} | |
header = {'Authorization' : 'OAuth {}'.format(MAPILLARY_ACCESS_TOKEN)} | |
try: | |
response = requests.get( | |
"https://graph.mapillary.com/images", | |
params=params, | |
headers=header | |
) | |
response.raise_for_status() | |
data = response.json() | |
if 'data' in data and len(data['data']) > 0: | |
return data['data'][0] | |
return None | |
except requests.exceptions.RequestException as e: | |
st.error(f"Error fetching Mapillary data: {str(e)}") | |
return None | |
def load_model(): | |
"""Load the OpenCLIP model and return model and processor""" | |
model, _, preprocess = open_clip.create_model_and_transforms( | |
clip_model_name, pretrained=pretrained_name | |
) | |
tokenizer = open_clip.get_tokenizer(clip_model_name) | |
return model, preprocess, tokenizer | |
def process_image(image, preprocess): | |
"""Process image and return tensor""" | |
if isinstance(image, str): | |
# If image is a URL | |
response = requests.get(image) | |
image = Image.open(BytesIO(response.content)) | |
# Ensure image is in RGB mode | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
processed_image = preprocess(image).unsqueeze(0) | |
return processed_image | |
def knn_get_score(knn, k, cat, vec): | |
allvecs = knn[f'{cat}_vecs'] | |
if debug: st.write('allvecs.shape', allvecs.shape) | |
scores = knn[f'{cat}_scores'] | |
if debug: st.write('scores.shape', scores.shape) | |
# Compute cosine similiarity of vec against allvecs | |
# (both are already normalized) | |
cos_sim_table = vec @ allvecs.T | |
if debug: st.write('cos_sim_table.shape', cos_sim_table.shape) | |
# Get sorted array indices by similiarity in descending order | |
sortinds = np.flip(np.argsort(cos_sim_table, axis=1), axis=1) | |
if debug: st.write('sortinds.shape', sortinds.shape) | |
# Get corresponding scores for the sorted vectors | |
kscores = scores[sortinds][:,:k] | |
if debug: st.write('kscores.shape', kscores.shape) | |
# Get actual sorted similiarity scores | |
# (line copied from clip_retrieval_knn.py even though sortinds.shape[0] == 1 here) | |
ksims = cos_sim_table[np.expand_dims(np.arange(sortinds.shape[0]), axis=1), sortinds] | |
ksims = ksims[:,:k] | |
if debug: st.write('ksims.shape', ksims.shape) | |
# Apply normalization after exponential formula | |
ksims = softmax(10**ksims) | |
# Weighted sum | |
kweightedscore = np.sum(kscores * ksims) | |
return kweightedscore | |
def load_knn(): | |
return np.load(knnpath) | |
def main(): | |
st.title("Percept: Map Explorer") | |
try: | |
with st.spinner('Loading CLIP model... This may take a moment.'): | |
model, preprocess, tokenizer = load_model() | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
model = model.to(device) | |
except Exception as e: | |
st.error(f"Error loading model: {str(e)}") | |
st.info("Please make sure you have enough memory and the correct dependencies installed.") | |
with st.spinner('Loading KNN model... This may take a moment.'): | |
knn = load_knn() | |
# Initialize the map centered on Amsterdam | |
amsterdam_coords = [52.3676, 4.9041] | |
m = folium.Map(location=amsterdam_coords, zoom_start=13) | |
# Create a LayerGroup for the marker | |
marker_group = folium.FeatureGroup(name="Marker") | |
m.add_child(marker_group) | |
# Display the map and get clicked coordinates | |
map_data = st_folium(m, height=400, width=700) | |
# Check if a location was clicked | |
if map_data['last_clicked']: | |
lat = map_data['last_clicked']['lat'] | |
lng = map_data['last_clicked']['lng'] | |
# Add a marker | |
marker_group.add_child(folium.Marker( | |
[lat, lng], | |
popup=f"Selected Location\n{lat:.4f}, {lng:.4f}", | |
icon=folium.Icon(color="red", icon="info-sign") | |
)) | |
st.write(f"Selected coordinates: {lat:.4f}, {lng:.4f}") | |
# Get nearest Mapillary image | |
with st.spinner('Fetching street view image...'): | |
image_data = get_nearest_image(lat, lng) | |
if image_data: | |
# Display the image | |
try: | |
if image_data['is_pano']: | |
st.write('Processing panoramic image') | |
image = process_panorama(image_data['thumb_1024_url']) | |
image_bytes = BytesIO() | |
#st.write('Resaving image size =', image.size, ' image format = ', image.format) | |
image.save(image_bytes, format='JPEG') | |
image = Image.open(image_bytes) | |
image_bytes = image_bytes.getvalue() | |
#st.write('Panoramic image size = ', image.size, ' format = ', image.format) | |
else: | |
response = requests.get(image_data['thumb_1024_url']) | |
image = Image.open(BytesIO(response.content)) | |
image_bytes = response.content | |
st.image(image, caption="Street View", width=400, output_format='JPEG') | |
# Add download button | |
st.download_button( | |
label="Download Image", | |
data=image_bytes, | |
file_name=f"streetview_{lat}_{lng}.jpg", | |
mime="image/jpeg" | |
) | |
# Process image | |
with st.spinner('Processing image...'): | |
processed_image = process_image(image, preprocess) | |
processed_image = processed_image.to(device) | |
# Encode into CLIP vector | |
with torch.no_grad(): | |
vec = model.encode_image(processed_image) | |
# Normalize vector | |
vec /= vec.norm(dim=-1, keepdim=True) | |
if debug: st.write(vec.shape) | |
vec = vec.numpy() | |
k = 40 | |
for cat in categories: | |
st.write(cat, f'rating = {knn_get_score(knn, k, cat, vec):.1f}') | |
except Exception as e: | |
st.error(f"Error displaying image: {str(e)}") | |
else: | |
st.warning("No street view images found at this location. Try a different spot.") | |
if __name__ == "__main__": | |
main() | |