import os
from typing import AsyncGenerator

from fastapi import Depends
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
from sqlalchemy import URL
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from app import app
# Connection settings for the PostgreSQL database (asyncpg driver).
# Each value can be overridden through an environment variable
# (DB_USERNAME, DB_PASSWORD, DB_HOST, DB_NAME, DB_PORT); the literals below
# are only the fallbacks used when the corresponding variable is unset.
#
# NOTE(security): the fallback password is a real credential committed to
# source control. It is kept only so existing deployments keep working --
# rotate it, supply DB_PASSWORD from the environment, then delete the
# fallback literal.
db_url = URL.create(
    "postgresql+asyncpg",
    username=os.environ.get("DB_USERNAME", "avnadmin"),
    password=os.environ.get("DB_PASSWORD", "AVNS_u12fHxoNLBbzD8TGpqN"),
    host=os.environ.get("DB_HOST", "pg-opengenai-opengenai.b.aivencloud.com"),
    database=os.environ.get("DB_NAME", "defaultdb"),
    # Environment values are strings; URL.create expects an integer port.
    port=int(os.environ.get("DB_PORT", "14535")),
)

# Upper-case alias of db_url; this is the name handed to create_async_engine.
DATABASE_URL = db_url
class Base(DeclarativeBase):
    """Declarative base for this module's ORM models.

    ``Base.metadata`` is what ``create_db_and_tables`` and ``connect`` pass
    to ``create_all`` to create the tables.
    """
class User(SQLAlchemyBaseUserTableUUID, Base):
    """User model registered on ``Base``.

    Adds no columns of its own; everything is inherited from
    ``SQLAlchemyBaseUserTableUUID`` (presumably the standard fastapi-users
    fields with a UUID primary key -- confirm against the library docs).
    """
# Module-wide async engine built from DATABASE_URL.
# echo=True enables SQLAlchemy's logging of emitted SQL statements.
engine = create_async_engine(DATABASE_URL, echo=True)

# Session factory bound to the engine above; each call produces a new
# AsyncSession configured with the options given here.
async_session_maker = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    autocommit=False,
    autoflush=True,
)
async def create_db_and_tables():
    """Create the tables registered on ``Base.metadata``.

    Opens a transaction on the module-level ``engine`` and runs the
    synchronous ``create_all`` through the connection's ``run_sync``.
    """
    create_schema = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_schema)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield one session from ``async_session_maker``.

    The session is used as an async context manager, so it is exited once
    the consumer finishes with the generator. Used as a dependency by
    ``get_user_db``.
    """
    session_ctx = async_session_maker()
    async with session_ctx as db_session:
        yield db_session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
    """Yield a ``SQLAlchemyUserDatabase`` for the ``User`` model.

    Args:
        session: Session the adapter operates on; by default resolved via
            the ``get_async_session`` dependency.
    """
    user_db = SQLAlchemyUserDatabase(session, User)
    yield user_db
async def connect() -> None:
    """Create the tables of ``Base.metadata`` on the module-level engine.

    ``create_all`` is invoked with ``checkfirst=True`` inside a transaction
    opened with ``engine.begin()``.
    """

    def _create_tables(sync_connection):
        # Same call the connection would make if create_all were handed to
        # run_sync directly with checkfirst=True.
        return Base.metadata.create_all(sync_connection, checkfirst=True)

    async with engine.begin() as connection:
        await connection.run_sync(_create_tables)
async def disconnect() -> None:
    """Dispose of the module-level engine, if it is truthy."""
    if not engine:
        return
    await engine.dispose()