Spaces:
Sleeping
Sleeping
import streamlit as st | |
from streamlit_webrtc import VideoProcessorBase, webrtc_streamer | |
import uuid | |
import requests | |
import json | |
from typing import Dict, Optional | |
import time | |
class VideoProcessor(VideoProcessorBase): | |
def recv(self, frame): | |
img = frame.to_ndarray(format="bgr24") | |
return img | |
class WebRTCConnection: | |
def __init__(self): | |
self.server_url = "http://localhost:7860" | |
self.peer_id: Optional[str] = None | |
def register_peer(self, peer_id: str) -> Dict: | |
try: | |
response = requests.post( | |
f"{self.server_url}/register_peer", | |
json={"peer_id": peer_id}, | |
timeout=5 # Add timeout | |
) | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
st.error(f"Connection error: {str(e)}") | |
return {"status": "error", "message": str(e)} | |
def send_offer(self, peer_id: str, offer: Dict) -> Dict: | |
try: | |
response = requests.post( | |
f"{self.server_url}/send_offer", | |
json={"peer_id": peer_id, "offer": offer}, | |
timeout=5 | |
) | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
st.error(f"Connection error: {str(e)}") | |
return {"status": "error", "message": str(e)} | |
def get_answer(self, peer_id: str) -> Dict: | |
try: | |
response = requests.get( | |
f"{self.server_url}/get_answer", | |
params={"peer_id": peer_id}, | |
timeout=5 | |
) | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
st.error(f"Connection error: {str(e)}") | |
return {"status": "error", "message": str(e)} | |
def create_video_interface(): | |
st.set_page_config( | |
page_title="Video Call", | |
page_icon="π₯", | |
layout="wide", | |
initial_sidebar_state="collapsed" | |
) | |
# Initialize session state | |
if 'connection' not in st.session_state: | |
st.session_state.connection = WebRTCConnection() | |
if 'status' not in st.session_state: | |
st.session_state.status = "Initializing..." | |
if 'peer_id' not in st.session_state: | |
st.session_state.peer_id = None | |
st.title("π± Android Video Call") | |
# Create two columns for better mobile layout | |
col1, col2 = st.columns(2) | |
with col1: | |
peer_id_option = st.radio( | |
"Peer ID Option", | |
("Generate", "Enter"), | |
key="peer_id_option" | |
) | |
if peer_id_option == "Generate": | |
st.session_state.peer_id = str(uuid.uuid4())[:8] | |
st.code(st.session_state.peer_id, language=None) | |
else: | |
st.session_state.peer_id = st.text_input( | |
"Enter Peer ID", | |
key="peer_id_input", | |
max_chars=8, | |
placeholder="Enter 8-character ID" | |
) | |
with col2: | |
st.metric( | |
label="Status", | |
value=st.session_state.status | |
) | |
if st.button("Connect", key="connect_button", use_container_width=True): | |
if st.session_state.peer_id: | |
response = st.session_state.connection.register_peer( | |
st.session_state.peer_id | |
) | |
if response.get("status") == "registered": | |
st.session_state.status = "Connected" | |
st.rerun() # Using st.rerun() instead of experimental_rerun | |
else: | |
st.error("Please enter or generate a Peer ID") | |
# Video container with responsive layout | |
st.markdown(""" | |
<style> | |
.video-container { | |
position: relative; | |
width: 100%; | |
padding-bottom: 56.25%; | |
height: 0; | |
overflow: hidden; | |
} | |
.video-container iframe { | |
position: absolute; | |
top: 0; | |
left: 0; | |
width: 100%; | |
height: 100%; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
if st.session_state.status == "Connected": | |
with st.container(): | |
st.markdown("<div class='video-container'>", unsafe_allow_html=True) | |
webrtc_streamer( | |
key="local", | |
video_processor_factory=VideoProcessor, | |
rtc_configuration={ | |
"iceServers": [ | |
{"urls": ["stun:stun.l.google.com:19302"]}, | |
{ | |
"urls": ["turn:relay.metered.ca:80"], | |
"username": "openrelayproject", | |
"credential": "openrelayproject" | |
} | |
] | |
}, | |
media_stream_constraints={ | |
"video": { | |
"width": {"ideal": 640}, | |
"height": {"ideal": 480}, | |
"frameRate": {"max": 30} | |
}, | |
"audio": True | |
} | |
) | |
st.markdown("</div>", unsafe_allow_html=True) | |
if st.button("End Call", key="end_call", use_container_width=True): | |
st.session_state.status = "Call ended" | |
st.rerun() # Using st.rerun() instead of experimental_rerun | |
if __name__ == "__main__": | |
create_video_interface() |