"""Barcode mapping CRUD endpoints.""" import json from pathlib import Path from typing import Dict, Optional, List from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel from ..auth.dependencies import get_current_user router = APIRouter(prefix="/api/barcodes", tags=["barcodes"]) _project_root = Path(__file__).resolve().parent.parent.parent.parent _mappings_file = _project_root / "config" / "barcode_mappings.json" class BarcodeMapping(BaseModel): barcode: str target: str description: Optional[str] = None class BarcodeMappingUpdate(BaseModel): target: Optional[str] = None description: Optional[str] = None def _load_mappings() -> Dict: if not _mappings_file.is_file(): return {} try: return json.loads(_mappings_file.read_text(encoding="utf-8")) except Exception: return {} def _save_mappings(data: Dict): _mappings_file.parent.mkdir(parents=True, exist_ok=True) _mappings_file.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") @router.get("") async def list_barcodes( search: str = "", current_user: dict = Depends(get_current_user), ): mappings = _load_mappings() items = [] for barcode, info in mappings.items(): if isinstance(info, dict): target = info.get("map_to", info.get("target", "")) desc = info.get("description", "") else: target = str(info) desc = "" if search and search not in barcode and search not in target and search not in desc: continue items.append({"barcode": barcode, "target": target, "description": desc}) return {"items": items, "total": len(items)} @router.get("/{barcode}") async def get_barcode( barcode: str, current_user: dict = Depends(get_current_user), ): mappings = _load_mappings() if barcode not in mappings: raise HTTPException(404, f"未找到条码映射 {barcode}") info = mappings[barcode] if isinstance(info, dict): return {"barcode": barcode, "target": info.get("map_to", info.get("target", "")), "description": info.get("description", "")} return {"barcode": barcode, "target": str(info), "description": ""} @router.post("") async def create_barcode( body: BarcodeMapping, current_user: dict = Depends(get_current_user), ): mappings = _load_mappings() if body.barcode in mappings: raise HTTPException(409, f"条码 {body.barcode} 已存在") mappings[body.barcode] = {"map_to": body.target, "description": body.description or ""} _save_mappings(mappings) return {"message": f"已创建映射 {body.barcode} → {body.target}"} @router.put("/{barcode}") async def update_barcode( barcode: str, body: BarcodeMappingUpdate, current_user: dict = Depends(get_current_user), ): mappings = _load_mappings() if barcode not in mappings: raise HTTPException(404, f"未找到条码映射 {barcode}") existing = mappings[barcode] if not isinstance(existing, dict): existing = {"map_to": str(existing), "description": ""} if body.target is not None: existing["map_to"] = body.target if body.description is not None: existing["description"] = body.description mappings[barcode] = existing _save_mappings(mappings) return {"message": f"已更新映射 {barcode}"} @router.delete("/{barcode}") async def delete_barcode( barcode: str, current_user: dict = Depends(get_current_user), ): mappings = _load_mappings() if barcode not in mappings: raise HTTPException(404, f"未找到条码映射 {barcode}") del mappings[barcode] _save_mappings(mappings) return {"message": f"已删除映射 {barcode}"}