Files
admin dbba1694d8 feat: 优化虚拟机列表,支持多主机聚合显示
- 新增 /vm/list-all API 聚合所有主机虚拟机
- parse_vm_info 支持 include_ip 参数控制IP获取
- VMList 添加主机选择器,显示宿主机列
- 修复 API 路径 /host/list -> /hosts/list
- 新增启动脚本 scripts/start.sh
- 新增 Guest Agent 安装脚本 scripts/install-guest-agent.sh
- 更新 README 文档
2026-05-07 14:52:45 +08:00

521 lines
18 KiB
Python

"""虚拟机管理路由"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import Optional, List
from lxml import etree
from app.hosts import list_hosts
import os
from app.libvirt_conn import conn_pool
from app.utils import parse_vm_info, generate_vm_xml
import libvirt
router = APIRouter()
# ===== 请求模型 =====
class VMCreate(BaseModel):
name: str = Field(..., description="虚拟机名称")
memory_mb: int = Field(2048, description="内存大小(MB)")
vcpus: int = Field(2, description="CPU核心数")
disk_gb: int = Field(20, description="磁盘大小(GB)")
pool_name: str = Field("default", description="存储池名称")
iso_path: Optional[str] = Field(None, description="ISO安装镜像路径")
network: str = Field("default", description="网络名称")
description: Optional[str] = Field(None, description="描述")
class VMAction(BaseModel):
action: str = Field(..., description="操作: start/stop/restart/pause/resume/force_stop")
class VMClone(BaseModel):
new_name: str = Field(..., description="新虚拟机名称")
# ===== API =====
@router.get("/list")
async def list_vms(host_id: str = Query("local"), include_ip: bool = False):
"""获取指定主机所有虚拟机列表(轻量模式,默认不获取IP)"""
conn = conn_pool.get_conn(host_id)
domains = conn.listAllDomains(0)
vms = []
for dom in domains:
try:
vm_info = parse_vm_info(dom, host_id, include_ip=include_ip)
vms.append(vm_info)
except Exception as e:
vms.append({
"name": dom.name(),
"uuid": dom.UUIDString(),
"state": "error",
"error": str(e),
})
return {"vms": vms, "total": len(vms), "host_id": host_id}
@router.get("/list-all")
async def list_all_vms(include_ip: bool = False):
"""获取所有主机所有虚拟机列表(聚合模式)"""
hosts = list_hosts()
all_vms = []
for host in hosts:
try:
conn = conn_pool.get_conn(host.id)
domains = conn.listAllDomains(0)
for dom in domains:
try:
vm_info = parse_vm_info(dom, host.id, include_ip=include_ip)
vm_info["host_id"] = host.id
vm_info["host_name"] = host.name
all_vms.append(vm_info)
except Exception:
all_vms.append({
"name": dom.name(),
"uuid": dom.UUIDString(),
"state": "error",
"host_id": host.id,
"host_name": host.name,
})
except Exception:
pass # 跳过无法连接的主机
return {"vms": all_vms, "total": len(all_vms), "host_count": len(hosts)}
@router.get("/detail/{name}")
async def get_vm_detail(name: str, host_id: str = Query("local")):
"""获取虚拟机详情"""
conn = conn_pool.get_conn(host_id)
try:
dom = conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
info = parse_vm_info(dom, host_id)
# 运行中的虚拟机获取更多动态信息
if info["state"] == "running":
try:
_, _, cpu_time, _ = dom.info()
info["cpu_time_ns"] = cpu_time
except Exception:
pass
try:
mem_stats = dom.memoryStats()
info["memory_stats"] = mem_stats
except Exception:
pass
try:
block_stats = []
for disk in info.get("disks", []):
if disk.get("dev"):
stats = dom.blockStats(disk["dev"])
block_stats.append({
"dev": disk["dev"],
"read_bytes": stats[1],
"write_bytes": stats[3],
"read_requests": stats[0],
"write_requests": stats[2],
})
info["block_stats"] = block_stats
except Exception:
pass
try:
net_stats = []
for i, iface in enumerate(info.get("interfaces", [])):
if iface.get("mac"):
stats = dom.interfaceStats(iface["mac"])
net_stats.append({
"mac": iface["mac"],
"rx_bytes": stats[0],
"tx_bytes": stats[4],
"rx_packets": stats[1],
"tx_packets": stats[5],
})
info["net_stats"] = net_stats
except Exception:
pass
return info
@router.get("/ip/{name}")
async def get_vm_ip(name: str, host_id: str = Query("local")):
"""获取虚拟机 IP 地址"""
conn = conn_pool.get_conn(host_id)
try:
dom = conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
if not dom.isActive():
return {"name": name, "ips": [], "message": "虚拟机未运行"}
xml_desc = dom.XMLDesc(0)
tree = etree.fromstring(xml_desc.encode())
interfaces = []
for iface in tree.findall(".//interface"):
source = iface.find("source")
mac_elem = iface.find("mac")
if mac_elem is None:
continue
mac = mac_elem.get("address", "")
network = source.get("network", "") if source is not None else ""
ips = []
# 方式1: QEMU Guest Agent
try:
ifaces = dom.interfaceAddresses(
libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_AGENT, 0
)
if ifaces:
for ifname, ifdata in ifaces.items():
if ifdata.get("hwaddr", "").lower() == mac.lower():
addrs = ifdata.get("addrs", [])
for a in addrs:
addr = a.get("addr", "")
if "." in addr:
ips.append({"ip": addr, "type": "ipv4", "source": "guest-agent"})
elif ":" in addr:
ips.append({"ip": addr, "type": "ipv6", "source": "guest-agent"})
except Exception:
pass
# 方式2: DHCP 租约
if not ips:
try:
ifaces = dom.interfaceAddresses(
libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE, 0
)
if ifaces:
for ifname, ifdata in ifaces.items():
hwaddr = ifdata.get("hwaddr", "").lower()
if hwaddr == mac.lower():
addrs = ifdata.get("addrs", [])
for a in addrs:
addr = a.get("addr", "")
if "." in addr:
ips.append({"ip": addr, "type": "ipv4", "source": "dhcp-lease"})
except Exception:
pass
# 方式3: ARP 表查找
if not ips:
ip_list = _arp_lookup(mac, host_id)
for ip in ip_list:
ips.append({"ip": ip, "type": "ipv4", "source": "arp"})
interfaces.append({
"mac": mac,
"network": network,
"ips": ips,
})
return {"name": name, "interfaces": interfaces}
def _arp_lookup(mac: str, host_id: str = "local") -> list:
"""通过 ARP 表查找 MAC 对应的 IP"""
import subprocess
from app.hosts import get_host as get_host_info
host_info = get_host_info(host_id)
is_remote = host_info and host_info.type != "local"
try:
if is_remote and host_info.type == "ssh":
from urllib.parse import urlparse
parsed = urlparse(host_info.uri)
remote_host = parsed.hostname
ssh_args = ["ssh", "-o", "StrictHostKeyChecking=no"]
if host_info.ssh_key_path:
ssh_args.extend(["-i", host_info.ssh_key_path])
ssh_args.extend([remote_host, "ip", "neigh", "show"])
result = subprocess.run(ssh_args, capture_output=True, text=True, timeout=5)
else:
result = subprocess.run(
["ip", "neigh", "show"],
capture_output=True, text=True, timeout=3,
)
if result.returncode == 0:
found = []
for line in result.stdout.strip().split("\n"):
if mac.lower() in line.lower():
parts = line.split()
if parts and "." in parts[0]:
found.append(parts[0])
return found
except Exception:
pass
return []
@router.post("/create")
async def create_vm(vm: VMCreate, host_id: str = Query("local")):
"""创建虚拟机"""
conn = conn_pool.get_conn(host_id)
# 检查名称是否已存在
try:
conn.lookupByName(vm.name)
raise HTTPException(status_code=400, detail=f"虚拟机 '{vm.name}' 已存在")
except libvirt.libvirtError:
pass
with conn_pool.get_rw(host_id) as rw_conn:
try:
# 确定磁盘路径
pool = rw_conn.storagePoolLookupByName(vm.pool_name)
pool_info = pool.info()
pool_xml = etree.fromstring(pool.XMLDesc(0).encode())
target = pool_xml.find(".//target/path")
pool_path = target.text if target is not None else "/var/lib/libvirt/images"
disk_path = os.path.join(pool_path, f"{vm.name}.qcow2")
# 创建 qcow2 磁盘
vol_xml = f"""<volume>
<name>{vm.name}.qcow2</name>
<capacity unit='GiB'>{vm.disk_gb}</capacity>
<allocation unit='GiB'>1</allocation>
<target>
<format type='qcow2'/>
<permissions>
<mode>0644</mode>
</permissions>
</target>
</volume>"""
pool.createXML(vol_xml, 0)
# 生成虚拟机XML
vm_xml = generate_vm_xml(
name=vm.name,
memory_mb=vm.memory_mb,
vcpus=vm.vcpus,
disk_path=disk_path,
disk_size_gb=vm.disk_gb,
iso_path=vm.iso_path,
network=vm.network,
)
# 定义并启动
dom = rw_conn.defineXML(vm_xml)
dom.create()
return {"message": f"虚拟机 '{vm.name}' 创建成功", "name": vm.name}
except libvirt.libvirtError as e:
raise HTTPException(status_code=500, detail=f"创建虚拟机失败: {str(e)}")
@router.post("/action/{name}")
async def vm_action(name: str, action: VMAction, host_id: str = Query("local")):
"""虚拟机操作"""
conn = conn_pool.get_conn(host_id)
try:
dom = conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
with conn_pool.get_rw(host_id) as rw_conn:
try:
rw_dom = rw_conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
act = action.action
try:
if act == "start":
rw_dom.create()
msg = f"虚拟机 '{name}' 已启动"
elif act == "stop":
rw_dom.shutdown()
msg = f"虚拟机 '{name}' 正在关闭"
elif act == "force_stop":
rw_dom.destroy()
msg = f"虚拟机 '{name}' 已强制关闭"
elif act == "restart":
rw_dom.reboot(libvirt.VIR_DOMAIN_REBOOT_DEFAULT)
msg = f"虚拟机 '{name}' 正在重启"
elif act == "pause":
rw_dom.suspend()
msg = f"虚拟机 '{name}' 已暂停"
elif act == "resume":
rw_dom.resume()
msg = f"虚拟机 '{name}' 已恢复"
else:
raise HTTPException(status_code=400, detail=f"不支持的操作: {act}")
return {"message": msg}
except libvirt.libvirtError as e:
raise HTTPException(status_code=500, detail=f"操作失败: {str(e)}")
@router.delete("/delete/{name}")
async def delete_vm(name: str, force: bool = False, host_id: str = Query("local")):
"""删除虚拟机"""
with conn_pool.get_rw(host_id) as rw_conn:
try:
dom = rw_conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
state, _ = dom.info()[0:2]
if state == libvirt.VIR_DOMAIN_RUNNING:
if force:
dom.destroy()
else:
raise HTTPException(
status_code=400,
detail=f"虚拟机 '{name}' 正在运行,请先关闭或使用 force=true"
)
# 获取磁盘路径
xml_desc = dom.XMLDesc(0)
tree = etree.fromstring(xml_desc.encode())
disk_files = []
for disk in tree.findall(".//disk[@device='disk']/source"):
f = disk.get("file")
if f:
disk_files.append(f)
# 取消定义
dom.undefine()
# 删除磁盘文件
for f in disk_files:
try:
os.remove(f)
except Exception:
pass
return {"message": f"虚拟机 '{name}' 已删除", "removed_disks": disk_files}
@router.post("/clone/{name}")
async def clone_vm(name: str, clone: VMClone, host_id: str = Query("local")):
"""克隆虚拟机"""
with conn_pool.get_rw(host_id) as rw_conn:
try:
dom = rw_conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
# 检查新名称是否已存在
try:
rw_conn.lookupByName(clone.new_name)
raise HTTPException(status_code=400, detail=f"虚拟机 '{clone.new_name}' 已存在")
except libvirt.libvirtError:
pass
try:
xml_desc = dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)
tree = etree.fromstring(xml_desc.encode())
tree.find("name").text = clone.new_name
uuid_elem = tree.find("uuid")
if uuid_elem is not None:
tree.remove(uuid_elem)
import uuid as uuid_mod
new_uuid = str(uuid_mod.uuid4())[:8]
for disk in tree.findall(".//disk[@device='disk']"):
source = disk.find("source")
if source is not None:
old_path = source.get("file", "")
new_path = old_path.replace(f"{name}.qcow2", f"{clone.new_name}.qcow2")
source.set("file", new_path)
for mac in tree.findall(".//interface/mac"):
import random
mac_addr = "52:54:00:%02x:%02x:%02x" % (
random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255),
)
mac.set("address", mac_addr)
old_disk_path = ""
new_disk_path = ""
for disk in tree.findall(".//disk[@device='disk']/source"):
old_disk_path = disk.get("file", "")
new_disk_path = old_disk_path.replace(f"{name}.qcow2", f"{clone.new_name}.qcow2")
if old_disk_path and os.path.exists(old_disk_path):
import subprocess
subprocess.run(
["qemu-img", "create", "-f", "qcow2", "-b", old_disk_path, "-F", "qcow2", new_disk_path],
check=True,
capture_output=True,
)
new_xml = etree.tostring(tree, encoding="unicode")
rw_conn.defineXML(new_xml)
return {"message": f"虚拟机 '{name}' 已克隆为 '{clone.new_name}'"}
except libvirt.libvirtError as e:
raise HTTPException(status_code=500, detail=f"克隆失败: {str(e)}")
@router.get("/xml/{name}")
async def get_vm_xml(name: str, host_id: str = Query("local")):
"""获取虚拟机 XML 配置"""
conn = conn_pool.get_conn(host_id)
try:
dom = conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
return {"name": name, "xml": dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)}
@router.put("/xml/{name}")
async def update_vm_xml(name: str, xml: dict, host_id: str = Query("local")):
"""更新虚拟机 XML 配置"""
with conn_pool.get_rw(host_id) as rw_conn:
try:
dom = rw_conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
xml_str = xml.get("xml", "")
if not xml_str:
raise HTTPException(status_code=400, detail="XML不能为空")
try:
rw_conn.defineXML(xml_str)
return {"message": f"虚拟机 '{name}' 配置已更新"}
except libvirt.libvirtError as e:
raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}")
@router.post("/migrate/{name}")
async def migrate_vm(name: str, dest_uri: str, live: bool = True, host_id: str = Query("local")):
"""迁移虚拟机"""
with conn_pool.get_rw(host_id) as rw_conn:
try:
dom = rw_conn.lookupByName(name)
except libvirt.libvirtError:
raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
flags = libvirt.VIR_MIGRATE_LIVE if live else 0
flags |= libvirt.VIR_MIGRATE_PERSIST_DEST
try:
dest_conn = libvirt.open(dest_uri)
dom.migrate(dest_conn, flags, None, None, 0)
return {"message": f"虚拟机 '{name}' 已迁移到 {dest_uri}"}
except libvirt.libvirtError as e:
raise HTTPException(status_code=500, detail=f"迁移失败: {str(e)}")