Files
kvm-manager/backend/app/auth.py
T
admin 8ccccf8f52 feat: 多主机纳管、用户认证、noVNC控制台、深色主题
主要功能:
- 多主机管理: 支持TCP/SSH方式纳管远程KVM主机
- 用户认证: JWT token认证, 默认admin/admin123
- noVNC控制台: 前端集成noVNC, WebSocket代理VNC连接
- 深色主题: 全局Element Plus深色主题覆盖
- 虚拟机操作: 克隆、迁移、XML编辑、快照管理
- 资源监控: CPU/内存/磁盘IO/网络流量实时监控

Bug修复:
- libvirt getInfo()内存单位修正(MiB非KiB)
- 远程主机VNC 0.0.0.0监听地址连接策略修复
- Dashboard定时器内存泄漏修复
- bcrypt版本兼容性修复
2026-05-07 12:41:10 +08:00

103 řádky
2.8 KiB
Python

"""用户认证模块 - JWT Token 认证"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from app.config import settings
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 Token 提取
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_PREFIX}/auth/login")
# 简易用户存储(生产环境应使用数据库)
# 默认管理员账号: admin / admin123
_users_db = {
"admin": {
"username": "admin",
"hashed_password": pwd_context.hash("admin123"),
"role": "admin",
}
}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
username: str = ""
role: str = ""
class User(BaseModel):
username: str
role: str = "user"
class UserCreate(BaseModel):
username: str
password: str
role: str = "user"
class PasswordChange(BaseModel):
old_password: str
new_password: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""创建 JWT Token"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def authenticate_user(username: str, password: str) -> Optional[dict]:
"""验证用户"""
user = _users_db.get(username)
if not user:
return None
if not verify_password(password, user["hashed_password"]):
return None
return user
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""从 Token 获取当前用户(依赖注入)"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = _users_db.get(username)
if user is None:
raise credentials_exception
return User(username=user["username"], role=user["role"])
def get_users_db():
"""获取用户数据库(用于路由中管理用户)"""
return _users_db