import uvicorn from contextlib import asynccontextmanager from datetime import timedelta from typing import List from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.ext.asyncio import AsyncSession from database import engine, get_session from models import Base, User, Server import crud import schemas import security from config import ACCESS_TOKEN_EXPIRE_MINUTES @asynccontextmanager async def lifespan(app: FastAPI): # on startup async with engine.begin() as conn: # await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) yield # on shutdown # (nothing to do here for now) app = FastAPI(lifespan=lifespan) # --- Authentication --- @app.post("/token", response_model=schemas.Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_session)): user = await crud.authenticate_user(db, username=form_data.username, password=form_data.password) if not user: raise HTTPException( status_code=401, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = security.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} # --- Users --- @app.post("/users/", response_model=schemas.User) async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_session)): db_user = await crud.get_user_by_username(db, username=user.username) if db_user: raise HTTPException(status_code=400, detail="Username already registered") return await crud.create_user(db=db, user=user) @app.get("/users/me/", response_model=schemas.User) async def read_users_me(current_user: User = Depends(security.get_current_user)): return current_user # --- Servers --- @app.post("/servers/", response_model=schemas.Server) async def create_server( server: schemas.ServerCreate, db: AsyncSession = Depends(get_session), current_user: User = Depends(security.get_current_user) ): return await crud.create_server(db=db, server=server, owner_id=current_user.id) @app.get("/servers/", response_model=List[schemas.Server]) async def read_servers(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_session)): servers = await crud.get_servers(db, skip=skip, limit=limit) return servers @app.get("/servers/{server_id}", response_model=schemas.Server) async def read_server(server_id: int, db: AsyncSession = Depends(get_session)): db_server = await crud.get_server(db, server_id=server_id) if db_server is None: raise HTTPException(status_code=404, detail="Server not found") return db_server # --- General --- @app.get("/") async def read_root(): return {"message": "Pycord server is running"} # --- WebSocket --- class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): await manager.connect(websocket) try: while True: data = await websocket.receive_text() await manager.broadcast(f"Client #{client_id} says: {data}") except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f"Client #{client_id} left the chat") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)