# CloudCall / fastapi_app.py
# vincentiusyoshuac's picture
# Update fastapi_app.py
# 9540b54 verified
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uuid
from pydantic import BaseModel
import asyncio
from typing import Dict, Optional
# The ASGI application object; every route below is registered on it.
app = FastAPI()
# Enable CORS
# Every origin, HTTP method and request header is allowed, and credentialed
# requests are enabled.
# NOTE(review): FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers should not be ["*"] when
# allow_credentials=True. Confirm whether clients really need credentials;
# if not, allow_credentials=False is the safer setting.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Per-peer signalling state and the registry that holds it.
class Connection:
    """Signalling state tracked for a single peer ID.

    Attributes are filled in later by the WebSocket handler and the HTTP
    endpoints; a new instance starts with nothing attached.
    """

    def __init__(self) -> None:
        # Socket most recently accepted for this peer, if any.
        self.websocket: Optional[WebSocket] = None
        # Last offer payload stored for this peer, if any.
        self.offer: Optional[Dict] = None
        # Last answer payload stored for this peer, if any.
        self.answer: Optional[Dict] = None


# In-memory registry mapping peer ID -> Connection. It lives only for the
# lifetime of this process.
connections: Dict[str, Connection] = {}
# Request body for POST /register_peer: carries only the peer identifier.
class PeerID(BaseModel):
    # Key under which the peer is stored in the `connections` registry.
    peer_id: str
# Request body for POST /send_offer.
class WebRTCOffer(BaseModel):
    # Peer whose registry entry receives the offer.
    peer_id: str
    # Offer payload, stored as-is (presumably an SDP session description
    # object; its keys are not validated here).
    offer: Dict
# Request body for POST /send_answer.
class WebRTCAnswer(BaseModel):
    # Peer whose registry entry receives the answer.
    peer_id: str
    # Answer payload, stored as-is (presumably an SDP session description
    # object; its keys are not validated here).
    answer: Dict
@app.websocket("/ws/{peer_id}")
async def websocket_endpoint(websocket: WebSocket, peer_id: str):
    """Run the WebSocket signalling loop for ``peer_id``.

    The socket is accepted and attached to the peer's registry entry
    (created on demand). Incoming JSON messages are then handled until the
    client disconnects:

    * ``{"type": "offer", "offer": ...}`` stores the offer and, if an answer
      is already stored, sends it back on this socket.
    * ``{"type": "answer", "answer": ...}`` stores the answer and sends it to
      the socket currently attached to the entry.
    * Messages with any other ``type`` are ignored.
    * An offer/answer message without its payload key gets an
      ``{"type": "error", ...}`` reply instead of crashing the handler.

    Args:
        websocket: The incoming WebSocket connection.
        peer_id: Registry key taken from the URL path.

    The registry entry is removed when the handler exits for any reason,
    but only if this socket is still the one attached to it.
    """
    await websocket.accept()

    # Keep one reference to the entry for the whole session. Re-indexing
    # `connections[peer_id]` on every message would raise KeyError as soon
    # as another handler for the same peer ID removed the entry.
    conn = connections.get(peer_id)
    if conn is None:
        conn = connections[peer_id] = Connection()
    conn.websocket = websocket

    try:
        while True:
            data = await websocket.receive_json()
            # Valid JSON is not necessarily an object; treat anything else
            # as a message of unknown type.
            message_type = data.get("type") if isinstance(data, dict) else None

            if message_type not in ("offer", "answer"):
                continue

            if message_type not in data:
                # Known type but no payload: tell the client rather than
                # letting a KeyError tear down the connection.
                await websocket.send_json({
                    "type": "error",
                    "message": f"Missing '{message_type}' field"
                })
                continue

            if message_type == "offer":
                conn.offer = data["offer"]
                if conn.answer:
                    await websocket.send_json({
                        "type": "answer",
                        "answer": conn.answer
                    })
            else:
                conn.answer = data["answer"]
                if conn.websocket:
                    await conn.websocket.send_json({
                        "type": "answer",
                        "answer": data["answer"]
                    })
    except WebSocketDisconnect:
        # Normal end of session; cleanup happens in `finally`.
        pass
    finally:
        # Run cleanup on every exit path (disconnect, bad JSON, send error)
        # so entries do not leak. Only the socket that still owns the entry
        # may remove it; otherwise a stale socket closing would wipe the
        # state of a newer connection using the same peer ID.
        if connections.get(peer_id) is conn and conn.websocket is websocket:
            del connections[peer_id]
@app.post("/register_peer")
async def register_peer(peer: PeerID):
    # Make sure a registry entry exists for the peer. An existing entry
    # (and whatever offer/answer/socket it holds) is left untouched, so
    # calling this repeatedly is harmless.
    peer_key = peer.peer_id
    connections.setdefault(peer_key, Connection())

    payload = {
        "status": "registered",
        "peer_id": peer_key
    }
    return JSONResponse(payload)
@app.post("/send_offer")
async def send_offer(offer: WebRTCOffer):
    # Store the posted offer on the target peer's registry entry.
    # Unknown peers get a 404; nothing is created implicitly.
    entry = connections.get(offer.peer_id)
    if entry is None:
        not_found = {"status": "error", "message": "Peer not found"}
        return JSONResponse(status_code=404, content=not_found)

    entry.offer = offer.offer
    return JSONResponse({"status": "offer_stored"})
@app.get("/get_offer/{peer_id}")
async def get_offer(peer_id: str):
    # Return the stored offer for `peer_id`, if there is one. The offer is
    # left in place, so repeated calls keep returning it. Unknown peers and
    # peers without an offer both get a 200 response with "no_offer".
    entry = connections.get(peer_id)
    stored_offer = entry.offer if entry is not None else None
    if not stored_offer:
        return JSONResponse({"status": "no_offer"})

    return JSONResponse({
        "offer": stored_offer
    })
@app.post("/send_answer")
async def send_answer(answer: WebRTCAnswer):
    # Store the posted answer on the target peer's registry entry.
    # Unknown peers get a 404; nothing is created implicitly.
    entry = connections.get(answer.peer_id)
    if entry is None:
        not_found = {"status": "error", "message": "Peer not found"}
        return JSONResponse(status_code=404, content=not_found)

    entry.answer = answer.answer
    return JSONResponse({"status": "answer_stored"})
@app.get("/get_answer/{peer_id}")
async def get_answer(peer_id: str):
    # Hand out the stored answer for `peer_id` once: it is cleared from the
    # registry entry as it is returned. Unknown peers and peers without an
    # answer both get a 200 response with "no_answer".
    entry = connections.get(peer_id)
    if entry is None or not entry.answer:
        return JSONResponse({"status": "no_answer"})

    pending, entry.answer = entry.answer, None
    return JSONResponse({"answer": pending})
# Script entry point: serve the app directly when this file is executed.
if __name__ == "__main__":
    # Imported here so that importing this module does not require uvicorn.
    import uvicorn
    # Listen on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)