180 lines
7.2 KiB
Python
180 lines
7.2 KiB
Python
from fastapi import FastAPI, UploadFile, File, Query
|
|
from fastapi.responses import HTMLResponse, JSONResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
import sqlite3
|
|
import pandas as pd
|
|
import os
|
|
from datetime import datetime
|
|
|
|
DB_PATH = os.path.join(os.getcwd(), "data", "products.db")
|
|
os.makedirs(os.path.join(os.getcwd(), "data"), exist_ok=True)
|
|
|
|
def get_conn():
|
|
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
|
|
try:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA synchronous=NORMAL")
|
|
except Exception:
|
|
pass
|
|
return conn
|
|
|
|
def init_db():
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"""
|
|
create table if not exists products (
|
|
barcode text primary key,
|
|
name text,
|
|
purchase_price real,
|
|
sale_price real,
|
|
category text,
|
|
created_at text,
|
|
source_file text
|
|
)
|
|
"""
|
|
)
|
|
cur.execute("create index if not exists idx_products_name on products(name)")
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
init_db()
|
|
|
|
app = FastAPI()
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
def root():
|
|
return open(os.path.join("static", "index.html"), "r", encoding="utf-8").read()
|
|
|
|
@app.get("/health")
|
|
def health():
|
|
return {"status": "ok"}
|
|
|
|
def normalize_price(v):
|
|
if v is None:
|
|
return None
|
|
try:
|
|
s = str(v).strip()
|
|
if s == "":
|
|
return None
|
|
return float(s)
|
|
except:
|
|
return None
|
|
|
|
def normalize_text(v):
|
|
if v is None:
|
|
return None
|
|
s = str(v).strip()
|
|
return s if s != "" else None
|
|
|
|
def import_df(df: pd.DataFrame, source_file: str):
|
|
required = ["名称(必填)", "进货价(必填)", "销售价(必填)", "条码"]
|
|
for col in required:
|
|
if col not in df.columns:
|
|
return {"error": f"missing column: {col}"}
|
|
df = df[["名称(必填)", "进货价(必填)", "销售价(必填)", "条码", "分类(必填)"]].copy() if "分类(必填)" in df.columns else df[["名称(必填)", "进货价(必填)", "销售价(必填)", "条码"]].copy()
|
|
df.columns = ["name", "purchase_price", "sale_price", "barcode"] + (["category"] if df.shape[1] == 5 else [])
|
|
df["name"] = df["name"].apply(normalize_text)
|
|
df["barcode"] = df["barcode"].apply(normalize_text)
|
|
df["purchase_price"] = df["purchase_price"].apply(normalize_price)
|
|
df["sale_price"] = df["sale_price"].apply(normalize_price)
|
|
if "category" in df.columns:
|
|
df["category"] = df["category"].apply(normalize_text)
|
|
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
inserted = 0
|
|
updated = 0
|
|
skipped = 0
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
for _, row in df.iterrows():
|
|
bc = row.get("barcode")
|
|
nm = row.get("name")
|
|
pp = row.get("purchase_price")
|
|
sp = row.get("sale_price")
|
|
ct = row.get("category") if "category" in df.columns else None
|
|
if bc is None:
|
|
skipped += 1
|
|
continue
|
|
cur.execute("select barcode from products where barcode=?", (bc,))
|
|
exists = cur.fetchone()
|
|
if exists:
|
|
cur.execute(
|
|
"update products set name=?, purchase_price=?, sale_price=?, category=?, created_at=?, source_file=? where barcode=?",
|
|
(nm, pp, sp, ct, now, source_file, bc),
|
|
)
|
|
updated += 1
|
|
else:
|
|
cur.execute(
|
|
"insert into products(barcode,name,purchase_price,sale_price,category,created_at,source_file) values(?,?,?,?,?,?,?)",
|
|
(bc, nm, pp, sp, ct, now, source_file),
|
|
)
|
|
inserted += 1
|
|
conn.commit()
|
|
conn.close()
|
|
return {"inserted": inserted, "updated": updated, "skipped": skipped}
|
|
|
|
@app.post("/import")
|
|
async def import_excel(file: UploadFile = File(...)):
|
|
tmp = os.path.join(os.getcwd(), "data", f"upload_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file.filename}")
|
|
with open(tmp, "wb") as f:
|
|
f.write(await file.read())
|
|
try:
|
|
df = pd.read_excel(tmp, sheet_name="Sheet1")
|
|
result = import_df(df, os.path.basename(file.filename))
|
|
return result if isinstance(result, dict) else {"error": "import failed"}
|
|
finally:
|
|
pass
|
|
|
|
@app.get("/products/{barcode}")
|
|
def get_product(barcode: str):
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
cur.execute("select name,purchase_price,sale_price,barcode from products where barcode=?", (barcode,))
|
|
row = cur.fetchone()
|
|
conn.close()
|
|
if not row:
|
|
return JSONResponse(status_code=404, content={"error": "not found"})
|
|
return {"name": row[0], "purchase_price": row[1], "sale_price": row[2], "barcode": row[3]}
|
|
|
|
@app.get("/products")
|
|
def search_products(q: str = Query("", min_length=0), limit: int = 20, page: int = 1, sort: str = "", order: str = "asc"):
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
offset = max((page - 1), 0) * max(limit, 1)
|
|
sort_map = {"name": "name", "sale_price": "sale_price", "purchase_price": "purchase_price", "created_at": "created_at", "barcode": "barcode"}
|
|
sort_col = sort_map.get(sort, "")
|
|
order_sql = "DESC" if str(order).lower() == "desc" else "ASC"
|
|
if q:
|
|
s = q.strip()
|
|
if s.isdigit():
|
|
if sort_col:
|
|
cur.execute(
|
|
f"select distinct name,purchase_price,sale_price,barcode from products where barcode like ? or barcode like ? or barcode like ? or name like ? order by {sort_col} {order_sql} limit ? offset ?",
|
|
(f"{s}%", f"%{s}", f"%{s}%", f"%{s}%", limit, offset),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"select name,purchase_price,sale_price,barcode, case when barcode like ? then 0 when barcode like ? then 1 when barcode like ? then 2 when name like ? then 3 else 4 end as score from products where barcode like ? or barcode like ? or barcode like ? or name like ? order by score, name limit ? offset ?",
|
|
(f"{s}%", f"%{s}", f"%{s}%", f"%{s}%", f"{s}%", f"%{s}", f"%{s}%", f"%{s}%", limit, offset),
|
|
)
|
|
else:
|
|
if sort_col:
|
|
cur.execute(
|
|
f"select name,purchase_price,sale_price,barcode from products where name like ? order by {sort_col} {order_sql} limit ? offset ?",
|
|
(f"%{s}%", limit, offset),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"select name,purchase_price,sale_price,barcode, case when instr(name, ?) = 1 then 0 when instr(name, ?) > 0 then 1 else 2 end as score from products where name like ? order by score, name limit ? offset ?",
|
|
(s, s, f"%{s}%", limit, offset),
|
|
)
|
|
else:
|
|
if sort_col:
|
|
cur.execute(f"select name,purchase_price,sale_price,barcode from products order by {sort_col} {order_sql} limit ? offset ?", (limit, offset))
|
|
else:
|
|
cur.execute("select name,purchase_price,sale_price,barcode from products order by name limit ? offset ?", (limit, offset))
|
|
rows = cur.fetchall()
|
|
conn.close()
|
|
return [{"name": r[0], "purchase_price": r[1], "sale_price": r[2], "barcode": r[3]} for r in rows]
|