| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, Query
- from fastapi.responses import StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import or_
- from typing import Optional
- import io
- from datetime import datetime
- from urllib.parse import quote
- from database import get_db
- from models import Inventory, TransactionLog
- from schemas import InventoryCreate, InventoryUpdate, StockOperation
- import openpyxl
# Router on which every endpoint in this module is registered via the
# @router.<method>(...) decorators below.
router = APIRouter()
def inventory_to_dict(item: Inventory) -> dict:
    """Serialize an ``Inventory`` row into a plain dictionary.

    The scalar columns are copied unchanged. ``created_at`` and
    ``updated_at`` are rendered with ``isoformat()`` when they are set and
    reported as ``None`` otherwise.
    """
    plain_columns = (
        "id",
        "cInvCode",
        "supplier",
        "casing_label_remark",
        "batch",
        "current_remaining",
        "storage_location",
    )
    payload = {column: getattr(item, column) for column in plain_columns}

    for stamp_name in ("created_at", "updated_at"):
        stamp = getattr(item, stamp_name)
        payload[stamp_name] = stamp.isoformat() if stamp else None

    return payload
def log_to_dict(log: TransactionLog) -> dict:
    """Serialize a ``TransactionLog`` row into a plain dictionary.

    ``created_at`` is rendered with ``isoformat()`` when set and reported
    as ``None`` otherwise; every other column is copied unchanged.
    """
    stamp = log.created_at
    created_text = None if not stamp else stamp.isoformat()

    payload = dict(
        id=log.id,
        inventory_id=log.inventory_id,
        cInvCode=log.cInvCode,
        type=log.type,
        quantity=log.quantity,
        remark=log.remark,
    )
    payload["created_at"] = created_text
    return payload
# ===== Inventory CRUD =====
@router.get("/inventory")
def list_inventory(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    search: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List inventory records with pagination and optional keyword search.

    Args:
        page: 1-based page number.
        page_size: Number of records per page (1 to 100).
        search: Optional keyword. When non-empty, a record matches if the
            keyword occurs as a literal substring of its product code,
            supplier, batch, storage location or casing/label remark.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``total`` (count of matching records) and ``items``
        (the requested page ordered by ascending id, each serialized with
        ``inventory_to_dict``).
    """
    query = db.query(Inventory)

    if search:
        searchable_columns = (
            Inventory.cInvCode,
            Inventory.supplier,
            Inventory.batch,
            Inventory.storage_location,
            Inventory.casing_label_remark,
        )
        # autoescape=True escapes "%" and "_" in the keyword so they are
        # matched literally instead of acting as LIKE wildcards (product
        # codes and batches commonly contain underscores).
        query = query.filter(
            or_(*(column.contains(search, autoescape=True) for column in searchable_columns))
        )

    total = query.count()
    offset = (page - 1) * page_size
    items = query.order_by(Inventory.id.asc()).offset(offset).limit(page_size).all()
    return {"total": total, "items": [inventory_to_dict(i) for i in items]}
@router.post("/inventory")
def create_inventory(data: InventoryCreate, db: Session = Depends(get_db)):
    """Create one inventory record from the request payload.

    The payload is dumped to a dict and passed as keyword arguments to
    ``Inventory``. The new row is committed, refreshed from the database
    and returned in ``inventory_to_dict`` form.
    """
    field_values = data.model_dump()
    record = Inventory(**field_values)

    db.add(record)
    db.commit()
    # Reload so database-populated attributes are present in the response.
    db.refresh(record)

    return inventory_to_dict(record)
@router.put("/inventory/{item_id}")
def update_inventory(item_id: int, data: InventoryUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to the inventory record with id ``item_id``.

    Only the fields explicitly present in the request payload are written.

    Raises:
        HTTPException: 404 when no record with ``item_id`` exists.

    Returns:
        The updated record in ``inventory_to_dict`` form.
    """
    matching = db.query(Inventory).filter(Inventory.id == item_id)
    record = matching.first()
    if not record:
        raise HTTPException(status_code=404, detail="库存记录不存在")

    # exclude_unset keeps omitted fields from being overwritten.
    changes = data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(record, field_name, changes[field_name])

    db.commit()
    db.refresh(record)
    return inventory_to_dict(record)
@router.delete("/inventory/{item_id}")
def delete_inventory(item_id: int, db: Session = Depends(get_db)):
    """Delete the inventory record with id ``item_id``.

    Raises:
        HTTPException: 404 when no record with ``item_id`` exists.

    Returns:
        A dict carrying a confirmation message.
    """
    matching = db.query(Inventory).filter(Inventory.id == item_id)
    record = matching.first()
    if not record:
        raise HTTPException(status_code=404, detail="库存记录不存在")

    db.delete(record)
    db.commit()

    confirmation = {"message": "删除成功"}
    return confirmation
# ===== Stock in / stock out =====
@router.post("/stock/operation")
def stock_operation(op: StockOperation, db: Session = Depends(get_db)):
    """Record a stock-in or stock-out movement for one inventory record.

    The record's ``current_remaining`` is adjusted by ``op.quantity`` and a
    ``TransactionLog`` row describing the movement is stored in the same
    commit.

    Args:
        op: Operation payload; reads ``inventory_id``, ``type`` ("in" or
            "out"), ``quantity`` and ``remark``.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when the inventory record does not exist; 400
            when the type is neither "in" nor "out", when the quantity is
            not positive, or when a stock-out exceeds the remaining stock.

    Returns:
        A dict with a confirmation message and the new
        ``current_remaining`` value.
    """
    item = db.query(Inventory).filter(Inventory.id == op.inventory_id).first()
    if not item:
        raise HTTPException(status_code=404, detail="库存记录不存在")

    if op.type not in ("in", "out"):
        raise HTTPException(status_code=400, detail="类型必须是 in 或 out")

    # A non-positive quantity would invert the operation: a negative "out"
    # passes the stock check and adds stock, a negative "in" removes stock
    # without any check.
    if op.quantity is None or op.quantity <= 0:
        raise HTTPException(status_code=400, detail="数量必须大于 0")

    if op.type == "out":
        if item.current_remaining < op.quantity:
            raise HTTPException(status_code=400, detail=f"库存不足,当前剩余: {item.current_remaining}")
        item.current_remaining -= op.quantity
    else:
        item.current_remaining += op.quantity

    # Store the movement alongside the balance change.
    log = TransactionLog(
        inventory_id=item.id,
        cInvCode=item.cInvCode,
        type=op.type,
        quantity=op.quantity,
        remark=op.remark,
    )
    db.add(log)
    db.commit()
    db.refresh(item)
    return {"message": "操作成功", "current_remaining": item.current_remaining}
@router.delete("/stock/logs")
def clear_stock_logs(db: Session = Depends(get_db)):
    """Delete every ``TransactionLog`` row and report how many were removed."""
    all_logs = db.query(TransactionLog)
    removed = all_logs.delete()
    db.commit()

    summary = f"已清空 {removed} 条出入库记录"
    return {"message": summary}
@router.get("/stock/logs")
def get_stock_logs(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    search: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List stock movement logs with pagination and optional keyword search.

    Args:
        page: 1-based page number.
        page_size: Number of records per page (1 to 100).
        search: Optional keyword. When non-empty, a log matches if the
            keyword occurs as a literal substring of its product code or
            remark.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``total`` (count of matching logs) and ``items`` (the
        requested page ordered by ascending id, each serialized with
        ``log_to_dict``).
    """
    query = db.query(TransactionLog)

    if search:
        # autoescape=True escapes "%" and "_" in the keyword so they are
        # matched literally instead of acting as LIKE wildcards.
        query = query.filter(
            or_(
                TransactionLog.cInvCode.contains(search, autoescape=True),
                TransactionLog.remark.contains(search, autoescape=True),
            )
        )

    total = query.count()
    offset = (page - 1) * page_size
    items = query.order_by(TransactionLog.id.asc()).offset(offset).limit(page_size).all()
    return {"total": total, "items": [log_to_dict(entry) for entry in items]}
# ===== Excel import / export =====
@router.get("/inventory/export")
def export_inventory(db: Session = Depends(get_db)):
    """Export every inventory record as an .xlsx download.

    The workbook has a single sheet with a styled header row followed by
    one row per record in ascending id order. It is built in memory and
    returned as a ``StreamingResponse`` with a timestamped,
    percent-encoded attachment filename.
    """
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side

    records = db.query(Inventory).order_by(Inventory.id.asc()).all()

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "库存数据"

    # Header row
    column_titles = ["序号", "产品编码", "供应商", "现外壳&标签&备注", "批次", "当前时间剩余", "存货地点"]
    sheet.append(column_titles)

    # Header styling: white bold text on a blue fill, centered, thin borders.
    title_font = Font(bold=True, size=12, color="FFFFFF")
    title_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    title_alignment = Alignment(horizontal="center", vertical="center")
    title_border = Border(
        **{edge: Side(style='thin') for edge in ("left", "right", "top", "bottom")}
    )
    for position in range(1, len(column_titles) + 1):
        # The values were already written by append(); only style the cells.
        title_cell = sheet.cell(row=1, column=position)
        title_cell.font = title_font
        title_cell.fill = title_fill
        title_cell.alignment = title_alignment
        title_cell.border = title_border

    # Data rows; empty text columns are written as "".
    for record in records:
        sheet.append([
            record.id,
            record.cInvCode,
            record.supplier or "",
            record.casing_label_remark or "",
            record.batch or "",
            record.current_remaining,
            record.storage_location or "",
        ])

    # Fixed column widths, one per header column.
    for position, width in enumerate((8, 20, 20, 30, 15, 15, 20), start=1):
        letter = openpyxl.utils.get_column_letter(position)
        sheet.column_dimensions[letter].width = width

    # Serialize into memory and rewind for streaming.
    buffer = io.BytesIO()
    workbook.save(buffer)
    buffer.seek(0)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    quoted_name = quote(f"库存数据_{timestamp}.xlsx")
    disposition = f"attachment; filename={quoted_name}; filename*=UTF-8''{quoted_name}"
    return StreamingResponse(
        buffer,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": disposition},
    )
# Header captions recognised in an import sheet. Both spellings of the
# casing/label/remark caption are accepted.
_IMPORT_HEADERS = frozenset({
    "序号",
    "产品编码",
    "供应商",
    "现外壳&标签&备注",
    "外壳&标签&备注",
    "批次",
    "当前时间剩余",
    "存货地点",
})
# Caption that must be present for a row to count as the header row.
_IMPORT_REQUIRED_HEADER = "产品编码"
# Only the first few rows are searched for the header row.
_IMPORT_HEADER_SCAN_ROWS = 5


def _locate_import_header(ws):
    """Return ``(row_number, {caption: zero-based column})`` for the header row.

    The first of the leading rows that contains the required caption is the
    header row; ``(None, {})`` is returned when no such row exists.
    """
    leading_rows = ws.iter_rows(min_row=1, max_row=_IMPORT_HEADER_SCAN_ROWS)
    for row_number, row in enumerate(leading_rows, 1):
        columns = {}
        for col_idx, cell in enumerate(row):
            if cell.value is None:
                continue
            caption = str(cell.value).strip()
            if caption in _IMPORT_HEADERS:
                columns[caption] = col_idx
        if _IMPORT_REQUIRED_HEADER in columns:
            return row_number, columns
    return None, {}


def _import_cell_value(row, col):
    """Return the raw value at zero-based column ``col``, or None if absent."""
    if col is None or col >= len(row):
        return None
    return row[col].value


def _import_cell_text(row, col, default=""):
    """Return the cell at ``col`` as stripped text, or ``default`` when empty."""
    value = _import_cell_value(row, col)
    return str(value).strip() if value is not None else default


def _upsert_import_row(db, record_id, values):
    """Insert or update one imported row; return True when a row was created.

    With a ``record_id`` the record is looked up by id and fully overwritten
    (or created with that id). Without one, the record is matched on product
    code + batch and only non-empty imported values replace existing ones.
    """
    if record_id:
        existing = db.query(Inventory).filter(Inventory.id == record_id).first()
        if existing:
            for field_name, value in values.items():
                setattr(existing, field_name, value)
            return False
        db.add(Inventory(id=record_id, **values))
        return True

    existing = db.query(Inventory).filter(
        Inventory.cInvCode == values["cInvCode"],
        Inventory.batch == values["batch"],
    ).first()
    if existing:
        # Empty / zero imported values keep what is already stored.
        for field_name in ("supplier", "casing_label_remark", "current_remaining", "storage_location"):
            setattr(existing, field_name, values[field_name] or getattr(existing, field_name))
        return False
    db.add(Inventory(**values))
    return True


@router.post("/inventory/import")
def import_inventory(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Import inventory records from an uploaded Excel workbook.

    The active sheet is read. The header row is the first of the leading
    rows that contains the product-code caption; data rows are the rows
    after it. Rows without a product code are skipped. Each remaining row
    is matched by its sequence-number column when it holds a usable number,
    otherwise by product code + batch, and is then updated or inserted.

    Args:
        file: Uploaded workbook; the filename must end in .xlsx or .xls
            (case-insensitive).
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 400 for an unsupported filename or a missing header
            row; 500 (after rolling back the session) for any other failure.

    Returns:
        A dict with a summary message, the ``imported`` and ``updated``
        counts, and up to ten per-row error messages.
    """
    # filename may be None for some clients; compare case-insensitively.
    upload_name = (file.filename or "").lower()
    if not upload_name.endswith(('.xlsx', '.xls')):
        raise HTTPException(status_code=400, detail="只支持 .xlsx 或 .xls 文件")

    try:
        contents = file.file.read()
        wb = openpyxl.load_workbook(io.BytesIO(contents))
        ws = wb.active

        header_row, header_map = _locate_import_header(ws)
        # Test for the row itself: the product-code column may legitimately
        # be column index 0, which is falsy.
        if header_row is None:
            raise HTTPException(status_code=400, detail="未找到有效的表头行,请确保包含'产品编码'列")

        imported = 0
        updated = 0
        errors = []
        first_data_row = header_row + 1

        for row_idx, row in enumerate(ws.iter_rows(min_row=first_data_row), first_data_row):
            try:
                raw_id = _import_cell_value(row, header_map.get("序号"))
                record_id = None
                if raw_id is not None:
                    try:
                        record_id = int(float(raw_id))
                    except (ValueError, TypeError):
                        record_id = None

                raw_code = _import_cell_value(row, header_map["产品编码"])
                if not raw_code:
                    continue

                raw_remaining = _import_cell_value(row, header_map.get("当前时间剩余"))
                try:
                    current_remaining = float(raw_remaining) if raw_remaining is not None else 0
                except (ValueError, TypeError):
                    current_remaining = 0

                values = {
                    "cInvCode": str(raw_code).strip(),
                    "supplier": _import_cell_text(row, header_map.get("供应商")),
                    "casing_label_remark": (
                        _import_cell_text(row, header_map.get("现外壳&标签&备注"))
                        or _import_cell_text(row, header_map.get("外壳&标签&备注"))
                    ),
                    "batch": _import_cell_text(row, header_map.get("批次")),
                    "current_remaining": current_remaining,
                    "storage_location": _import_cell_text(row, header_map.get("存货地点")),
                }

                if _upsert_import_row(db, record_id, values):
                    imported += 1
                else:
                    updated += 1
            except Exception as e:
                # Best effort: note the failing row and continue with the next.
                errors.append(f"第{row_idx}行: {str(e)}")
                continue

        db.commit()
        return {
            "message": f"导入完成:新增 {imported} 条,更新 {updated} 条",
            "imported": imported,
            "updated": updated,
            "errors": errors[:10],
        }
    except HTTPException:
        raise
    except Exception as e:
        # Discard any partially applied changes before reporting the failure.
        db.rollback()
        raise HTTPException(status_code=500, detail=f"导入失败: {str(e)}") from e
# ===== Stock log export =====
@router.get("/stock/export")
def export_stock_logs(db: Session = Depends(get_db)):
    """Export every stock movement log as an .xlsx download.

    The workbook has a single sheet with a styled header row followed by
    one row per log in descending id order. The movement type is written
    as a localized label and the creation time as ``YYYY-MM-DD HH:MM:SS``
    text. The file is built in memory and returned as a
    ``StreamingResponse`` with a timestamped, percent-encoded filename.
    """
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side

    log_rows = db.query(TransactionLog).order_by(TransactionLog.id.desc()).all()

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "出入库记录"

    column_titles = ["序号", "产品编码", "类型", "数量", "备注", "操作时间"]
    sheet.append(column_titles)

    # Header styling: white bold text on a blue fill, centered, thin borders.
    title_style = {
        "font": Font(bold=True, size=12, color="FFFFFF"),
        "fill": PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid"),
        "alignment": Alignment(horizontal="center", vertical="center"),
        "border": Border(
            left=Side(style='thin'), right=Side(style='thin'),
            top=Side(style='thin'), bottom=Side(style='thin')
        ),
    }
    for position in range(1, len(column_titles) + 1):
        # The values were already written by append(); only style the cells.
        title_cell = sheet.cell(row=1, column=position)
        for attribute, style_value in title_style.items():
            setattr(title_cell, attribute, style_value)

    for entry in log_rows:
        if entry.type == "in":
            type_label = "入库"
        else:
            type_label = "出库"
        created_text = ""
        if entry.created_at:
            created_text = entry.created_at.strftime("%Y-%m-%d %H:%M:%S")
        sheet.append([
            entry.id,
            entry.cInvCode,
            type_label,
            entry.quantity,
            entry.remark or "",
            created_text,
        ])

    # Fixed column widths, one per header column.
    for position, width in enumerate((8, 20, 10, 10, 30, 20), start=1):
        letter = openpyxl.utils.get_column_letter(position)
        sheet.column_dimensions[letter].width = width

    buffer = io.BytesIO()
    workbook.save(buffer)
    buffer.seek(0)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    quoted_name = quote(f"出入库记录_{timestamp}.xlsx")
    disposition = f"attachment; filename={quoted_name}; filename*=UTF-8''{quoted_name}"
    return StreamingResponse(
        buffer,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": disposition},
    )
|