"""Inventory API routes: CRUD, stock in/out operations, and Excel import/export."""

import io
from datetime import datetime
from typing import Optional
from urllib.parse import quote

import openpyxl
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from fastapi.responses import StreamingResponse
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from sqlalchemy import or_
from sqlalchemy.orm import Session

from database import get_db
from models import Inventory, TransactionLog
from schemas import InventoryCreate, InventoryUpdate, StockOperation

router = APIRouter()

XLSX_MEDIA_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

# Header captions recognised by the Excel import, mapped to the Inventory
# attribute each one feeds. Two captions are accepted for the remark column.
_IMPORT_HEADERS = {
    "序号": "id",
    "产品编码": "cInvCode",
    "供应商": "supplier",
    "现外壳&标签&备注": "casing_label_remark",
    "外壳&标签&备注": "casing_label_remark",
    "批次": "batch",
    "当前时间剩余": "current_remaining",
    "存货地点": "storage_location",
}
_CODE_HEADER = "产品编码"

# Only the first few rows of an uploaded sheet are searched for the header row.
_HEADER_SCAN_ROWS = 5

# When an imported row matches an existing record by product code + batch,
# only these attributes are merged, and only when the sheet value is non-empty.
_MERGEABLE_FIELDS = (
    "supplier",
    "casing_label_remark",
    "current_remaining",
    "storage_location",
)


# ===== Shared helpers =====

def _iso(value) -> Optional[str]:
    """Return ``value.isoformat()``, or None when the timestamp is unset."""
    return value.isoformat() if value else None


def _get_inventory_or_404(db: Session, item_id: int) -> Inventory:
    """Fetch an inventory row by primary key or raise a 404 HTTPException."""
    item = db.query(Inventory).filter(Inventory.id == item_id).first()
    if not item:
        raise HTTPException(status_code=404, detail="库存记录不存在")
    return item


def _paginate(query, order_by, page: int, page_size: int):
    """Return ``(total, rows)`` for one 1-based page of ``query``."""
    total = query.count()
    offset = (page - 1) * page_size
    rows = query.order_by(order_by).offset(offset).limit(page_size).all()
    return total, rows


def _xlsx_response(sheet_title, headers, rows, col_widths, filename_prefix):
    """Build a styled single-sheet workbook and return it as a download.

    Args:
        sheet_title: Title of the worksheet.
        headers: Header captions written to row 1.
        rows: Iterable of row value lists appended below the header.
        col_widths: Column widths, one per column, left to right.
        filename_prefix: Prefix of the download name; a timestamp and the
            ``.xlsx`` extension are appended.

    Returns:
        A StreamingResponse carrying the workbook bytes.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = sheet_title
    ws.append(headers)

    header_font = Font(bold=True, size=12, color="FFFFFF")
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    edge = Side(style="thin")
    thin_border = Border(left=edge, right=edge, top=edge, bottom=edge)
    for col_idx in range(1, len(headers) + 1):
        cell = ws.cell(row=1, column=col_idx)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment
        cell.border = thin_border

    for row in rows:
        ws.append(row)

    for col_idx, width in enumerate(col_widths, 1):
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    # Serialise into memory so nothing is written to disk.
    output = io.BytesIO()
    wb.save(output)
    output.seek(0)

    filename = f"{filename_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    # The name is non-ASCII, so it is percent-encoded for both the plain
    # and the RFC 5987 ``filename*`` parameter.
    filename_encoded = quote(filename)
    return StreamingResponse(
        output,
        media_type=XLSX_MEDIA_TYPE,
        headers={"Content-Disposition": f"attachment; filename={filename_encoded}; filename*=UTF-8''{filename_encoded}"},
    )


def inventory_to_dict(item: Inventory) -> dict:
    """Convert an Inventory model instance to a JSON-serialisable dict."""
    return {
        "id": item.id,
        "cInvCode": item.cInvCode,
        "supplier": item.supplier,
        "casing_label_remark": item.casing_label_remark,
        "batch": item.batch,
        "current_remaining": item.current_remaining,
        "storage_location": item.storage_location,
        "created_at": _iso(item.created_at),
        "updated_at": _iso(item.updated_at),
    }


def log_to_dict(log: TransactionLog) -> dict:
    """Convert a TransactionLog model instance to a JSON-serialisable dict."""
    return {
        "id": log.id,
        "inventory_id": log.inventory_id,
        "cInvCode": log.cInvCode,
        "type": log.type,
        "quantity": log.quantity,
        "remark": log.remark,
        "created_at": _iso(log.created_at),
    }


# ===== Inventory CRUD =====

@router.get("/inventory")
def list_inventory(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    search: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List inventory rows with pagination and an optional keyword search.

    The keyword is matched with SQL ``LIKE '%keyword%'`` against the product
    code, supplier, batch, storage location and remark columns.

    Returns:
        ``{"total": <match count>, "items": [<inventory dict>, ...]}``.
    """
    query = db.query(Inventory)
    if search:
        keyword = f"%{search}%"
        query = query.filter(
            or_(
                Inventory.cInvCode.like(keyword),
                Inventory.supplier.like(keyword),
                Inventory.batch.like(keyword),
                Inventory.storage_location.like(keyword),
                Inventory.casing_label_remark.like(keyword),
            )
        )
    total, items = _paginate(query, Inventory.id.asc(), page, page_size)
    return {"total": total, "items": [inventory_to_dict(i) for i in items]}


@router.post("/inventory")
def create_inventory(data: InventoryCreate, db: Session = Depends(get_db)):
    """Create an inventory row from the request body and return it as a dict."""
    item = Inventory(**data.model_dump())
    db.add(item)
    db.commit()
    db.refresh(item)
    return inventory_to_dict(item)


@router.put("/inventory/{item_id}")
def update_inventory(item_id: int, data: InventoryUpdate, db: Session = Depends(get_db)):
    """Apply the fields present in the request body to an inventory row.

    Raises:
        HTTPException: 404 when no row has ``item_id``.
    """
    item = _get_inventory_or_404(db, item_id)
    # exclude_unset: fields the client did not send are left untouched.
    for key, value in data.model_dump(exclude_unset=True).items():
        setattr(item, key, value)
    db.commit()
    db.refresh(item)
    return inventory_to_dict(item)


@router.delete("/inventory/{item_id}")
def delete_inventory(item_id: int, db: Session = Depends(get_db)):
    """Delete an inventory row.

    Raises:
        HTTPException: 404 when no row has ``item_id``.
    """
    item = _get_inventory_or_404(db, item_id)
    db.delete(item)
    db.commit()
    return {"message": "删除成功"}


# ===== Stock in / out =====

@router.post("/stock/operation")
def stock_operation(op: StockOperation, db: Session = Depends(get_db)):
    """Add to ("in") or take from ("out") an inventory row and log the move.

    Returns:
        A message plus the row's ``current_remaining`` after the operation.

    Raises:
        HTTPException: 404 when the inventory row does not exist; 400 when
            the type is not "in"/"out", the quantity is not positive, or an
            "out" operation exceeds the remaining stock.
    """
    item = _get_inventory_or_404(db, op.inventory_id)

    if op.type not in ("in", "out"):
        raise HTTPException(status_code=400, detail="类型必须是 in 或 out")
    # A zero or negative quantity would let "out" add stock (bypassing the
    # availability check) and let "in" drive the balance negative.
    if op.quantity <= 0:
        raise HTTPException(status_code=400, detail="数量必须大于 0")

    if op.type == "out":
        if item.current_remaining < op.quantity:
            raise HTTPException(status_code=400, detail=f"库存不足,当前剩余: {item.current_remaining}")
        item.current_remaining -= op.quantity
    else:
        item.current_remaining += op.quantity

    # Record the movement in the same transaction as the balance change.
    log = TransactionLog(
        inventory_id=item.id,
        cInvCode=item.cInvCode,
        type=op.type,
        quantity=op.quantity,
        remark=op.remark,
    )
    db.add(log)
    db.commit()
    db.refresh(item)
    return {"message": "操作成功", "current_remaining": item.current_remaining}


@router.delete("/stock/logs")
def clear_stock_logs(db: Session = Depends(get_db)):
    """Delete every stock transaction log and report how many were removed."""
    count = db.query(TransactionLog).delete()
    db.commit()
    return {"message": f"已清空 {count} 条出入库记录"}


@router.get("/stock/logs")
def get_stock_logs(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    search: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List stock transaction logs with pagination and optional search.

    The keyword is matched with SQL ``LIKE '%keyword%'`` against the product
    code and remark columns.

    Returns:
        ``{"total": <match count>, "items": [<log dict>, ...]}``.
    """
    query = db.query(TransactionLog)
    if search:
        keyword = f"%{search}%"
        query = query.filter(
            or_(
                TransactionLog.cInvCode.like(keyword),
                TransactionLog.remark.like(keyword),
            )
        )
    total, items = _paginate(query, TransactionLog.id.asc(), page, page_size)
    return {"total": total, "items": [log_to_dict(entry) for entry in items]}


# ===== Excel import / export =====

@router.get("/inventory/export")
def export_inventory(db: Session = Depends(get_db)):
    """Export all inventory rows, ordered by id, as an .xlsx download."""
    items = db.query(Inventory).order_by(Inventory.id.asc()).all()
    headers = ["序号", "产品编码", "供应商", "现外壳&标签&备注", "批次", "当前时间剩余", "存货地点"]
    rows = (
        [
            item.id,
            item.cInvCode,
            item.supplier or "",
            item.casing_label_remark or "",
            item.batch or "",
            item.current_remaining,
            item.storage_location or "",
        ]
        for item in items
    )
    return _xlsx_response("库存数据", headers, rows, [8, 20, 20, 30, 15, 15, 20], "库存数据")


def _locate_header(ws):
    """Find the header row within the first rows of a worksheet.

    Returns:
        ``(row_number, {caption: zero_based_column_index})`` for the first
        row containing the product-code caption, or None when no such row
        exists in the scanned range.
    """
    scanned = ws.iter_rows(min_row=1, max_row=_HEADER_SCAN_ROWS)
    for row_idx, row in enumerate(scanned, 1):
        columns = {}
        for col_idx, cell in enumerate(row):
            if not cell.value:
                continue
            caption = str(cell.value).strip()
            if caption in _IMPORT_HEADERS:
                columns[caption] = col_idx
        if _CODE_HEADER in columns:
            return row_idx, columns
    return None


def _cell_value(row, col):
    """Return the raw value at column ``col`` of ``row``, or None if absent."""
    if col is None or col >= len(row):
        return None
    return row[col].value


def _parse_inventory_row(row, header_map):
    """Extract ``(record_id, fields)`` from one data row.

    Returns:
        None when the row has no product code (the row is skipped);
        otherwise the optional integer id from the serial-number column and
        a dict of Inventory attribute values.
    """
    raw_code = _cell_value(row, header_map.get(_CODE_HEADER))
    if not raw_code:
        return None
    code = str(raw_code).strip()
    if not code:
        # A whitespace-only code would otherwise create a record with an
        # empty product code.
        return None

    record_id = None
    raw_id = _cell_value(row, header_map.get("序号"))
    if raw_id is not None:
        try:
            # Ids may arrive as floats ("3.0") depending on cell formatting.
            record_id = int(float(raw_id))
        except (ValueError, TypeError):
            record_id = None

    def text(caption, default=""):
        value = _cell_value(row, header_map.get(caption))
        return str(value).strip() if value is not None else default

    remaining = 0
    raw_remaining = _cell_value(row, header_map.get("当前时间剩余"))
    if raw_remaining is not None:
        try:
            remaining = float(raw_remaining)
        except (ValueError, TypeError):
            # Non-numeric quantities are imported as zero.
            remaining = 0

    fields = {
        "cInvCode": code,
        "supplier": text("供应商"),
        "casing_label_remark": text("现外壳&标签&备注") or text("外壳&标签&备注"),
        "batch": text("批次"),
        "current_remaining": remaining,
        "storage_location": text("存货地点"),
    }
    return record_id, fields


def _upsert_inventory(db: Session, record_id: Optional[int], fields: dict) -> bool:
    """Insert or update one inventory record from imported values.

    With a ``record_id`` the row is matched by primary key and fully
    overwritten, or created with that id when missing. Without one, the row
    is matched by product code + batch and only non-empty values are merged.

    Returns:
        True when a new record was added, False when one was updated.
    """
    if record_id:
        existing = db.query(Inventory).filter(Inventory.id == record_id).first()
        if existing:
            for key, value in fields.items():
                setattr(existing, key, value)
            return False
        db.add(Inventory(id=record_id, **fields))
        return True

    existing = db.query(Inventory).filter(
        Inventory.cInvCode == fields["cInvCode"],
        Inventory.batch == fields["batch"],
    ).first()
    if existing:
        for key in _MERGEABLE_FIELDS:
            # Empty / zero sheet values never overwrite stored data here.
            if fields[key]:
                setattr(existing, key, fields[key])
        return False
    db.add(Inventory(**fields))
    return True


@router.post("/inventory/import")
def import_inventory(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Import inventory records from the active sheet of an Excel upload.

    The header row is located within the first rows of the sheet and data
    is read from the row after it. Rows without a product code are skipped.
    A failure in one row is collected and does not abort the import.

    Returns:
        A summary dict with the ``imported`` and ``updated`` counts and up
        to ten per-row error messages.

    Raises:
        HTTPException: 400 for an unsupported extension or a sheet without
            a product-code header; 500 when the file cannot be read or the
            import cannot be committed.
    """
    # The client may omit the filename; treat that as an unsupported file.
    filename = (file.filename or "").lower()
    if not filename.endswith(('.xlsx', '.xls')):
        raise HTTPException(status_code=400, detail="只支持 .xlsx 或 .xls 文件")

    try:
        contents = file.file.read()
        wb = openpyxl.load_workbook(io.BytesIO(contents))
        ws = wb.active

        located = _locate_header(ws)
        if located is None:
            raise HTTPException(status_code=400, detail="未找到有效的表头行,请确保包含'产品编码'列")
        header_row, header_map = located

        imported = 0
        updated = 0
        errors = []

        # Data begins right after the detected header row, so the header
        # itself is never parsed as a record.
        first_data_row = header_row + 1
        data_rows = ws.iter_rows(min_row=first_data_row)
        for row_idx, row in enumerate(data_rows, first_data_row):
            try:
                parsed = _parse_inventory_row(row, header_map)
                if parsed is None:
                    continue
                record_id, fields = parsed
                if _upsert_inventory(db, record_id, fields):
                    imported += 1
                else:
                    updated += 1
            except Exception as e:
                # Best-effort import: report the bad row and keep going.
                errors.append(f"第{row_idx}行: {str(e)}")
                continue

        db.commit()
        return {
            "message": f"导入完成:新增 {imported} 条,更新 {updated} 条",
            "imported": imported,
            "updated": updated,
            "errors": errors[:10],
        }
    except HTTPException:
        raise
    except Exception as e:
        # Discard any partially applied changes before reporting failure.
        db.rollback()
        raise HTTPException(status_code=500, detail=f"导入失败: {str(e)}") from e


# ===== Stock log export =====

@router.get("/stock/export")
def export_stock_logs(db: Session = Depends(get_db)):
    """Export all stock transaction logs, newest first, as an .xlsx download."""
    items = db.query(TransactionLog).order_by(TransactionLog.id.desc()).all()
    headers = ["序号", "产品编码", "类型", "数量", "备注", "操作时间"]
    rows = (
        [
            item.id,
            item.cInvCode,
            "入库" if item.type == "in" else "出库",
            item.quantity,
            item.remark or "",
            item.created_at.strftime("%Y-%m-%d %H:%M:%S") if item.created_at else "",
        ]
        for item in items
    )
    return _xlsx_response("出入库记录", headers, rows, [8, 20, 10, 10, 30, 20], "出入库记录")