1
0
Ficheiros

535 linhas
19 KiB
Python

"""虚拟机管理路由"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import Optional, List
from lxml import etree
from app.hosts import list_hosts
import os
from app.libvirt_conn import conn_pool
from app.utils import parse_vm_info, generate_vm_xml
from app.routers.monitor import get_vm_runtime_stats
import libvirt
# FastAPI router that collects every VM-management endpoint defined in this module.
router = APIRouter()
# ===== 请求模型 =====
class VMCreate(BaseModel):
    # Request body for POST /create.
    # Sizes must be positive and the name non-empty. Otherwise the bad values
    # would only surface inside libvirt, after create_vm has already created
    # the "<name>.qcow2" disk volume.
    name: str = Field(..., min_length=1, description="虚拟机名称")
    memory_mb: int = Field(2048, gt=0, description="内存大小(MB)")
    vcpus: int = Field(2, gt=0, description="CPU核心数")
    disk_gb: int = Field(20, gt=0, description="磁盘大小(GB)")
    pool_name: str = Field("default", description="存储池名称")
    iso_path: Optional[str] = Field(None, description="ISO安装镜像路径")
    network: str = Field("default", description="网络名称")
    # NOTE(review): accepted but not currently passed on by create_vm in this module.
    description: Optional[str] = Field(None, description="描述")
class VMAction(BaseModel):
    # Request body for POST /action/{name}: which lifecycle operation to run.
    action: str = Field(
        default=...,
        description="操作: start/stop/restart/pause/resume/force_stop",
    )
class VMClone(BaseModel):
    # Request body for POST /clone/{name}: the name the copy will get.
    new_name: str = Field(
        default=...,
        description="新虚拟机名称",
    )
# ===== API =====
@router.get("/list")
async def list_vms(host_id: str = Query("local"), include_ip: bool = False):
    """List every VM defined on one host (lightweight; IPs only with ``include_ip``).

    Each entry is the ``parse_vm_info`` record extended with ``cpu_percent``,
    ``memory_rss_mb`` and ``memory_usage_percent`` from the runtime stats.
    A domain whose details cannot be gathered is reported as
    ``{"name", "uuid", "state": "error", "error"}`` instead.
    """
    connection = conn_pool.get_conn(host_id)
    collected = []
    for domain in connection.listAllDomains(0):
        try:
            entry = parse_vm_info(domain, host_id, include_ip=include_ip)
            stats = get_vm_runtime_stats(domain, f"{host_id}_{entry['name']}")
            mem = stats["memory"]
            entry["cpu_percent"] = stats["cpu_percent"]
            entry["memory_rss_mb"] = mem.get("rss_mb", 0)
            entry["memory_usage_percent"] = mem.get("usage_percent", 0)
        except Exception as exc:
            entry = {
                "name": domain.name(),
                "uuid": domain.UUIDString(),
                "state": "error",
                "error": str(exc),
            }
        collected.append(entry)
    return {"vms": collected, "total": len(collected), "host_id": host_id}
@router.get("/list-all")
async def list_all_vms(include_ip: bool = False):
    """List the VMs of every configured host (aggregated view).

    Args:
        include_ip: forwarded to ``parse_vm_info``; IP discovery is off by default.

    Returns:
        ``{"vms", "total", "host_count", "failed_hosts"}``.

        - Every VM entry is tagged with ``host_id``/``host_name``.
        - A domain whose details cannot be gathered is reported with
          ``state="error"`` plus an ``error`` message, the same shape ``/list`` uses.
        - Hosts whose connection or domain listing fails are skipped (best
          effort) but listed in ``failed_hosts``. Clients can therefore tell
          an empty host from an unreachable one.
    """
    hosts = list_hosts()
    all_vms = []
    failed_hosts = []
    for host in hosts:
        # Only connecting/listing is treated as a host-level failure;
        # per-domain problems are reported on that domain's entry below.
        try:
            domains = conn_pool.get_conn(host.id).listAllDomains(0)
        except Exception as e:
            failed_hosts.append({"host_id": host.id, "host_name": host.name, "error": str(e)})
            continue
        for dom in domains:
            try:
                vm_info = parse_vm_info(dom, host.id, include_ip=include_ip)
                vm_info["host_id"] = host.id
                vm_info["host_name"] = host.name
                # Runtime statistics (CPU usage, memory usage)
                runtime = get_vm_runtime_stats(dom, f"{host.id}_{vm_info['name']}")
                vm_info["cpu_percent"] = runtime["cpu_percent"]
                vm_info["memory_rss_mb"] = runtime["memory"].get("rss_mb", 0)
                vm_info["memory_usage_percent"] = runtime["memory"].get("usage_percent", 0)
                all_vms.append(vm_info)
            except Exception as e:
                all_vms.append({
                    "name": dom.name(),
                    "uuid": dom.UUIDString(),
                    "state": "error",
                    "host_id": host.id,
                    "host_name": host.name,
                    "error": str(e),
                })
    return {
        "vms": all_vms,
        "total": len(all_vms),
        "host_count": len(hosts),
        "failed_hosts": failed_hosts,
    }
def _collect_block_stats(dom, disks) -> list:
    """Read ``dom.blockStats`` for every disk entry that has a target ``dev``.

    A disk whose statistics cannot be read is skipped. It does not discard
    the figures already gathered for the other disks.
    """
    collected = []
    for disk in disks:
        dev = disk.get("dev")
        if not dev:
            continue
        try:
            stats = dom.blockStats(dev)
        except libvirt.libvirtError:
            continue
        # blockStats() -> (rd_req, rd_bytes, wr_req, wr_bytes, errs)
        collected.append({
            "dev": dev,
            "read_bytes": stats[1],
            "write_bytes": stats[3],
            "read_requests": stats[0],
            "write_requests": stats[2],
        })
    return collected


def _collect_net_stats(dom, interfaces) -> list:
    """Read ``dom.interfaceStats`` (addressed by MAC) for every interface entry.

    An interface whose statistics cannot be read is skipped. It does not
    discard the figures already gathered for the other interfaces.
    """
    collected = []
    for iface in interfaces:
        mac = iface.get("mac")
        if not mac:
            continue
        try:
            stats = dom.interfaceStats(mac)
        except libvirt.libvirtError:
            continue
        # interfaceStats() -> (rx_bytes, rx_packets, rx_errs, rx_drop,
        #                      tx_bytes, tx_packets, tx_errs, tx_drop)
        collected.append({
            "mac": mac,
            "rx_bytes": stats[0],
            "tx_bytes": stats[4],
            "rx_packets": stats[1],
            "tx_packets": stats[5],
        })
    return collected


@router.get("/detail/{name}")
async def get_vm_detail(name: str, host_id: str = Query("local")):
    """Return the ``parse_vm_info`` record of VM *name*, plus live figures when running.

    For a VM whose ``state`` is ``"running"``, these keys are added:

    - ``cpu_time_ns``
    - ``memory_stats``
    - ``block_stats`` (per disk)
    - ``net_stats`` (per interface)

    Each figure is best effort. A libvirt error while reading one omits only
    that figure (or that single device).

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
    """
    conn = conn_pool.get_conn(host_id)
    try:
        dom = conn.lookupByName(name)
    except libvirt.libvirtError:
        raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
    info = parse_vm_info(dom, host_id)
    if info["state"] != "running":
        return info

    try:
        # dom.info() -> [state, maxMem, memory, nrVirtCpu, cpuTime]
        info["cpu_time_ns"] = dom.info()[4]
    except libvirt.libvirtError:
        pass
    try:
        info["memory_stats"] = dom.memoryStats()
    except libvirt.libvirtError:
        pass
    info["block_stats"] = _collect_block_stats(dom, info.get("disks") or [])
    info["net_stats"] = _collect_net_stats(dom, info.get("interfaces") or [])
    return info
def _guest_addresses_by_mac(dom, source) -> dict:
    """Query ``dom.interfaceAddresses(source)`` once and index the addresses by MAC.

    Returns ``{lower-case MAC: [address string, ...]}``. The result is an
    empty dict when libvirt reports an error for that source, so the caller
    falls through to its next discovery method. Guest interfaces without a
    hardware address are ignored.
    """
    try:
        reported = dom.interfaceAddresses(source, 0)
    except libvirt.libvirtError:
        return {}
    by_mac = {}
    for ifdata in (reported or {}).values():
        # hwaddr may be absent/None; `or ""` avoids an AttributeError that
        # would otherwise abort the whole source.
        hwaddr = (ifdata.get("hwaddr") or "").lower()
        if not hwaddr:
            continue
        bucket = by_mac.setdefault(hwaddr, [])
        for entry in ifdata.get("addrs") or []:
            bucket.append(entry.get("addr", ""))
    return by_mac


def _ip_entries(addrs, source_label: str) -> list:
    """Turn raw address strings into ``{"ip", "type", "source"}`` records.

    The family is determined with :mod:`ipaddress`, so IPv4-mapped IPv6
    addresses such as ``::ffff:10.0.0.1`` are reported as ``ipv6``.
    Strings that are not IP addresses are dropped.
    """
    import ipaddress

    entries = []
    for addr in addrs:
        if not addr:
            continue
        try:
            # Strip an IPv6 zone suffix ("fe80::1%eth0") before parsing.
            version = ipaddress.ip_address(addr.split("%", 1)[0]).version
        except ValueError:
            continue
        entries.append({"ip": addr, "type": f"ipv{version}", "source": source_label})
    return entries


@router.get("/ip/{name}")
async def get_vm_ip(name: str, host_id: str = Query("local")):
    """Return the IP addresses of each network interface of VM *name*.

    For every ``<interface>`` with a ``<mac>``, sources are tried in order,
    stopping at the first that yields addresses:

    1. QEMU guest agent (IPv4 and IPv6).
    2. DHCP leases (IPv4 and IPv6).
    3. The host's ARP/neighbour table via ``_arp_lookup`` (IPv4; skipped for
       an empty MAC).

    The agent and lease sources are each queried at most once per request,
    not once per interface.

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
    """
    conn = conn_pool.get_conn(host_id)
    try:
        dom = conn.lookupByName(name)
    except libvirt.libvirtError:
        raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
    if not dom.isActive():
        return {"name": name, "ips": [], "message": "虚拟机未运行"}

    tree = etree.fromstring(dom.XMLDesc(0).encode())
    agent_addrs = None  # filled lazily on first use
    lease_addrs = None  # filled lazily, only if some NIC needs the lease fallback
    interfaces = []
    for iface in tree.findall(".//interface"):
        mac_elem = iface.find("mac")
        if mac_elem is None:
            continue
        mac = mac_elem.get("address", "")
        source = iface.find("source")
        network = source.get("network", "") if source is not None else ""
        key = mac.lower()

        # Method 1: QEMU Guest Agent
        if agent_addrs is None:
            agent_addrs = _guest_addresses_by_mac(
                dom, libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_AGENT
            )
        ips = _ip_entries(agent_addrs.get(key, []), "guest-agent")

        # Method 2: DHCP leases
        if not ips:
            if lease_addrs is None:
                lease_addrs = _guest_addresses_by_mac(
                    dom, libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE
                )
            ips = _ip_entries(lease_addrs.get(key, []), "dhcp-lease")

        # Method 3: ARP table. An empty MAC must not be looked up, since it
        # could match every neighbour entry.
        if not ips and mac:
            ips = [
                {"ip": ip, "type": "ipv4", "source": "arp"}
                for ip in _arp_lookup(mac, host_id)
            ]

        interfaces.append({
            "mac": mac,
            "network": network,
            "ips": ips,
        })
    return {"name": name, "interfaces": interfaces}
def _arp_lookup(mac: str, host_id: str = "local") -> list:
"""通过 ARP 表查找 MAC 对应的 IP"""
import subprocess
from app.hosts import get_host as get_host_info
host_info = get_host_info(host_id)
is_remote = host_info and host_info.type != "local"
try:
if is_remote and host_info.type == "ssh":
from urllib.parse import urlparse
parsed = urlparse(host_info.uri)
remote_host = parsed.hostname
ssh_args = ["ssh", "-o", "StrictHostKeyChecking=no"]
if host_info.ssh_key_path:
ssh_args.extend(["-i", host_info.ssh_key_path])
ssh_args.extend([remote_host, "ip", "neigh", "show"])
result = subprocess.run(ssh_args, capture_output=True, text=True, timeout=5)
else:
result = subprocess.run(
["ip", "neigh", "show"],
capture_output=True, text=True, timeout=3,
)
if result.returncode == 0:
found = []
for line in result.stdout.strip().split("\n"):
if mac.lower() in line.lower():
parts = line.split()
if parts and "." in parts[0]:
found.append(parts[0])
return found
except Exception:
pass
return []
def _new_volume_xml(name: str, capacity_gb: int) -> str:
    """Render the libvirt ``<volume>`` XML for a fresh qcow2 disk named ``<name>.qcow2``.

    The XML is built with lxml instead of an f-string. Characters such as
    ``<`` or ``&`` in the user-supplied VM name are then escaped rather than
    altering the document.
    """
    volume = etree.Element("volume")
    etree.SubElement(volume, "name").text = f"{name}.qcow2"
    etree.SubElement(volume, "capacity", unit="GiB").text = str(capacity_gb)
    etree.SubElement(volume, "allocation", unit="GiB").text = "1"
    target = etree.SubElement(volume, "target")
    etree.SubElement(target, "format", type="qcow2")
    permissions = etree.SubElement(target, "permissions")
    etree.SubElement(permissions, "mode").text = "0644"
    return etree.tostring(volume, encoding="unicode")


def _rollback_partial_create(dom, vol) -> None:
    """Best-effort removal of a half-created VM, so a retry with the same name can succeed.

    *dom* is the defined (not running) domain or None. *vol* is the created
    disk volume or None.
    """
    if dom is not None:
        try:
            dom.undefine()
        except libvirt.libvirtError:
            pass
    if vol is not None:
        try:
            vol.delete(0)
        except libvirt.libvirtError:
            pass


@router.post("/create")
async def create_vm(vm: VMCreate, host_id: str = Query("local")):
    """Create a qcow2 disk in ``vm.pool_name``, then define and start VM ``vm.name``.

    If any step after the volume has been created fails, the steps already
    done are undone: the domain is undefined and the volume deleted.

    Raises:
        HTTPException 400: a domain named ``vm.name`` already exists.
        HTTPException 500: libvirt rejected one of the steps.
    """
    conn = conn_pool.get_conn(host_id)
    # Refuse to overwrite an existing VM.
    try:
        conn.lookupByName(vm.name)
    except libvirt.libvirtError:
        pass  # name is free
    else:
        raise HTTPException(status_code=400, detail=f"虚拟机 '{vm.name}' 已存在")

    with conn_pool.get_rw(host_id) as rw_conn:
        vol = None
        dom = None
        try:
            pool = rw_conn.storagePoolLookupByName(vm.pool_name)
            vol = pool.createXML(_new_volume_xml(vm.name, vm.disk_gb), 0)
            vm_xml = generate_vm_xml(
                name=vm.name,
                memory_mb=vm.memory_mb,
                vcpus=vm.vcpus,
                # Ask libvirt where the volume really lives instead of
                # guessing from the pool's target path.
                disk_path=vol.path(),
                disk_size_gb=vm.disk_gb,
                iso_path=vm.iso_path,
                network=vm.network,
            )
            dom = rw_conn.defineXML(vm_xml)
            dom.create()
        except libvirt.libvirtError as e:
            _rollback_partial_create(dom, vol)
            raise HTTPException(status_code=500, detail=f"创建虚拟机失败: {str(e)}")
        except Exception:
            _rollback_partial_create(dom, vol)
            raise
        return {"message": f"虚拟机 '{vm.name}' 创建成功", "name": vm.name}
# Supported lifecycle actions: action name -> (libvirt call on the domain, message suffix).
_VM_ACTIONS = {
    "start": (lambda d: d.create(), "已启动"),
    "stop": (lambda d: d.shutdown(), "正在关闭"),
    "force_stop": (lambda d: d.destroy(), "已强制关闭"),
    "restart": (lambda d: d.reboot(libvirt.VIR_DOMAIN_REBOOT_DEFAULT), "正在重启"),
    "pause": (lambda d: d.suspend(), "已暂停"),
    "resume": (lambda d: d.resume(), "已恢复"),
}


@router.post("/action/{name}")
async def vm_action(name: str, action: VMAction, host_id: str = Query("local")):
    """Run a lifecycle operation on VM *name*.

    Supported actions: start, stop, force_stop, restart, pause, resume.
    "stop" and "restart" only request the operation from the guest.

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
        HTTPException 400: unsupported action.
        HTTPException 500: libvirt rejected the operation.
    """
    with conn_pool.get_rw(host_id) as rw_conn:
        try:
            rw_dom = rw_conn.lookupByName(name)
        except libvirt.libvirtError:
            raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
        act = action.action
        handler = _VM_ACTIONS.get(act)
        if handler is None:
            raise HTTPException(status_code=400, detail=f"不支持的操作: {act}")
        operation, suffix = handler
        try:
            operation(rw_dom)
        except libvirt.libvirtError as e:
            raise HTTPException(status_code=500, detail=f"操作失败: {str(e)}")
        return {"message": f"虚拟机 '{name}' {suffix}"}
def _delete_disk_image(rw_conn, path: str, is_local: bool) -> bool:
    """Delete one disk image of a removed VM; return True if it was deleted.

    The libvirt storage API is tried first, because it acts on the
    hypervisor host behind *rw_conn*. ``os.remove`` is only a fallback for
    local hosts. For a remote host it would delete a same-named file on the
    machine running this API instead.
    """
    try:
        rw_conn.storageVolLookupByPath(path).delete(0)
        return True
    except libvirt.libvirtError:
        pass  # not a volume of any libvirt pool, or the delete was refused
    if not is_local:
        return False
    try:
        os.remove(path)
        return True
    except OSError:
        return False


@router.delete("/delete/{name}")
async def delete_vm(name: str, force: bool = False, host_id: str = Query("local")):
    """Undefine VM *name* and delete its file-backed ``device='disk'`` images.

    CD-ROM images and disks without a ``file`` source are left alone.

    Args:
        force: hard power-off (destroy) the VM first if it is active.

    Returns:
        ``{"message", "removed_disks", "failed_disks"}``. ``removed_disks``
        lists only images that were actually deleted.

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
        HTTPException 400: the VM is active and *force* is false.
        HTTPException 500: libvirt failed to destroy or undefine the VM.

    NOTE(review): ``clone_vm`` creates qcow2 overlays that use this VM's disk
    images as backing files, so deleting a VM that has clones breaks them.
    """
    from app.hosts import get_host as get_host_info

    with conn_pool.get_rw(host_id) as rw_conn:
        try:
            dom = rw_conn.lookupByName(name)
        except libvirt.libvirtError:
            raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")

        # isActive() also covers paused/suspended guests. Undefining an active
        # domain only turns it transient (it keeps running), so its disks must
        # never be deleted while it is in any active state.
        if dom.isActive():
            if not force:
                raise HTTPException(
                    status_code=400,
                    detail=f"虚拟机 '{name}' 正在运行,请先关闭或使用 force=true"
                )
            try:
                dom.destroy()
            except libvirt.libvirtError as e:
                raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")

        # Collect disk paths before the definition disappears.
        tree = etree.fromstring(dom.XMLDesc(0).encode())
        disk_files = [
            source.get("file")
            for source in tree.findall(".//disk[@device='disk']/source")
            if source.get("file")
        ]

        host_info = get_host_info(host_id)
        is_local = not host_info or host_info.type == "local"

        try:
            dom.undefine()
        except libvirt.libvirtError as e:
            raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")

        removed_disks = []
        failed_disks = []
        for path in disk_files:
            if _delete_disk_image(rw_conn, path, is_local):
                removed_disks.append(path)
            else:
                failed_disks.append(path)
        return {
            "message": f"虚拟机 '{name}' 已删除",
            "removed_disks": removed_disks,
            "failed_disks": failed_disks,
        }
def _clone_disk_filename(filename: str, src_name: str, new_name: str) -> str:
    """Choose the file name of the clone's image for one source disk image.

    A ``<src_name>.qcow2`` in the name is replaced by ``<new_name>.qcow2``.
    Any other file gets a ``<new_name>-`` prefix, so the clone is never
    pointed at the source's own image file.
    """
    old_token = f"{src_name}.qcow2"
    if old_token in filename:
        return filename.replace(old_token, f"{new_name}.qcow2")
    return f"{new_name}-{filename}"


def _random_mac() -> str:
    """Return a random MAC address under the 52:54:00 prefix."""
    import random

    return "52:54:00:%02x:%02x:%02x" % (
        random.randint(0, 255),
        random.randint(0, 255),
        random.randint(0, 255),
    )


def _create_overlay_volume(rw_conn, src_path: str, new_filename: str, backing_format: str):
    """Create a qcow2 volume backed by *src_path* in the same storage pool and return it.

    This goes through the libvirt storage API, so the image is created on
    the hypervisor behind *rw_conn* (remote hosts included). Raises
    ``libvirt.libvirtError`` if *src_path* is not a volume of any libvirt
    pool, or if the volume cannot be created.
    """
    src_vol = rw_conn.storageVolLookupByPath(src_path)
    pool = src_vol.storagePoolLookupByVolume()
    capacity_bytes = src_vol.info()[1]  # info() -> [type, capacity, allocation]
    volume = etree.Element("volume")
    etree.SubElement(volume, "name").text = new_filename
    etree.SubElement(volume, "capacity", unit="bytes").text = str(capacity_bytes)
    target = etree.SubElement(volume, "target")
    etree.SubElement(target, "format", type="qcow2")
    backing = etree.SubElement(volume, "backingStore")
    etree.SubElement(backing, "path").text = src_path
    etree.SubElement(backing, "format", type=backing_format)
    return pool.createXML(etree.tostring(volume, encoding="unicode"), 0)


def _delete_volumes(volumes) -> None:
    """Best-effort deletion of the volumes created by a failed clone attempt."""
    for vol in volumes:
        try:
            vol.delete(0)
        except libvirt.libvirtError:
            pass


@router.post("/clone/{name}")
async def clone_vm(name: str, clone: VMClone, host_id: str = Query("local")):
    """Define VM ``clone.new_name`` as a linked clone of the shut-off VM *name*.

    How the source definition is adapted:

    - Every file-backed ``device='disk'`` disk gets a new qcow2 overlay whose
      backing file is the source image. Backing format is taken from
      ``<driver type>``, default qcow2.
    - Other disks (block/network sources) keep pointing at the same source.
    - The UUID is dropped so libvirt assigns a new one.
    - Every NIC gets a random MAC.

    The clone is defined, not started.

    NOTE(review): the source images become backing files of the clone. If
    the source VM runs again and writes to them, the clone's disk changes
    underneath it. Consider a full copy, or making the base read-only.

    Raises:
        HTTPException 404: source VM not found.
        HTTPException 400: target name already taken, or source VM is active.
        HTTPException 500: libvirt failure. Overlay volumes already created
            by this call are deleted again.
    """
    with conn_pool.get_rw(host_id) as rw_conn:
        try:
            dom = rw_conn.lookupByName(name)
        except libvirt.libvirtError:
            raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
        # The new name must not be taken.
        try:
            rw_conn.lookupByName(clone.new_name)
        except libvirt.libvirtError:
            pass
        else:
            raise HTTPException(status_code=400, detail=f"虚拟机 '{clone.new_name}' 已存在")
        if dom.isActive():
            # Overlaying an image that a running guest is still writing to
            # yields an inconsistent clone.
            raise HTTPException(status_code=400, detail=f"虚拟机 '{name}' 正在运行,请先关闭后再克隆")

        created_vols = []
        try:
            tree = etree.fromstring(dom.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE).encode())
            tree.find("name").text = clone.new_name
            uuid_elem = tree.find("uuid")
            if uuid_elem is not None:
                tree.remove(uuid_elem)

            for disk in tree.findall(".//disk[@device='disk']"):
                source = disk.find("source")
                old_path = source.get("file", "") if source is not None else ""
                if not old_path:
                    continue  # not file-backed: leave the element untouched
                driver = disk.find("driver")
                backing_format = (driver.get("type") if driver is not None else None) or "qcow2"
                new_vol = _create_overlay_volume(
                    rw_conn,
                    old_path,
                    _clone_disk_filename(os.path.basename(old_path), name, clone.new_name),
                    backing_format,
                )
                created_vols.append(new_vol)
                source.set("file", new_vol.path())

            for mac in tree.findall(".//interface/mac"):
                mac.set("address", _random_mac())

            rw_conn.defineXML(etree.tostring(tree, encoding="unicode"))
        except libvirt.libvirtError as e:
            _delete_volumes(created_vols)
            raise HTTPException(status_code=500, detail=f"克隆失败: {str(e)}")
        except Exception:
            _delete_volumes(created_vols)
            raise
        return {"message": f"虚拟机 '{name}' 已克隆为 '{clone.new_name}'"}
@router.get("/xml/{name}")
async def get_vm_xml(name: str, host_id: str = Query("local")):
    """Return the full domain XML of VM *name* (secure fields included).

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
    """
    connection = conn_pool.get_conn(host_id)
    try:
        domain = connection.lookupByName(name)
    except libvirt.libvirtError:
        raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
    definition = domain.XMLDesc(libvirt.VIR_DOMAIN_XML_SECURE)
    return {"name": name, "xml": definition}
def _check_domain_xml_identity(xml_str: str, name: str, uuid: str) -> None:
    """Raise HTTP 400 unless *xml_str* is a ``<domain>`` document for *name* / *uuid*.

    A missing ``<uuid>`` is accepted. UUIDs are compared case-insensitively
    and ignoring dashes.
    """
    # Client-supplied XML: never resolve entities or touch the network while parsing.
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    try:
        root = etree.fromstring(xml_str.encode(), parser)
    except etree.XMLSyntaxError as e:
        raise HTTPException(status_code=400, detail=f"XML格式无效: {e}")
    if root.tag != "domain":
        raise HTTPException(status_code=400, detail="XML根元素必须是 <domain>")
    if root.findtext("name") != name:
        raise HTTPException(status_code=400, detail=f"XML中的名称必须为 '{name}'")
    xml_uuid = (root.findtext("uuid") or "").strip()
    if xml_uuid and xml_uuid.replace("-", "").lower() != uuid.replace("-", "").lower():
        raise HTTPException(status_code=400, detail="XML中的UUID与虚拟机不一致")


@router.put("/xml/{name}")
async def update_vm_xml(name: str, xml: dict, host_id: str = Query("local")):
    """Replace the persistent definition of VM *name* with client-supplied XML.

    Request body: ``{"xml": "<domain>...</domain>"}``.

    ``defineXML`` creates or redefines whichever domain the XML itself names.
    The document is therefore checked first: its ``<name>`` (and ``<uuid>``,
    if present) must belong to *name*. Otherwise a request for one VM could
    silently define a new VM or overwrite a different one.

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
        HTTPException 400: XML empty, not a string, malformed, or for another domain.
        HTTPException 500: libvirt rejected the definition.
    """
    with conn_pool.get_rw(host_id) as rw_conn:
        try:
            dom = rw_conn.lookupByName(name)
        except libvirt.libvirtError:
            raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
        xml_str = xml.get("xml", "")
        if not xml_str:
            raise HTTPException(status_code=400, detail="XML不能为空")
        if not isinstance(xml_str, str):
            raise HTTPException(status_code=400, detail="XML必须是字符串")
        _check_domain_xml_identity(xml_str, name, dom.UUIDString())
        try:
            rw_conn.defineXML(xml_str)
        except libvirt.libvirtError as e:
            raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}")
        return {"message": f"虚拟机 '{name}' 配置已更新"}
@router.post("/migrate/{name}")
async def migrate_vm(name: str, dest_uri: str, live: bool = True, host_id: str = Query("local")):
    """Migrate VM *name* from *host_id* to the libvirt connection at *dest_uri*.

    Args:
        dest_uri: libvirt URI of the destination hypervisor.
        live: add ``VIR_MIGRATE_LIVE``. ``VIR_MIGRATE_PERSIST_DEST`` is always set.

    The destination connection is closed afterwards, on success and failure alike.

    NOTE(review): *dest_uri* comes straight from the request, so any caller
    can make this server open arbitrary libvirt URIs. Consider restricting it
    to configured hosts. Also, without ``VIR_MIGRATE_UNDEFINE_SOURCE`` the
    source host keeps its persistent definition; confirm this is intended.

    Raises:
        HTTPException 404: no domain named *name* on *host_id*.
        HTTPException 500: opening the destination or migrating failed.
    """
    with conn_pool.get_rw(host_id) as rw_conn:
        try:
            dom = rw_conn.lookupByName(name)
        except libvirt.libvirtError:
            raise HTTPException(status_code=404, detail=f"虚拟机 '{name}' 不存在")
        flags = libvirt.VIR_MIGRATE_LIVE if live else 0
        flags |= libvirt.VIR_MIGRATE_PERSIST_DEST
        dest_conn = None
        try:
            dest_conn = libvirt.open(dest_uri)
            dom.migrate(dest_conn, flags, None, None, 0)
        except libvirt.libvirtError as e:
            raise HTTPException(status_code=500, detail=f"迁移失败: {str(e)}")
        finally:
            # Previously this connection was never closed and leaked on every call.
            if dest_conn is not None:
                try:
                    dest_conn.close()
                except libvirt.libvirtError:
                    pass
        return {"message": f"虚拟机 '{name}' 已迁移到 {dest_uri}"}