Files
kvm-manager/backend/app/routers/auth.py
T
admin 8ccccf8f52 feat: 多主机纳管、用户认证、noVNC控制台、深色主题
主要功能:
- 多主机管理: 支持TCP/SSH方式纳管远程KVM主机
- 用户认证: JWT token认证, 默认admin/admin123
- noVNC控制台: 前端集成noVNC, WebSocket代理VNC连接
- 深色主题: 全局Element Plus深色主题覆盖
- 虚拟机操作: 克隆、迁移、XML编辑、快照管理
- 资源监控: CPU/内存/磁盘IO/网络流量实时监控

Bug修复:
- libvirt getInfo()内存单位修正(MiB非KiB)
- 远程主机VNC 0.0.0.0监听地址连接策略修复
- Dashboard定时器内存泄漏修复
- bcrypt版本兼容性修复
2026-05-07 12:41:10 +08:00

97 rader
3.4 KiB
Python

"""认证路由"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from app.auth import (
authenticate_user, create_access_token, get_password_hash,
get_current_user, User, UserCreate, PasswordChange, Token,
get_users_db, verify_password,
)
router = APIRouter()
@router.post("/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""用户登录,获取 JWT Token"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user["username"], "role": user["role"]})
return Token(
access_token=access_token,
username=user["username"],
role=user["role"],
)
@router.get("/me", response_model=User)
async def get_me(current_user: User = Depends(get_current_user)):
"""获取当前用户信息"""
return current_user
@router.post("/change-password")
async def change_password(
pwd: PasswordChange,
current_user: User = Depends(get_current_user),
):
"""修改密码"""
db = get_users_db()
user = db.get(current_user.username)
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
if not verify_password(pwd.old_password, user["hashed_password"]):
raise HTTPException(status_code=400, detail="旧密码错误")
user["hashed_password"] = get_password_hash(pwd.new_password)
return {"message": "密码修改成功"}
@router.post("/register")
async def register(user_create: UserCreate, current_user: User = Depends(get_current_user)):
"""注册新用户(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可注册新用户")
db = get_users_db()
if user_create.username in db:
raise HTTPException(status_code=400, detail="用户名已存在")
db[user_create.username] = {
"username": user_create.username,
"hashed_password": get_password_hash(user_create.password),
"role": user_create.role,
}
return {"message": f"用户 '{user_create.username}' 创建成功"}
@router.get("/users")
async def list_users(current_user: User = Depends(get_current_user)):
"""列出所有用户(仅管理员)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可查看用户列表")
db = get_users_db()
return {
"users": [
{"username": u["username"], "role": u["role"]}
for u in db.values()
]
}
@router.delete("/users/{username}")
async def delete_user(username: str, current_user: User = Depends(get_current_user)):
"""删除用户(仅管理员,不能删除自己)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="仅管理员可删除用户")
if username == current_user.username:
raise HTTPException(status_code=400, detail="不能删除自己")
db = get_users_db()
if username not in db:
raise HTTPException(status_code=404, detail="用户不存在")
del db[username]
return {"message": f"用户 '{username}' 已删除"}