"""虚拟机 XML 模板和工具函数""" import uuid import libvirt from lxml import etree def generate_vm_xml( name: str, memory_mb: int, vcpus: int, disk_path: str, disk_size_gb: int = 20, iso_path: str = None, network: str = "default", vnc_port: int = -1, os_type: str = "hvm", arch: str = "x86_64", machine: str = "pc", ) -> str: """生成虚拟机 XML 定义""" vm_uuid = str(uuid.uuid4()) # 基础 XML 结构 xml_parts = f""" {name} {vm_uuid} {memory_mb} {memory_mb} {vcpus} {os_type} destroy restart destroy /usr/bin/qemu-system-x86_64 """ # 光驱(ISO安装) if iso_path: xml_parts += f""" """ # VNC if vnc_port == -1: vnc_port = 5900 # auto-allocate by libvirt xml_parts += f""" """ # 网络 xml_parts += f""" """ return xml_parts def parse_vm_info(dom) -> dict: """从 libvirt domain 对象提取虚拟机信息""" from app.libvirt_conn import libvirt_conn # 基本信息 info = { "id": dom.ID(), "name": dom.name(), "uuid": dom.UUIDString(), "state": _get_state(dom), "autostart": False, } try: info["autostart"] = dom.autostart() == 1 except Exception: pass # 解析 XML xml_desc = dom.XMLDesc(0) tree = etree.fromstring(xml_desc.encode()) # 内存和CPU mem = tree.find(".//memory") cur_mem = tree.find(".//currentMemory") vcpu = tree.find(".//vcpu") mem_unit = mem.get("unit", "KiB") if mem is not None else "KiB" mem_val = int(mem.text) if mem is not None else 0 info["memory_mb"] = _to_mb(mem_val, mem_unit) cur_unit = cur_mem.get("unit", "KiB") if cur_mem is not None else "KiB" cur_val = int(cur_mem.text) if cur_mem is not None else 0 info["current_memory_mb"] = _to_mb(cur_val, cur_unit) info["vcpus"] = int(vcpu.text) if vcpu is not None else 1 # CPU type cpu = tree.find(".//cpu") if cpu is not None: info["cpu_mode"] = cpu.get("mode", "unknown") else: info["cpu_mode"] = "unknown" # 磁盘 disks = [] for disk in tree.findall(".//disk"): if disk.get("device") == "disk": source = disk.find("source") target = disk.find("target") driver = disk.find("driver") disk_info = { "file": source.get("file", "") if source is not None else "", "dev": target.get("dev", "") if target is not None else "", "bus": target.get("bus", "") if target is not None else "", "format": driver.get("type", "") if driver is not None else "", } disks.append(disk_info) info["disks"] = disks # 网络 interfaces = [] for iface in tree.findall(".//interface"): source = iface.find("source") model = iface.find("model") iface_info = { "type": iface.get("type", ""), "network": source.get("network", "") if source is not None else "", "model": model.get("type", "") if model is not None else "", } # 如果运行中,获取MAC和IP if info["state"] == "running": mac = iface.find("mac") if mac is not None: iface_info["mac"] = mac.get("address", "") # 尝试获取IP地址 try: ifaces = dom.interfaceAddresses( libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_AGENT, 0 ) if ifaces: for ifname, ifdata in ifaces.items(): if mac is not None and ifdata.get("hwaddr", "") == iface_info.get("mac", ""): addrs = ifdata.get("addrs", []) if addrs: iface_info["ip"] = addrs[0].get("addr", "") except Exception: try: ifaces = dom.interfaceAddresses( libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE, 0 ) if ifaces: for ifname, ifdata in ifaces.items(): addrs = ifdata.get("addrs", []) if addrs: iface_info["ip"] = addrs[0].get("addr", "") break except Exception: pass interfaces.append(iface_info) info["interfaces"] = interfaces # VNC graphics = tree.find(".//graphics[@type='vnc']") if graphics is not None: info["vnc_port"] = int(graphics.get("port", -1)) info["vnc_listen"] = graphics.get("listen", "127.0.0.1") else: info["vnc_port"] = -1 info["vnc_listen"] = "" # OS info os_type = tree.find(".//os/type") info["os_type"] = os_type.text if os_type is not None else "hvm" return info def _get_state(dom) -> str: """获取虚拟机运行状态""" raw = dom.info() state = raw[0] state_map = { libvirt.VIR_DOMAIN_NOSTATE: "nostate", libvirt.VIR_DOMAIN_RUNNING: "running", libvirt.VIR_DOMAIN_BLOCKED: "blocked", libvirt.VIR_DOMAIN_PAUSED: "paused", libvirt.VIR_DOMAIN_SHUTDOWN: "shutdown", libvirt.VIR_DOMAIN_SHUTOFF: "shutoff", libvirt.VIR_DOMAIN_CRASHED: "crashed", libvirt.VIR_DOMAIN_PMSUSPENDED: "suspended", } return state_map.get(state, "unknown") def _to_mb(value, unit) -> int: """转换为 MB""" unit = unit.lower() if unit in ("kib", "k", "kib"): return value // 1024 elif unit in ("mib", "m", "mib"): return value elif unit in ("gib", "g", "gib"): return value * 1024 elif unit in ("tib", "t"): return value * 1024 * 1024 elif unit == "b": return value // (1024 * 1024) return value // 1024 # default KiB