"""用户认证模块 - JWT Token 认证""" from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from pydantic import BaseModel from app.config import settings # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # OAuth2 Token 提取 oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_PREFIX}/auth/login") # 简易用户存储(生产环境应使用数据库) # 默认管理员账号: admin / admin123 _users_db = { "admin": { "username": "admin", "hashed_password": pwd_context.hash("admin123"), "role": "admin", } } class Token(BaseModel): access_token: str token_type: str = "bearer" username: str = "" role: str = "" class User(BaseModel): username: str role: str = "user" class UserCreate(BaseModel): username: str password: str role: str = "user" class PasswordChange(BaseModel): old_password: str new_password: str def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """生成密码哈希""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """创建 JWT Token""" to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) def authenticate_user(username: str, password: str) -> Optional[dict]: """验证用户""" user = _users_db.get(username) if not user: return None if not verify_password(password, user["hashed_password"]): return None return user async def get_current_user(token: str = Depends(oauth2_scheme)) -> User: """从 Token 获取当前用户(依赖注入)""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的认证凭据", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = _users_db.get(username) if user is None: raise credentials_exception return User(username=user["username"], role=user["role"]) def get_users_db(): """获取用户数据库(用于路由中管理用户)""" return _users_db