"""虚拟机 XML 模板和工具函数""" import uuid import libvirt from lxml import etree def generate_vm_xml( name: str, memory_mb: int, vcpus: int, disk_path: str, disk_size_gb: int = 20, iso_path: str = None, network: str = "default", vnc_port: int = -1, os_type: str = "hvm", arch: str = "x86_64", machine: str = "pc", ) -> str: """生成虚拟机 XML 定义""" vm_uuid = str(uuid.uuid4()) # 基础 XML 结构 xml_parts = f""" {name} {vm_uuid} {memory_mb} {memory_mb} {vcpus} {os_type} destroy restart destroy /usr/bin/qemu-system-x86_64 """ # 光驱(ISO安装) if iso_path: xml_parts += f""" """ # VNC if vnc_port == -1: vnc_port = 5900 # auto-allocate by libvirt xml_parts += f""" """ # 网络 xml_parts += f""" """ return xml_parts def parse_vm_info(dom, host_id: str = "local", include_ip: bool = False) -> dict: """从 libvirt domain 对象提取虚拟机信息 Args: dom: libvirt domain 对象 host_id: 主机ID include_ip: 是否获取IP地址(开启会明显变慢,默认关闭) """ from app.libvirt_conn import libvirt_conn # 基本信息 info = { "id": dom.ID(), "name": dom.name(), "uuid": dom.UUIDString(), "state": _get_state(dom), "autostart": False, } try: info["autostart"] = dom.autostart() == 1 except Exception: pass # 解析 XML xml_desc = dom.XMLDesc(0) tree = etree.fromstring(xml_desc.encode()) # 内存和CPU mem = tree.find(".//memory") cur_mem = tree.find(".//currentMemory") vcpu = tree.find(".//vcpu") mem_unit = mem.get("unit", "KiB") if mem is not None else "KiB" mem_val = int(mem.text) if mem is not None else 0 info["memory_mb"] = _to_mb(mem_val, mem_unit) cur_unit = cur_mem.get("unit", "KiB") if cur_mem is not None else "KiB" cur_val = int(cur_mem.text) if cur_mem is not None else 0 info["current_memory_mb"] = _to_mb(cur_val, cur_unit) info["vcpus"] = int(vcpu.text) if vcpu is not None else 1 # CPU type cpu = tree.find(".//cpu") if cpu is not None: info["cpu_mode"] = cpu.get("mode", "unknown") else: info["cpu_mode"] = "unknown" # 磁盘 disks = [] for disk in tree.findall(".//disk"): if disk.get("device") == "disk": source = disk.find("source") target = disk.find("target") driver = disk.find("driver") disk_info = { "file": source.get("file", "") if source is not None else "", "dev": target.get("dev", "") if target is not None else "", "bus": target.get("bus", "") if target is not None else "", "format": driver.get("type", "") if driver is not None else "", } disks.append(disk_info) info["disks"] = disks # 网络 interfaces = [] for iface in tree.findall(".//interface"): source = iface.find("source") model = iface.find("model") target = iface.find("target") iface_info = { "type": iface.get("type", ""), "network": source.get("network", "") if source is not None else "", "bridge": source.get("bridge", "") if source is not None else "", "model": model.get("type", "") if model is not None else "", "dev": target.get("dev", "") if target is not None else "", } mac_elem = iface.find("mac") if mac_elem is not None: iface_info["mac"] = mac_elem.get("address", "") interfaces.append(iface_info) info["interfaces"] = interfaces # 获取 IP 地址(仅在需要时) if include_ip and info["state"] == "running": arp_cache = _get_arp_table(host_id) for iface in interfaces: if "mac" in iface: # 先尝试 Guest Agent 和 DHCP 租约 iface["ip"] = _get_vm_ip(dom, iface["mac"], iface.get("network", ""), host_id, None) # 如果没找到再用 ARP 表 if not iface.get("ip") and arp_cache: mac_lower = iface["mac"].lower() if mac_lower in arp_cache: iface["ip"] = arp_cache[mac_lower] else: # 不获取 IP 时,初始化 ip 字段为空 for iface in interfaces: iface["ip"] = "" # VNC graphics = tree.find(".//graphics[@type='vnc']") if graphics is not None: info["vnc_port"] = int(graphics.get("port", -1)) info["vnc_listen"] = graphics.get("listen", "127.0.0.1") else: info["vnc_port"] = -1 info["vnc_listen"] = "" # OS info os_type = tree.find(".//os/type") info["os_type"] = os_type.text if os_type is not None else "hvm" return info def _get_arp_table(host_id: str = "local") -> dict: """获取 ARP 表(MAC -> IP 映射),返回 {mac_lower: ip}""" import subprocess from app.hosts import get_host as get_host_info host_info = get_host_info(host_id) is_remote = host_info and host_info.type != "local" try: if is_remote and host_info.type == "ssh": from urllib.parse import urlparse parsed = urlparse(host_info.uri) remote_host = parsed.hostname ssh_args = ["ssh", "-o", "StrictHostKeyChecking=no"] if host_info.ssh_key_path: ssh_args.extend(["-i", host_info.ssh_key_path]) ssh_args.extend([remote_host, "ip", "neigh", "show"]) result = subprocess.run(ssh_args, capture_output=True, text=True, timeout=5) else: result = subprocess.run( ["ip", "neigh", "show"], capture_output=True, text=True, timeout=3, ) if result.returncode == 0: arp = {} for line in result.stdout.strip().split("\n"): parts = line.split() if len(parts) >= 5: ip = parts[0] # lladdr 行格式: IP dev IFACE lladdr MAC ... for i, p in enumerate(parts): if p == "lladdr" and i + 1 < len(parts) and "." in ip: arp[parts[i + 1].lower()] = ip return arp except Exception: pass return {} def _get_vm_ip(dom, mac: str, network: str = "", host_id: str = "local", arp_cache: dict = None) -> str: """多种方式尝试获取虚拟机 IP 地址""" # 方式1: QEMU Guest Agent(最准确) try: ifaces = dom.interfaceAddresses( libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_AGENT, 0 ) if ifaces: for ifname, ifdata in ifaces.items(): if ifdata.get("hwaddr", "").lower() == mac.lower(): addrs = ifdata.get("addrs", []) for a in addrs: addr = a.get("addr", "") # 优先返回 IPv4 if "." in addr: return addr if addrs: return addrs[0].get("addr", "") except Exception: pass # 方式2: DHCP 租约(通过 libvirt 网络) try: ifaces = dom.interfaceAddresses( libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE, 0 ) if ifaces: for ifname, ifdata in ifaces.items(): hwaddr = ifdata.get("hwaddr", "").lower() if hwaddr == mac.lower(): addrs = ifdata.get("addrs", []) for a in addrs: addr = a.get("addr", "") if "." in addr: return addr if addrs: return addrs[0].get("addr", "") except Exception: pass # 方式3: ARP 表查找(通过 MAC 地址查 IP) if arp_cache is not None and mac.lower() in arp_cache: return arp_cache[mac.lower()] return "" def _get_state(dom) -> str: """获取虚拟机运行状态""" raw = dom.info() state = raw[0] state_map = { libvirt.VIR_DOMAIN_NOSTATE: "nostate", libvirt.VIR_DOMAIN_RUNNING: "running", libvirt.VIR_DOMAIN_BLOCKED: "blocked", libvirt.VIR_DOMAIN_PAUSED: "paused", libvirt.VIR_DOMAIN_SHUTDOWN: "shutdown", libvirt.VIR_DOMAIN_SHUTOFF: "shutoff", libvirt.VIR_DOMAIN_CRASHED: "crashed", libvirt.VIR_DOMAIN_PMSUSPENDED: "suspended", } return state_map.get(state, "unknown") def _to_mb(value, unit) -> int: """转换为 MB""" unit = unit.lower() if unit in ("kib", "k", "kib"): return value // 1024 elif unit in ("mib", "m", "mib"): return value elif unit in ("gib", "g", "gib"): return value * 1024 elif unit in ("tib", "t"): return value * 1024 * 1024 elif unit == "b": return value // (1024 * 1024) return value // 1024 # default KiB