Plonk / app.py
nicolas-dufour's picture
initial commit
cf60dfb
import streamlit as st
import pandas as pd
from PIL import Image
import torch
from pipe import PlonkPipeline
from pathlib import Path
from streamlit_extras.colored_header import colored_header
import plotly.express as px
import requests
from io import BytesIO
# Set page config
st.set_page_config(
page_title="Around the World in 80 Timesteps", page_icon="πŸ—ΊοΈ", layout="wide"
)
device = "cuda" if torch.cuda.is_available() else "cpu"
PROJECT_ROOT = Path(__file__).parent.parent.absolute()
# Define checkpoint path
CHECKPOINT_DIR = PROJECT_ROOT / "checkpoints"
MODEL_NAMES = {
"PLONK_YFCC": "nicolas-dufour/PLONK_YFCC",
"PLONK_OSV_5M": "nicolas-dufour/PLONK_OSV_5M",
"PLONK_iNaturalist": "nicolas-dufour/PLONK_iNaturalist",
}
@st.cache_resource
def load_model(model_name):
"""Load the model and cache it to prevent reloading"""
try:
pipe = PlonkPipeline(model_path=model_name)
return pipe
except Exception as e:
st.error(f"Error loading model: {str(e)}")
st.stop()
PIPES = {model_name: load_model(MODEL_NAMES[model_name]) for model_name in MODEL_NAMES}
def predict_location(image, model_name, cfg=0.0, num_samples=256):
with torch.no_grad():
batch = {"img": [], "emb": []}
# If image is already a PIL Image, use it directly
if isinstance(image, Image.Image):
img = image.convert("RGB")
else:
img = Image.open(image).convert("RGB")
pipe = PIPES[model_name]
# Create a progress bar
progress_bar = st.progress(0)
status_text = st.empty()
def update_progress(step, total_steps):
progress = float(step) / float(total_steps)
progress_bar.progress(progress)
status_text.text(f"Sampling step {step + 1}/{total_steps}")
# Get regular predictions with progress updates
predicted_gps = pipe(
img,
batch_size=num_samples,
cfg=cfg,
num_steps=16,
callback=update_progress
)
# Get single high-confidence prediction
status_text.text("Generating high-confidence prediction...")
high_conf_gps = pipe(img, batch_size=1, cfg=2.0, num_steps=16)
# Clear the status text and progress bar
status_text.empty()
progress_bar.empty()
return {
"lat": predicted_gps[:, 0].astype(float).tolist(),
"lon": predicted_gps[:, 1].astype(float).tolist(),
"high_conf_lat": high_conf_gps[0, 0].astype(float),
"high_conf_lon": high_conf_gps[0, 1].astype(float),
}
def load_example_images():
"""Load example images from the examples directory"""
examples_dir = Path(__file__).parent / "examples"
if not examples_dir.exists():
st.error(
"""
Examples directory not found. Please create the following structure:
demo/
└── examples/
β”œβ”€β”€ eiffel_tower.jpg
β”œβ”€β”€ colosseum.jpg
β”œβ”€β”€ taj_mahal.jpg
β”œβ”€β”€ statue_liberty.jpg
└── sydney_opera.jpg
"""
)
return {}
examples = {}
for img_path in examples_dir.glob("*.jpg"):
# Use filename without extension as the key
name = img_path.stem.replace("_", " ").title()
examples[name] = str(img_path)
if not examples:
st.warning("No example images found in the examples directory.")
return examples
def resize_image_for_display(image, max_size=400):
"""Resize image while maintaining aspect ratio"""
# Get current size
width, height = image.size
# Calculate ratio to maintain aspect ratio
if width > height:
if width > max_size:
ratio = max_size / width
new_size = (max_size, int(height * ratio))
else:
if height > max_size:
ratio = max_size / height
new_size = (int(width * ratio), max_size)
# Only resize if image is larger than max_size
if width > max_size or height > max_size:
return image.resize(new_size, Image.Resampling.LANCZOS)
return image
def load_image_from_url(url):
"""Load an image from a URL"""
try:
response = requests.get(url)
response.raise_for_status() # Raise an exception for bad status codes
return Image.open(BytesIO(response.content))
except Exception as e:
st.error(f"Error loading image from URL: {str(e)}")
return None
def main():
# Custom CSS
st.markdown(
"""
<style>
.main {
padding: 0rem 1rem;
}
.stButton>button {
width: 100%;
background-color: #FF4B4B;
color: white;
border: none;
padding: 0.5rem 1rem;
border-radius: 0.5rem;
}
.stButton>button:hover {
background-color: #FF6B6B;
}
.prediction-box {
background-color: #f0f2f6;
padding: 1.5rem;
border-radius: 0.5rem;
margin: 1rem 0;
}
/* New styles for image containers */
.upload-container {
max-height: 300px;
overflow-y: auto;
margin-bottom: 1rem;
}
.examples-container {
max-height: 200px;
display: flex;
gap: 10px;
}
.stTabs [data-baseweb="tab-panel"] {
padding-top: 1rem;
}
</style>
""",
unsafe_allow_html=True,
)
# Header with custom styling
colored_header(
label="πŸ—ΊοΈ Around the World in 80 Timesteps: A Generative Approach to Global Visual Geolocation",
description="Upload an image and our model, PLONK, will predict possible locations! In red we will sample one point with guidance scale 2.0 for the best guess. Project page: https://nicolas-dufour.github.io/plonk",
color_name="red-70",
)
# Adjust column ratio to give 2/3 of the space to the map
col1, col2 = st.columns([1, 2], gap="large")
with col1:
# Add model selection before the sliders
model_name = st.selectbox(
"πŸ€– Select Model",
options=MODEL_NAMES.keys(),
index=0, # Default to YFCC
help="Choose which PLONK model variant to use for prediction.",
)
# Modify the slider columns to accommodate both controls
col_slider1, col_slider2 = st.columns([0.5, 0.5])
with col_slider1:
cfg_value = st.slider(
"🎯 Guidance scale",
min_value=0.0,
max_value=5.0,
value=0.0,
step=0.1,
help="Scale for classifier-free guidance during sampling. A small value makes the model predictions display the diversity of the model, while a large value makes the model predictions more conservative but potentially more accurate.",
)
with col_slider2:
num_samples = st.number_input(
"🎲 Number of samples",
min_value=1,
max_value=5000,
value=64,
step=1,
help="Number of location predictions to generate. More samples give better coverage but take longer to compute.",
)
st.markdown("### πŸ“Έ Choose your image")
tab1, tab2, tab3 = st.tabs(["Upload", "URL", "Examples"])
with tab1:
uploaded_file = st.file_uploader(
"Choose an image...",
type=["png", "jpg", "jpeg"],
help="Supported formats: PNG, JPG, JPEG",
)
if uploaded_file is not None:
st.markdown('<div class="upload-container">', unsafe_allow_html=True)
original_image = Image.open(uploaded_file)
display_image = resize_image_for_display(
original_image.copy(), max_size=300
)
st.image(
display_image, caption="Uploaded Image", use_container_width=True
)
st.markdown("</div>", unsafe_allow_html=True)
if st.button("πŸ” Predict Location", key="predict_upload"):
predictions = predict_location(
original_image,
model_name=model_name,
cfg=cfg_value,
num_samples=num_samples,
)
st.session_state["predictions"] = predictions
with tab2:
url = st.text_input("Enter image URL:", key="image_url")
if url:
image = load_image_from_url(url)
if image:
st.markdown(
'<div class="upload-container">', unsafe_allow_html=True
)
display_image = resize_image_for_display(image.copy(), max_size=300)
st.image(
display_image,
caption="Image from URL",
use_container_width=True,
)
st.markdown("</div>", unsafe_allow_html=True)
if st.button("πŸ” Predict Location", key="predict_url"):
predictions = predict_location(
image,
model_name=model_name,
cfg=cfg_value,
num_samples=num_samples,
)
st.session_state["predictions"] = predictions
with tab3:
examples = load_example_images()
st.markdown('<div class="examples-container">', unsafe_allow_html=True)
example_cols = st.columns(len(examples))
for idx, (name, path) in enumerate(examples.items()):
with example_cols[idx]:
original_image = Image.open(path)
display_image = resize_image_for_display(
original_image.copy(), max_size=150
)
if st.container().button(
"πŸ“Έ",
key=f"img_{name}",
help=f"Click to predict location for {name}",
use_container_width=True,
):
predictions = predict_location(
original_image,
model_name=model_name,
cfg=cfg_value,
num_samples=num_samples,
)
st.session_state["predictions"] = predictions
st.rerun()
st.image(display_image, caption=name, use_container_width=True)
st.markdown("</div>", unsafe_allow_html=True)
with col2:
st.markdown("### 🌍 Predicted Locations")
if "predictions" in st.session_state:
pred = st.session_state["predictions"]
# Create DataFrame for all predictions
df = pd.DataFrame(
{
"lat": pred["lat"],
"lon": pred["lon"],
"type": ["Sample"] * len(pred["lat"]),
}
)
# Add high-confidence prediction
df = pd.concat(
[
df,
pd.DataFrame(
{
"lat": [pred["high_conf_lat"]],
"lon": [pred["high_conf_lon"]],
"type": ["Best Guess"],
}
),
]
)
# Create a more interactive map using Plotly
fig = px.scatter_mapbox(
df,
lat="lat",
lon="lon",
zoom=2,
opacity=0.6,
color="type",
color_discrete_map={"Sample": "blue", "Best Guess": "red"},
mapbox_style="carto-positron",
)
fig.update_traces(selector=dict(name="Best Guess"), marker_size=15)
fig.update_layout(
margin={"r": 0, "t": 0, "l": 0, "b": 0},
height=500,
showlegend=True,
legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
)
# Display map in a container
with st.container():
st.plotly_chart(fig, use_container_width=True)
# Display stats in a styled container
with st.container():
st.markdown(
f"""
<div class="prediction-box">
<h4>πŸ“Š Prediction Statistics</h4>
<p>Number of sampled locations: {len(pred["lat"])}</p>
<p>Best guess location: {pred["high_conf_lat"]:.2f}Β°, {pred["high_conf_lon"]:.2f}Β°</p>
</div>
""",
unsafe_allow_html=True,
)
else:
# Empty state with better styling
st.markdown(
"""
<div class="prediction-box" style="text-align: center;">
<h4>πŸ‘† Upload an image and click 'Predict Location'</h4>
<p>The predicted locations will appear here on an interactive map.</p>
</div>
""",
unsafe_allow_html=True,
)
if __name__ == "__main__":
main()