|
import gradio as gr |
|
from PIL import Image |
|
import tempfile |
|
import os |
|
from pathlib import Path |
|
import shutil |
|
import requests |
|
import re |
|
import time |
|
from PIL import ImageEnhance |
|
import concurrent.futures |
|
import asyncio |
|
import aiohttp |
|
import io |
|
|
|
class StreetViewDownloader: |
|
def __init__(self): |
|
self.headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
} |
|
self.session = requests.Session() |
|
|
|
def extract_panoid(self, url): |
|
"""Extract panorama ID from Google Maps URL.""" |
|
pattern = r'!1s([A-Za-z0-9-_]+)!' |
|
match = re.search(pattern, url) |
|
if match: |
|
return match.group(1) |
|
raise ValueError("Could not find panorama ID in URL. Please make sure you're using a valid Street View URL.") |
|
|
|
async def download_tile_async(self, session, panoid, x, y, adjusted_y, zoom, output_dir): |
|
"""Download a single tile asynchronously.""" |
|
tile_url = f"https://streetviewpixels-pa.googleapis.com/v1/tile?cb_client=maps_sv.tactile&panoid={panoid}&x={x}&y={adjusted_y}&zoom={zoom}" |
|
output_file = Path(output_dir) / f"tile_{x}_{y}.jpg" |
|
|
|
try: |
|
async with session.get(tile_url, headers=self.headers) as response: |
|
if response.status == 200: |
|
content = await response.read() |
|
if len(content) > 1000: |
|
output_file.write_bytes(content) |
|
return (x, y) |
|
except Exception as e: |
|
print(f"Error downloading tile {x},{y}: {str(e)}") |
|
return None |
|
|
|
async def download_tiles_async(self, panoid, output_dir, zoom, cols, rows): |
|
"""Download tiles asynchronously with custom parameters.""" |
|
Path(output_dir).mkdir(parents=True, exist_ok=True) |
|
|
|
vertical_offset = rows // 4 |
|
|
|
async with aiohttp.ClientSession() as session: |
|
tasks = [] |
|
for x in range(cols): |
|
for y in range(rows): |
|
adjusted_y = y - (rows // 2) + vertical_offset |
|
task = self.download_tile_async(session, panoid, x, y, adjusted_y, zoom, output_dir) |
|
tasks.append(task) |
|
|
|
downloaded_tiles = [] |
|
for result in await asyncio.gather(*tasks): |
|
if result: |
|
downloaded_tiles.append(result) |
|
|
|
return cols, rows, downloaded_tiles |
|
|
|
def download_tiles(self, panoid, output_dir, zoom, cols, rows): |
|
"""Synchronous wrapper for async download.""" |
|
return asyncio.run(self.download_tiles_async(panoid, output_dir, zoom, cols, rows)) |
|
|
|
def create_360_panorama(self, directory, cols, rows, downloaded_tiles, output_file): |
|
"""Create an equirectangular 360° panorama from tiles with optimized processing.""" |
|
directory = Path(directory) |
|
|
|
|
|
valid_tile = None |
|
for x, y in downloaded_tiles: |
|
tile_path = directory / f"tile_{x}_{y}.jpg" |
|
if tile_path.exists(): |
|
valid_tile = Image.open(tile_path) |
|
break |
|
|
|
if not valid_tile: |
|
raise Exception("No valid tiles found in directory") |
|
|
|
tile_width, tile_height = valid_tile.size |
|
valid_tile.close() |
|
|
|
panorama_width = tile_width * cols |
|
panorama_height = tile_height * rows |
|
|
|
panorama = Image.new('RGB', (panorama_width, panorama_height)) |
|
|
|
def process_tile(tile_info): |
|
x, y = tile_info |
|
tile_path = directory / f"tile_{x}_{y}.jpg" |
|
if tile_path.exists(): |
|
with Image.open(tile_path) as tile: |
|
if tile.getbbox(): |
|
return (x * tile_width, y * tile_height, tile.copy()) |
|
return None |
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: |
|
tile_results = list(executor.map(process_tile, downloaded_tiles)) |
|
|
|
for result in tile_results: |
|
if result: |
|
x, y, tile = result |
|
panorama.paste(tile, (x, y)) |
|
tile.close() |
|
|
|
bbox = panorama.getbbox() |
|
if bbox: |
|
panorama = panorama.crop(bbox) |
|
|
|
panorama = self.enhance_panorama(panorama) |
|
panorama.save(output_file, 'JPEG', quality=95, optimize=True) |
|
return panorama |
|
|
|
def enhance_panorama(self, panorama): |
|
"""Quick enhancement with minimal processing.""" |
|
enhancer = ImageEnhance.Contrast(panorama) |
|
panorama = enhancer.enhance(1.1) |
|
return panorama |
|
|
|
|
|
|
|
def process_street_view(url, zoom, cols, rows, progress=gr.Progress()): |
|
"""Process the Street View URL with custom parameters.""" |
|
try: |
|
|
|
zoom = int(zoom) |
|
cols = int(cols) |
|
rows = int(rows) |
|
|
|
if zoom < 0 or zoom > 5: |
|
raise ValueError("Zoom level must be between 0 and 5") |
|
if cols < 1 or rows < 1: |
|
raise ValueError("Columns and rows must be positive numbers") |
|
if cols * rows > 1000: |
|
raise ValueError("Total number of tiles (cols * rows) cannot exceed 1000") |
|
|
|
temp_dir = tempfile.mkdtemp() |
|
progress(0.1, desc="Initializing...") |
|
|
|
downloader = StreetViewDownloader() |
|
panoid = downloader.extract_panoid(url) |
|
progress(0.2, desc="Extracted panorama ID...") |
|
|
|
tiles_dir = os.path.join(temp_dir, f"{panoid}_tiles") |
|
output_file = os.path.join(temp_dir, f"{panoid}_360panorama.jpg") |
|
|
|
progress(0.3, desc=f"Downloading {cols}x{rows} tiles at zoom level {zoom}...") |
|
cols, rows, downloaded_tiles = downloader.download_tiles(panoid, tiles_dir, zoom, cols, rows) |
|
|
|
progress(0.7, desc="Creating panorama...") |
|
panorama = downloader.create_360_panorama(tiles_dir, cols, rows, downloaded_tiles, output_file) |
|
|
|
progress(0.9, desc="Cleaning up...") |
|
shutil.rmtree(tiles_dir) |
|
|
|
progress(1.0, desc="Done!") |
|
return output_file, panorama |
|
|
|
except Exception as e: |
|
print(f"Error: {str(e)}") |
|
return None, None |
|
|
|
def create_gradio_interface(): |
|
|
|
example_links = [ |
|
["https://www.google.com/maps/@40.7579718,-73.9855442,3a,75y,36.7h,81.89t/data=!3m7!1e1!3m5!1sNYigM1H6Vz7eLR8fRmykuw!2e0!6shttps:%2F%2Fstreetviewpixels-pa.googleapis.com%2Fv1%2Fthumbnail%3Fpanoid%3DNYigM1H6Vz7eLR8fRmykuw%26cb_client%3Dmaps_sv.tactile.gps%26w%3D203%26h%3D100%26yaw%3D136.23409%26pitch%3D0%26thumbfov%3D100!7i16384!8i8192?coh=205409&entry=ttu&g_ep=EgoyMDI0MTAyMy4wIKXMDSoASAFQAw%3D%3D", "2", "16", "8"], |
|
["https://www.google.com/maps/@48.8560009,2.2981995,3a,75y,315.8h,98.33t/data=!3m7!1e1!3m5!1sgaXVhUqbqmZ006KVmB6TLw!2e0!6shttps:%2F%2Fstreetviewpixels-pa.googleapis.com%2Fv1%2Fthumbnail%3Fpanoid%3DgaXVhUqbqmZ006KVmB6TLw%26cb_client%3Dmaps_sv.tactile.gps%26w%3D203%26h%3D100%26yaw%3D294.00525%26pitch%3D0%26thumbfov%3D100!7i16384!8i8192?coh=205409&entry=ttu&g_ep=EgoyMDI0MTAyMy4wIKXMDSoASAFQAw%3D%3D", "3", "32", "16"], |
|
["https://www.google.com/maps/@27.1749733,78.0430361,2a,75y,276.92h,78.63t/data=!3m6!1e1!3m4!1sIinHqsOZDjMDKZ7STo_Kdw!2e0!7i13312!8i6656?coh=205409&entry=ttu&g_ep=EgoyMDI0MTAyMy4wIKXMDSoASAFQAw%3D%3D", "1", "8", "4"] |
|
] |
|
|
|
with gr.Blocks() as app: |
|
gr.Markdown("# Street View Panorama Downloader") |
|
gr.Markdown("Download panoramic images from Google Street View") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
url_input = gr.Textbox( |
|
label="Street View URL", |
|
placeholder="Paste Google Street View URL here...", |
|
info="Open Street View, copy the URL from your browser" |
|
) |
|
|
|
with gr.Row(): |
|
zoom_level = gr.Number( |
|
label="Zoom Level (0-5)", |
|
value=2, |
|
minimum=0, |
|
maximum=5, |
|
step=1, |
|
info="Higher zoom = more detail but slower download" |
|
) |
|
cols_input = gr.Number( |
|
label="Columns", |
|
value=16, |
|
minimum=1, |
|
maximum=128, |
|
step=1, |
|
info="Number of horizontal tiles" |
|
) |
|
rows_input = gr.Number( |
|
label="Rows", |
|
value=8, |
|
minimum=1, |
|
maximum=64, |
|
step=1, |
|
info="Number of vertical tiles" |
|
) |
|
|
|
submit_btn = gr.Button("Download and Process", variant="primary") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
output_file = gr.File(label="Download Panorama") |
|
with gr.Column(): |
|
output_image = gr.Image(label="Preview", type="pil") |
|
|
|
|
|
gr.Examples( |
|
examples=example_links, |
|
inputs=[url_input, zoom_level, cols_input, rows_input], |
|
outputs=[output_file, output_image], |
|
fn=process_street_view, |
|
cache_examples=True, |
|
label="Example Locations" |
|
) |
|
|
|
submit_btn.click( |
|
fn=process_street_view, |
|
inputs=[url_input, zoom_level, cols_input, rows_input], |
|
outputs=[output_file, output_image] |
|
) |
|
|
|
gr.Markdown(""" |
|
### Parameter Guide: |
|
|
|
1. **Zoom Level (0-5):** |
|
- 0: Lowest quality, fastest download (~4x4 tiles) |
|
- 1: Low quality (~8x4 tiles) |
|
- 2: Medium quality, recommended (~16x8 tiles) |
|
- 3: High quality, slower (~32x16 tiles) |
|
- 4: Very high quality, very slow (~64x32 tiles) |
|
- 5: Maximum quality, extremely slow (~128x64 tiles) |
|
|
|
2. **Columns:** |
|
- Controls horizontal resolution |
|
- More columns = wider image |
|
- Recommended values: |
|
- Fast: 8-16 columns |
|
- Balanced: 16-32 columns |
|
- High quality: 32-64 columns |
|
|
|
3. **Rows:** |
|
- Controls vertical resolution |
|
- More rows = taller image |
|
- Typically use half the number of columns |
|
- Recommended values: |
|
- Fast: 4-8 rows |
|
- Balanced: 8-16 rows |
|
- High quality: 16-32 rows |
|
|
|
### Example Settings: |
|
- Times Square: Balanced quality (zoom=2, cols=16, rows=8) |
|
- Eiffel Tower: High quality (zoom=3, cols=32, rows=16) |
|
- Taj Mahal: Fast preview (zoom=1, cols=8, rows=4) |
|
""") |
|
|
|
return app |
|
|
|
if __name__ == "__main__": |
|
app = create_gradio_interface() |
|
app.launch(share=True) |