"""Percept: a Streamlit map explorer for Amsterdam.

The user clicks a point on a Folium map, the app fetches a Mapillary street
image near that point, embeds it with an OpenCLIP model and rates it on
several perception categories using a k-nearest-neighbour lookup over
pre-computed embeddings.
"""

import os
from io import BytesIO

import cv2
import folium
import numpy as np
import open_clip
import requests
import streamlit as st
import torch
from PIL import Image
from sklearn.utils.extmath import softmax
from streamlit_folium import st_folium
from ultralytics import YOLO  # For street detection

knnpath = '20241204-ams-no-env-open_clip_ViT-H-14-378-quickgelu.npz'
clip_model_name = 'ViT-H-14-378-quickgelu'
pretrained_name = 'dfn5b'

categories = ['walkability', 'bikeability', 'pleasantness', 'greenness', 'safety']

debug = False

# Seconds to wait for any outbound HTTP request before giving up, so a
# stalled server cannot hang the app indefinitely.
REQUEST_TIMEOUT = 30

# Class index that detect_and_crop_street() keeps from the YOLO detections.
# NOTE(review): the original comment called this the "road class", but in the
# COCO label set that the stock yolov8n.pt weights are trained on, index 0 is
# "person" and there is no road class. The value is kept so behaviour is
# unchanged; a road-aware (e.g. segmentation) model is needed to really
# centre the crop on the street.
YOLO_TARGET_CLASS = 0

# Set page config
st.set_page_config(
    page_title="Percept",
    layout="wide"
)

# Securely get the token from environment variables
MAPILLARY_ACCESS_TOKEN = os.environ.get('MAPILLARY_ACCESS_TOKEN')

# Verify token exists
if not MAPILLARY_ACCESS_TOKEN:
    st.error("Mapillary access token not found. Please configure it in the Space secrets.")
    st.stop()


@st.cache_resource
def _load_yolo_model():
    """Load the YOLOv8 nano weights once and reuse them across reruns."""
    return YOLO('yolov8n.pt')


def detect_and_crop_street(panorama_url, use_yolo=True):
    """
    Download a panoramic image and return a cropped, normal-sized view of it.

    With ``use_yolo`` enabled, the crop is the largest detection of class
    ``YOLO_TARGET_CLASS`` padded by 200 pixels; if there is no such detection,
    or ``use_yolo`` is False, the crop comes from ``edge_based_crop``.

    Args:
        panorama_url: URL of the panoramic image
        use_yolo: Whether to use YOLOv8 (True) or simple edge detection (False)

    Returns:
        cropped_image: PIL Image, 1024 pixels wide, with the crop's aspect
            ratio preserved

    Raises:
        requests.exceptions.RequestException: if the download fails, times
            out, or returns an HTTP error status.
    """
    # Download and convert image to CV2 format
    response = requests.get(panorama_url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    # Force three channels: COLOR_RGB2BGR rejects grayscale/RGBA/palette input.
    img = Image.open(BytesIO(response.content)).convert('RGB')
    cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)

    cropped = None
    if use_yolo:
        results = _load_yolo_model()(cv_img)

        # Collect every detection of the target class
        target_boxes = [
            box.cpu().numpy()
            for result in results
            for box, cls in zip(result.boxes.xyxy, result.boxes.cls)
            if int(cls) == YOLO_TARGET_CLASS
        ]

        if target_boxes:
            # Take the largest detection by box area
            largest_box = max(
                target_boxes,
                key=lambda box: (box[2] - box[0]) * (box[3] - box[1])
            )
            x1, y1, x2, y2 = map(int, largest_box)

            # Add some padding, clamped to the image bounds
            padding = 200
            height, width = cv_img.shape[:2]
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(width, x2 + padding)
            y2 = min(height, y2 + padding)

            cropped = cv_img[y1:y2, x1:x2]

    if cropped is None:
        # YOLO disabled or nothing detected: fall back to edge detection
        cropped = edge_based_crop(cv_img)

    if cropped.size == 0:
        # A degenerate crop cannot be resized; use the full frame instead.
        cropped = cv_img

    # Convert back to PIL Image
    cropped_pil = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))

    # Resize to standard dimensions while maintaining aspect ratio
    target_width = 1024
    aspect_ratio = cropped.shape[1] / cropped.shape[0]
    # Keep at least one row so extremely wide crops still resize.
    target_height = max(1, int(target_width / aspect_ratio))
    cropped_pil = cropped_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)

    return cropped_pil


def edge_based_crop(cv_img):
    """
    Crop a BGR image around its largest Canny-edge contour.

    The bounding rectangle of the largest external contour is padded by 200
    pixels and clamped to the image. If no contours are found, the central
    third of the image (in each dimension) is returned instead.

    Args:
        cv_img: image array in OpenCV BGR channel order

    Returns:
        A slice of ``cv_img`` (not a copy).
    """
    # Convert to grayscale
    gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY)

    # Apply Gaussian blur
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # Detect edges
    edges = cv2.Canny(blurred, 50, 150)

    # Find contours
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    height, width = cv_img.shape[:2]

    if contours:
        # Find the largest contour
        largest_contour = max(contours, key=cv2.contourArea)
        x, y, w, h = cv2.boundingRect(largest_contour)

        # Add padding on every side, clamped to the image bounds
        padding = 200
        x = max(0, x - padding)
        y = max(0, y - padding)
        w = min(width - x, w + 2 * padding)
        h = min(height - y, h + 2 * padding)

        return cv_img[y:y + h, x:x + w]

    # If no contours found, return center crop
    center_x = width // 2
    center_y = height // 2
    crop_width = width // 3
    crop_height = height // 3

    return cv_img[center_y - crop_height // 2:center_y + crop_height // 2,
                  center_x - crop_width // 2:center_x + crop_width // 2]


def process_panorama(panorama_url):
    """
    Process a panoramic image to get a cropped view.

    Wraps ``detect_and_crop_street`` and reports any failure in the Streamlit
    UI instead of raising.

    Returns:
        The cropped PIL Image, or None if processing failed.
    """
    try:
        return detect_and_crop_street(panorama_url)
    except Exception as e:
        st.error(f"Error processing panorama: {str(e)}")
        return None


def get_bounding_box(lat, lon):
    """
    Create a bounding box around a point that extends roughly 25 meters in
    each direction at Amsterdam's latitude (52.37°N):
    - 0.000224 degrees latitude = 25 meters N/S
    - 0.000368 degrees longitude = 25 meters E/W

    Returns:
        [min longitude, min latitude, max longitude, max latitude]
    """
    lat_offset = 0.000224  # 25 meters in latitude
    lon_offset = 0.000368  # 25 meters in longitude

    return [
        lon - lon_offset,  # min longitude
        lat - lat_offset,  # min latitude
        lon + lon_offset,  # max longitude
        lat + lat_offset   # max latitude
    ]


def get_nearest_image(lat, lon):
    """
    Get a Mapillary image close to the given coordinates.

    Queries the Mapillary Graph API for images inside the box produced by
    ``get_bounding_box`` with ``limit=1`` and returns the first result. The
    request does not sort by distance, so the result is an image inside the
    box, not necessarily the closest one.

    Returns:
        A dict with the requested fields ('id', 'thumb_1024_url', 'is_pano'),
        or None if nothing was found or the request failed (the failure is
        reported in the Streamlit UI).
    """
    bbox = get_bounding_box(lat, lon)

    params = {
        'fields': 'id,thumb_1024_url,is_pano',
        'limit': 1,
        'bbox': f'{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}'
    }

    header = {'Authorization': f'OAuth {MAPILLARY_ACCESS_TOKEN}'}

    try:
        response = requests.get(
            "https://graph.mapillary.com/images",
            params=params,
            headers=header,
            timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        data = response.json()

        images = data.get('data')
        if images:
            return images[0]
        return None

    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching Mapillary data: {str(e)}")
        return None


@st.cache_resource
def load_model():
    """Load the OpenCLIP model and return (model, preprocess, tokenizer)."""
    model, _, preprocess = open_clip.create_model_and_transforms(
        clip_model_name, pretrained=pretrained_name
    )
    # The model is only used for inference here.
    model.eval()
    tokenizer = open_clip.get_tokenizer(clip_model_name)
    return model, preprocess, tokenizer


def process_image(image, preprocess):
    """
    Turn an image into a batched tensor ready for the CLIP image encoder.

    Args:
        image: a PIL Image, or a string URL from which the image is downloaded
        preprocess: the transform returned by ``load_model``

    Returns:
        The preprocessed image with a leading batch dimension of 1.

    Raises:
        requests.exceptions.RequestException: if ``image`` is a URL and the
            download fails, times out, or returns an HTTP error status.
    """
    if isinstance(image, str):
        # If image is a URL
        response = requests.get(image, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))

    # Ensure image is in RGB mode
    if image.mode != 'RGB':
        image = image.convert('RGB')

    return preprocess(image).unsqueeze(0)


def knn_get_score(knn, k, cat, vec):
    """
    Rate an embedding for one category with a similarity-weighted kNN.

    Looks up ``f'{cat}_vecs'`` and ``f'{cat}_scores'`` in ``knn``, takes the
    ``k`` reference vectors most similar to ``vec`` and returns the sum of
    their scores weighted by ``softmax(10 ** similarity)``.

    Args:
        knn: mapping holding the reference vectors and scores per category
        k: number of nearest neighbours to use
        cat: category name
        vec: query embedding(s), one per row; assumed to be L2-normalized like
            the stored vectors so the dot product is the cosine similarity

    Returns:
        The weighted score, summed over all rows of ``vec``.
    """
    allvecs = knn[f'{cat}_vecs']
    if debug:
        st.write('allvecs.shape', allvecs.shape)
    scores = knn[f'{cat}_scores']
    if debug:
        st.write('scores.shape', scores.shape)

    # Compute cosine similarity of vec against allvecs
    # (both are already normalized)
    cos_sim_table = vec @ allvecs.T
    if debug:
        st.write('cos_sim_table.shape', cos_sim_table.shape)

    # Get sorted array indices by similarity in descending order
    sortinds = np.flip(np.argsort(cos_sim_table, axis=1), axis=1)
    if debug:
        st.write('sortinds.shape', sortinds.shape)

    # Only the k best matches per row are needed; slicing the indices first
    # gives the same result as gathering everything and slicing afterwards.
    top_inds = sortinds[:, :k]

    # Get corresponding scores for the sorted vectors
    kscores = scores[top_inds]
    if debug:
        st.write('kscores.shape', kscores.shape)

    # Get actual sorted similarity scores; the row index column pairs each
    # query row with its own top-k column indices.
    row_inds = np.expand_dims(np.arange(top_inds.shape[0]), axis=1)
    ksims = cos_sim_table[row_inds, top_inds]
    if debug:
        st.write('ksims.shape', ksims.shape)

    # Apply normalization after exponential formula
    ksims = softmax(10**ksims)

    # Weighted sum
    kweightedscore = np.sum(kscores * ksims)
    return kweightedscore


@st.cache_resource
def load_knn():
    """Load the pre-computed kNN vectors and scores from ``knnpath``."""
    return np.load(knnpath)


def main():
    """Render the app: load models, show the map and rate the clicked spot."""
    st.title("Percept: Map Explorer")

    try:
        with st.spinner('Loading CLIP model... This may take a moment.'):
            model, preprocess, tokenizer = load_model()
            device = "cuda" if torch.cuda.is_available() else "cpu"
            model = model.to(device)
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        st.info("Please make sure you have enough memory and the correct dependencies installed.")
        # Nothing below can work without the model; continuing would only
        # fail later on undefined names.
        return

    try:
        with st.spinner('Loading KNN model... This may take a moment.'):
            knn = load_knn()
    except Exception as e:
        st.error(f"Error loading KNN model: {str(e)}")
        return

    # Initialize the map centered on Amsterdam
    amsterdam_coords = [52.3676, 4.9041]
    m = folium.Map(location=amsterdam_coords, zoom_start=13)

    # Create a LayerGroup for the marker
    marker_group = folium.FeatureGroup(name="Marker")
    m.add_child(marker_group)

    # Display the map and get clicked coordinates
    map_data = st_folium(m, height=400, width=700)

    # Check if a location was clicked
    last_clicked = map_data.get('last_clicked') if map_data else None
    if not last_clicked:
        return

    lat = last_clicked['lat']
    lng = last_clicked['lng']

    # Add a marker
    # NOTE(review): the map has already been rendered by st_folium above, so
    # this marker presumably never appears on screen - confirm, and if so add
    # it before rendering (e.g. from a click kept in st.session_state).
    marker_group.add_child(folium.Marker(
        [lat, lng],
        popup=f"Selected Location\n{lat:.4f}, {lng:.4f}",
        icon=folium.Icon(color="red", icon="info-sign")
    ))

    st.write(f"Selected coordinates: {lat:.4f}, {lng:.4f}")

    # Get nearest Mapillary image
    with st.spinner('Fetching street view image...'):
        image_data = get_nearest_image(lat, lng)

    if not image_data:
        st.warning("No street view images found at this location. Try a different spot.")
        return

    # Display the image
    try:
        if image_data.get('is_pano'):
            st.write('Processing panoramic image')
            image = process_panorama(image_data['thumb_1024_url'])
            if image is None:
                # process_panorama has already shown the error message.
                return
            # Round-trip through JPEG so the displayed image and the
            # download button serve the same bytes.
            image_buffer = BytesIO()
            image.save(image_buffer, format='JPEG')
            image_bytes = image_buffer.getvalue()
            image_buffer.seek(0)
            image = Image.open(image_buffer)
        else:
            response = requests.get(image_data['thumb_1024_url'], timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
            image_bytes = response.content

        st.image(image, caption="Street View", width=400, output_format='JPEG')

        # Add download button
        st.download_button(
            label="Download Image",
            data=image_bytes,
            file_name=f"streetview_{lat}_{lng}.jpg",
            mime="image/jpeg"
        )

        # Process image
        with st.spinner('Processing image...'):
            processed_image = process_image(image, preprocess)
            processed_image = processed_image.to(device)

            # Encode into CLIP vector
            with torch.no_grad():
                vec = model.encode_image(processed_image)

            # Normalize vector
            vec /= vec.norm(dim=-1, keepdim=True)
            if debug:
                st.write(vec.shape)
            # Tensor.numpy() only works on CPU tensors, so move the vector
            # off the GPU first (a no-op when already on the CPU).
            vec = vec.cpu().numpy()

        k = 40
        for cat in categories:
            st.write(cat, f'rating = {knn_get_score(knn, k, cat, vec):.1f}')

    except Exception as e:
        st.error(f"Error displaying image: {str(e)}")


if __name__ == "__main__":
    main()