Files
admin dbba1694d8 feat: 优化虚拟机列表,支持多主机聚合显示
- 新增 /vm/list-all API 聚合所有主机虚拟机
- parse_vm_info 支持 include_ip 参数控制IP获取
- VMList 添加主机选择器,显示宿主机列
- 修复 API 路径 /host/list -> /hosts/list
- 新增启动脚本 scripts/start.sh
- 新增 Guest Agent 安装脚本 scripts/install-guest-agent.sh
- 更新 README 文档
2026-05-07 14:52:45 +08:00

335 строки
10 KiB
Python

"""虚拟机 XML 模板和工具函数"""
import uuid
import libvirt
from lxml import etree
def generate_vm_xml(
    name: str,
    memory_mb: int,
    vcpus: int,
    disk_path: str,
    disk_size_gb: int = 20,
    iso_path: str = None,
    network: str = "default",
    vnc_port: int = -1,
    os_type: str = "hvm",
    arch: str = "x86_64",
    machine: str = "pc",
) -> str:
    """Build the libvirt domain XML for a new KVM virtual machine.

    Args:
        name: Domain name.
        memory_mb: Maximum and current memory, in MiB.
        vcpus: Number of virtual CPUs.
        disk_path: Path of the qcow2 image attached as ``vda`` (virtio).
        disk_size_gb: Not used when generating the XML; kept so existing
            callers keep working.
        iso_path: Optional ISO image attached as a read-only IDE CD-ROM.
        network: Name of the libvirt network the NIC is attached to.
        vnc_port: Fixed VNC port. ``-1`` (or any non-positive value / None)
            lets libvirt allocate a port automatically.
        os_type: Content of the ``<os><type>`` element.
        arch: Value of the ``arch`` attribute of ``<os><type>``.
        machine: Value of the ``machine`` attribute of ``<os><type>``.

    Returns:
        The domain definition as an XML string. A fresh random UUID is
        generated on every call.
    """
    from xml.sax.saxutils import escape

    def esc(value) -> str:
        # Values land both in element text and in single-quoted attributes,
        # so quotes must be escaped in addition to & < >.
        return escape(str(value), {"'": "&apos;", '"': "&quot;"})

    vm_uuid = str(uuid.uuid4())

    # With autoport='yes' libvirt ignores the port attribute, so an explicit
    # port must switch autoport off, otherwise the caller's choice is lost.
    if vnc_port is None or vnc_port <= 0:
        graphics_port, autoport = -1, "yes"
    else:
        graphics_port, autoport = vnc_port, "no"

    # Base domain structure
    xml_parts = [
        "<domain type='kvm'>",
        f"  <name>{esc(name)}</name>",
        f"  <uuid>{vm_uuid}</uuid>",
        f"  <memory unit='MiB'>{esc(memory_mb)}</memory>",
        f"  <currentMemory unit='MiB'>{esc(memory_mb)}</currentMemory>",
        f"  <vcpu placement='static'>{esc(vcpus)}</vcpu>",
        "  <os>",
        f"    <type arch='{esc(arch)}' machine='{esc(machine)}'>{esc(os_type)}</type>",
        "    <boot dev='hd'/>",
        "    <boot dev='cdrom'/>",
        "  </os>",
        "  <features>",
        "    <acpi/>",
        "    <apic/>",
        "  </features>",
        "  <cpu mode='host-passthrough'/>",
        "  <clock offset='utc'>",
        "    <timer name='rtc' tickpolicy='catchup'/>",
        "    <timer name='pit' tickpolicy='delay'/>",
        "    <timer name='hpet' present='no'/>",
        "  </clock>",
        "  <on_poweroff>destroy</on_poweroff>",
        "  <on_reboot>restart</on_reboot>",
        "  <on_crash>destroy</on_crash>",
        "  <devices>",
        # NOTE(review): the emulator path is fixed to the x86_64 binary even
        # when a different ``arch`` is requested.
        "    <emulator>/usr/bin/qemu-system-x86_64</emulator>",
        "    <disk type='file' device='disk'>",
        "      <driver name='qemu' type='qcow2'/>",
        f"      <source file='{esc(disk_path)}'/>",
        "      <target dev='vda' bus='virtio'/>",
        "    </disk>",
    ]

    # CD-ROM drive (ISO installation)
    if iso_path:
        xml_parts += [
            "    <disk type='file' device='cdrom'>",
            "      <driver name='qemu' type='raw'/>",
            f"      <source file='{esc(iso_path)}'/>",
            "      <target dev='hda' bus='ide'/>",
            "      <readonly/>",
            "    </disk>",
        ]

    # VNC display, video and serial console
    # NOTE(review): the empty passwd attribute is kept from the original
    # template; confirm how the target libvirt/QEMU treats an empty password.
    xml_parts += [
        f"    <graphics type='vnc' port='{graphics_port}' autoport='{autoport}'"
        " listen='0.0.0.0' passwd=''>",
        "      <listen type='address' address='0.0.0.0'/>",
        "    </graphics>",
        "    <video>",
        "      <model type='virtio' heads='1' primary='yes'/>",
        "    </video>",
        "    <serial type='pty'>",
        "      <target port='0'/>",
        "    </serial>",
        "    <console type='pty'>",
        "      <target type='serial' port='0'/>",
        "    </console>",
    ]

    # Network, USB input and memory balloon
    xml_parts += [
        "    <interface type='network'>",
        f"      <source network='{esc(network)}'/>",
        "      <model type='virtio'/>",
        "    </interface>",
        "    <controller type='usb' model='qemu-xhci'/>",
        "    <input type='tablet' bus='usb'/>",
        "    <memballoon model='virtio'/>",
        "  </devices>",
        "</domain>",
    ]
    return "\n".join(xml_parts)
def parse_vm_info(dom, host_id: str = "local", include_ip: bool = False) -> dict:
    """Extract a summary of a virtual machine from a libvirt domain object.

    Args:
        dom: libvirt domain object.
        host_id: ID of the host the domain lives on; forwarded to the IP
            lookup helpers.
        include_ip: Whether to resolve IP addresses for the interfaces of a
            running domain. This is noticeably slower, so it is off by
            default.

    Returns:
        A dict with the keys ``id``, ``name``, ``uuid``, ``state``,
        ``autostart``, ``memory_mb``, ``current_memory_mb``, ``vcpus``,
        ``cpu_mode``, ``disks``, ``interfaces``, ``vnc_port``, ``vnc_listen``
        and ``os_type``. Every interface entry always carries an ``ip`` key
        (empty string when unknown or not requested).
    """

    def attr(elem, key: str, default: str = "") -> str:
        # Attribute lookup that tolerates a missing element.
        return elem.get(key, default) if elem is not None else default

    def int_text(elem, default: int) -> int:
        # Integer element text; a missing or empty element yields the default.
        if elem is None or not (elem.text or "").strip():
            return default
        return int(elem.text)

    # Basic information
    info = {
        "id": dom.ID(),
        "name": dom.name(),
        "uuid": dom.UUIDString(),
        "state": _get_state(dom),
        "autostart": False,
    }
    try:
        info["autostart"] = dom.autostart() == 1
    except Exception:
        # Best effort: keep the default (False) when the flag cannot be read.
        pass

    # Parse the domain XML
    tree = etree.fromstring(dom.XMLDesc(0).encode())

    # Memory and CPU
    mem = tree.find(".//memory")
    cur_mem = tree.find(".//currentMemory")
    info["memory_mb"] = _to_mb(int_text(mem, 0), attr(mem, "unit", "KiB"))
    info["current_memory_mb"] = _to_mb(
        int_text(cur_mem, 0), attr(cur_mem, "unit", "KiB")
    )
    info["vcpus"] = int_text(tree.find(".//vcpu"), 1)
    info["cpu_mode"] = attr(tree.find(".//cpu"), "mode", "unknown")

    # Disks (CD-ROM and other non-disk devices are skipped)
    disks = []
    for disk in tree.findall(".//disk"):
        if disk.get("device") != "disk":
            continue
        target = disk.find("target")
        disks.append(
            {
                "file": attr(disk.find("source"), "file"),
                "dev": attr(target, "dev"),
                "bus": attr(target, "bus"),
                "format": attr(disk.find("driver"), "type"),
            }
        )
    info["disks"] = disks

    # Network interfaces
    interfaces = []
    for iface in tree.findall(".//interface"):
        source = iface.find("source")
        iface_info = {
            "type": iface.get("type", ""),
            "network": attr(source, "network"),
            "bridge": attr(source, "bridge"),
            "model": attr(iface.find("model"), "type"),
            "dev": attr(iface.find("target"), "dev"),
        }
        mac_elem = iface.find("mac")
        if mac_elem is not None:
            iface_info["mac"] = mac_elem.get("address", "")
        # Always present so consumers never have to special-case the key.
        iface_info["ip"] = ""
        interfaces.append(iface_info)
    info["interfaces"] = interfaces

    # IP addresses (only on request, and only for running domains)
    if include_ip and info["state"] == "running":
        arp_cache = None  # fetched lazily: building it may spawn a subprocess
        for iface_info in interfaces:
            mac = iface_info.get("mac")
            if not mac:
                continue
            # Guest agent and DHCP leases first
            ip = _get_vm_ip(dom, mac, iface_info.get("network", ""), host_id, None)
            if not ip:
                # Fall back to the host's ARP table
                if arp_cache is None:
                    arp_cache = _get_arp_table(host_id) or {}
                ip = arp_cache.get(mac.lower(), "")
            iface_info["ip"] = ip or ""

    # VNC
    graphics = tree.find(".//graphics[@type='vnc']")
    if graphics is not None:
        info["vnc_port"] = int(graphics.get("port", -1))
        info["vnc_listen"] = graphics.get("listen", "127.0.0.1")
    else:
        info["vnc_port"] = -1
        info["vnc_listen"] = ""

    # OS info
    os_type = tree.find(".//os/type")
    info["os_type"] = os_type.text if os_type is not None else "hvm"
    return info
def _parse_ip_neigh(output: str) -> dict:
    """Parse ``ip neigh show`` output into ``{mac_lower: ipv4}``."""
    table = {}
    for line in output.splitlines():
        fields = line.split()
        # Expected layout: IP dev IFACE lladdr MAC STATE; only IPv4 is kept.
        if len(fields) < 5 or "." not in fields[0]:
            continue
        try:
            mac = fields[fields.index("lladdr") + 1]
        except (ValueError, IndexError):
            # Entries without a link-layer address carry no MAC.
            continue
        table[mac.lower()] = fields[0]
    return table


def _get_arp_table(host_id: str = "local") -> dict:
    """Return the ARP/neighbour table of a host as ``{mac_lower: ip}``.

    For hosts of type ``ssh`` the command ``ip neigh show`` is run on the
    remote machine through ssh, using the user, host and port found in the
    host's URI; for every other host it is run locally. Any failure
    (timeout, non-zero exit status, missing binary, bad URI) yields an empty
    dict.
    """
    import subprocess
    from app.hosts import get_host as get_host_info

    host_info = get_host_info(host_id)
    use_ssh = bool(host_info) and host_info.type == "ssh"
    try:
        if use_ssh:
            from urllib.parse import urlparse

            parsed = urlparse(host_info.uri)
            remote_host = parsed.hostname
            # Refuse empty hosts and anything ssh could parse as an option.
            if not remote_host or remote_host.startswith("-"):
                return {}
            # NOTE(review): StrictHostKeyChecking=no disables host key
            # verification; consider a managed known_hosts file instead.
            # BatchMode=yes makes ssh fail instead of waiting on a prompt.
            ssh_args = [
                "ssh",
                "-o", "StrictHostKeyChecking=no",
                "-o", "BatchMode=yes",
            ]
            if host_info.ssh_key_path:
                ssh_args.extend(["-i", host_info.ssh_key_path])
            # Honour the port and user of the URI instead of silently
            # falling back to port 22 and the current local user.
            if parsed.port:
                ssh_args.extend(["-p", str(parsed.port)])
            user = parsed.username
            if user and not user.startswith("-"):
                remote_host = f"{user}@{remote_host}"
            ssh_args.extend([remote_host, "ip", "neigh", "show"])
            result = subprocess.run(
                ssh_args, capture_output=True, text=True, timeout=5
            )
        else:
            result = subprocess.run(
                ["ip", "neigh", "show"],
                capture_output=True, text=True, timeout=3,
            )
    except Exception:
        # Best effort: the ARP table is only a fallback source for IPs.
        return {}
    if result.returncode != 0:
        return {}
    return _parse_ip_neigh(result.stdout)
def _first_ip_for_mac(ifaces, mac_lower: str) -> str:
    """Return the preferred address of the interface matching *mac_lower*.

    IPv4 addresses win; otherwise the first reported address is used. An
    empty string means no matching interface with an address was found.
    """
    for ifdata in (ifaces or {}).values():
        # hwaddr/addrs may be None instead of missing, so normalise both
        # rather than letting one odd interface abort the whole lookup.
        if (ifdata.get("hwaddr") or "").lower() != mac_lower:
            continue
        addrs = ifdata.get("addrs") or []
        for entry in addrs:
            addr = entry.get("addr") or ""
            if "." in addr:
                return addr
        if addrs:
            fallback = addrs[0].get("addr") or ""
            if fallback:
                return fallback
    return ""


def _get_vm_ip(dom, mac: str, network: str = "", host_id: str = "local", arp_cache: dict = None) -> str:
    """Try several sources to find the IP address of a VM interface.

    Sources are tried in order: the QEMU guest agent (most accurate), the
    DHCP leases of the libvirt network, and finally the supplied ARP table.

    Args:
        dom: libvirt domain object.
        mac: MAC address of the interface (compared case-insensitively).
        network: Unused; kept for interface compatibility.
        host_id: Unused; kept for interface compatibility.
        arp_cache: Optional ``{mac_lower: ip}`` mapping used as last resort.

    Returns:
        The IP address, or an empty string when none could be determined.
    """
    wanted = (mac or "").lower()
    if not wanted:
        return ""

    source_names = (
        "VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_AGENT",  # guest agent
        "VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE",  # DHCP leases
    )
    for source_name in source_names:
        try:
            ifaces = dom.interfaceAddresses(getattr(libvirt, source_name), 0)
        except Exception:
            # Best effort: a source may be unavailable (e.g. no guest agent
            # installed); move on to the next one.
            continue
        ip = _first_ip_for_mac(ifaces, wanted)
        if ip:
            return ip

    # Last resort: ARP table lookup by MAC address
    if arp_cache:
        return arp_cache.get(wanted, "")
    return ""
def _get_state(dom) -> str:
    """Translate the domain's libvirt state code into a lowercase label.

    The code is the first element of ``dom.info()``; codes without a known
    label are reported as ``"unknown"``.
    """
    code = dom.info()[0]
    labels_by_code = dict(
        (
            (libvirt.VIR_DOMAIN_NOSTATE, "nostate"),
            (libvirt.VIR_DOMAIN_RUNNING, "running"),
            (libvirt.VIR_DOMAIN_BLOCKED, "blocked"),
            (libvirt.VIR_DOMAIN_PAUSED, "paused"),
            (libvirt.VIR_DOMAIN_SHUTDOWN, "shutdown"),
            (libvirt.VIR_DOMAIN_SHUTOFF, "shutoff"),
            (libvirt.VIR_DOMAIN_CRASHED, "crashed"),
            (libvirt.VIR_DOMAIN_PMSUSPENDED, "suspended"),
        )
    )
    if code in labels_by_code:
        return labels_by_code[code]
    return "unknown"
def _to_mb(value, unit) -> int:
    """Convert a memory size given in a libvirt unit to whole MiB.

    Args:
        value: Size expressed in ``unit``.
        unit: Unit name, case-insensitive. Binary units (``k``/``KiB``,
            ``M``/``MiB``, ``G``/``GiB``, ``T``/``TiB``), decimal units
            (``KB``, ``MB``, ``GB``, ``TB``) and ``b``/``bytes`` are
            understood. ``None`` or an unrecognised unit is treated as KiB.

    Returns:
        The size in MiB, rounded down.
    """
    bytes_per_unit = {
        "b": 1,
        "bytes": 1,
        "kb": 10**3,
        "k": 2**10,
        "kib": 2**10,
        "mb": 10**6,
        "m": 2**20,
        "mib": 2**20,
        "gb": 10**9,
        "g": 2**30,
        "gib": 2**30,
        "tb": 10**12,
        "t": 2**40,
        "tib": 2**40,
    }
    key = (unit or "KiB").strip().lower()
    # Unknown units fall back to KiB.
    factor = bytes_per_unit.get(key, 2**10)
    return value * factor // 2**20