"""In-memory WebRTC signaling server built on FastAPI.

Peers are identified by a caller-chosen ``peer_id``. Offer/answer payloads
can be exchanged either over a WebSocket (``/ws/{peer_id}``) or through the
plain HTTP endpoints below. All state lives in the module-level
``connections`` dict and is lost when the process exits.
"""

import asyncio
import uuid
from typing import Dict, Optional

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

# Enable CORS
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True. The FastAPI CORS docs say credentials require
# explicit values rather than "*" -- confirm this is intended before
# exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Store connections and their states
class Connection:
    """Per-peer signaling state: the live socket and pending payloads."""

    def __init__(self):
        # WebSocket currently attached to this peer, if any.
        self.websocket: Optional[WebSocket] = None
        # Most recently stored offer payload, if any.
        self.offer: Optional[Dict] = None
        # Most recently stored answer payload, if any.
        self.answer: Optional[Dict] = None


# Registry of known peers, keyed by peer id.
connections: Dict[str, Connection] = {}


class PeerID(BaseModel):
    """Request body for ``/register_peer``."""

    peer_id: str


class WebRTCOffer(BaseModel):
    """Request body for ``/send_offer``."""

    peer_id: str
    offer: Dict


class WebRTCAnswer(BaseModel):
    """Request body for ``/send_answer``."""

    peer_id: str
    answer: Dict


def _peer_not_found() -> JSONResponse:
    """Build the 404 response shared by the endpoints that need a known peer."""
    return JSONResponse(
        status_code=404,
        content={"status": "error", "message": "Peer not found"},
    )


@app.websocket("/ws/{peer_id}")
async def websocket_endpoint(websocket: WebSocket, peer_id: str):
    """Attach a WebSocket to ``peer_id`` and process its signaling messages.

    Each incoming JSON message is expected to be an object with a ``type``
    of ``"offer"`` or ``"answer"`` and a payload under the same key.
    Messages that are not objects, have an unknown type, or lack the
    payload key are ignored instead of crashing the handler.

    The peer's registry entry is removed when the handler exits for any
    reason, but only if the entry still belongs to this socket.
    """
    await websocket.accept()

    if peer_id not in connections:
        connections[peer_id] = Connection()
    conn = connections[peer_id]
    conn.websocket = websocket

    try:
        while True:
            data = await websocket.receive_json()
            if not isinstance(data, dict):
                # Valid JSON but not an object (e.g. a list): nothing to do.
                continue

            msg_type = data.get("type")
            if msg_type == "offer" and "offer" in data:
                conn.offer = data["offer"]
                if conn.answer:
                    await websocket.send_json({
                        "type": "answer",
                        "answer": conn.answer,
                    })
            elif msg_type == "answer" and "answer" in data:
                conn.answer = data["answer"]
                if conn.websocket:
                    await conn.websocket.send_json({
                        "type": "answer",
                        "answer": data["answer"],
                    })
    except WebSocketDisconnect:
        # Normal client close; cleanup happens in ``finally``.
        pass
    finally:
        # Clean up on every exit path so a failed handler does not leave a
        # dead socket registered. Skip the delete when a newer socket has
        # taken over this peer id, otherwise a late disconnect from the old
        # socket would wipe the state the new one is using.
        if connections.get(peer_id) is conn and conn.websocket is websocket:
            del connections[peer_id]


@app.post("/register_peer")
async def register_peer(peer: PeerID):
    """Create a registry entry for the peer if it does not already exist."""
    if peer.peer_id not in connections:
        connections[peer.peer_id] = Connection()
    return JSONResponse({
        "status": "registered",
        "peer_id": peer.peer_id,
    })


@app.post("/send_offer")
async def send_offer(offer: WebRTCOffer):
    """Store an offer for a known peer; respond 404 if the peer is unknown."""
    conn = connections.get(offer.peer_id)
    if conn is None:
        return _peer_not_found()
    conn.offer = offer.offer
    return JSONResponse({"status": "offer_stored"})


@app.get("/get_offer/{peer_id}")
async def get_offer(peer_id: str):
    """Return the stored offer for ``peer_id``, or a ``no_offer`` status.

    The offer is left in place, so repeated calls return the same payload.
    """
    conn = connections.get(peer_id)
    if conn is not None and conn.offer:
        return JSONResponse({"offer": conn.offer})
    return JSONResponse({"status": "no_offer"})


@app.post("/send_answer")
async def send_answer(answer: WebRTCAnswer):
    """Store an answer for a known peer; respond 404 if the peer is unknown."""
    conn = connections.get(answer.peer_id)
    if conn is None:
        return _peer_not_found()
    conn.answer = answer.answer
    return JSONResponse({"status": "answer_stored"})


@app.get("/get_answer/{peer_id}")
async def get_answer(peer_id: str):
    """Return and clear the stored answer, or a ``no_answer`` status.

    Unlike ``get_offer``, a successful read resets the stored answer to
    ``None``, so each answer is delivered at most once through this route.
    """
    conn = connections.get(peer_id)
    if conn is not None and conn.answer:
        answer = conn.answer
        conn.answer = None
        return JSONResponse({"answer": answer})
    return JSONResponse({"status": "no_answer"})


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)