# PushToZhaoShang/backend/app.py

from flask import Flask, jsonify, request, send_from_directory, Response
from flask import stream_with_context
from dotenv import load_dotenv
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from apscheduler.schedulers.background import BackgroundScheduler
from tzlocal import get_localzone
from sqlalchemy import text
from datetime import datetime, timedelta, date
import re
import random
import os
import json
import time
load_dotenv()
app = Flask(__name__, static_folder="../frontend", static_url_path="/static")
CORS(app)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv("DATABASE_URL", "sqlite:///data.db")
db = SQLAlchemy(app)

class DailyRevenue(db.Model):
    __tablename__ = 'daily_revenue'
    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date, nullable=False, unique=True)
    amount = db.Column(db.Float, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_final = db.Column(db.Boolean, default=False, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    source = db.Column(db.String(20))
    note = db.Column(db.Text)

class AuditLog(db.Model):
    __tablename__ = 'audit_log'
    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date, nullable=False)
    old_amount = db.Column(db.Float)
    new_amount = db.Column(db.Float)
    reason = db.Column(db.Text)
    actor = db.Column(db.String(50))
    type = db.Column(db.String(20))  # generate/correct/import_log
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

with app.app_context():
    db.create_all()
    # SQLite: extend the schema in place (columns are added on first run only)
    cols = [row[1] for row in db.session.execute(text('PRAGMA table_info(daily_revenue)')).fetchall()]
    if 'is_final' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN is_final INTEGER NOT NULL DEFAULT 0'))
    if 'updated_at' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN updated_at DATETIME'))
    if 'source' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN source TEXT'))
    if 'note' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN note TEXT'))
    db.session.commit()

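# Note: the column check above uses SQLite's PRAGMA table_info, so this lightweight
# in-place "migration" only applies to the default sqlite:///data.db. If DATABASE_URL
# points at another engine, a dedicated migration tool would be needed instead
# (assumption; not covered by this file).
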
def generate_mock_revenue():
    """Keep the original logic: generate a mock revenue figure for the current day."""
    base = random.uniform(8000, 15000)
    trend = random.uniform(0.95, 1.05)
    return round(base * trend, 2)

def _append_log_line(date_str: str, amount: float, shop_name: str):
    # The line format must stay in sync with the p1 pattern in parse_app_log().
    log_path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    line = f"准备发送消息: 【{shop_name}】{date_str}的营业额:{amount}"
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(line + "\n")

def daily_job():
    """Scheduled task: at 23:00 local time, generate and finalize today's figure (no regeneration)."""
    # Runs in the APScheduler worker thread, so an application context is pushed explicitly.
    with app.app_context():
        cfg = load_config()
        shop_name = cfg.get("shop_name", "益选便利店")
        today = datetime.now().date()
        existing = DailyRevenue.query.filter_by(date=today).first()
        if existing:
            if not existing.is_final:
                existing.is_final = True
                existing.source = existing.source or 'generator'
                db.session.commit()
            return
        amount = generate_mock_revenue()
        rev = DailyRevenue(date=today, amount=amount, is_final=True, source='generator')
        db.session.add(rev)
        db.session.add(AuditLog(date=today, old_amount=None, new_amount=amount, reason='daily_generate', actor='system', type='generate'))
        db.session.commit()
        _append_log_line(today.isoformat(), amount, shop_name)

# ---- Log parsing and aggregation ----
def load_config():
    cfg_path = os.path.join(os.path.dirname(__file__), "..", "config.json")
    try:
        with open(cfg_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return {"shop_name": "益选便利店", "weekday_range": [2800, 3600], "weekend_range": [1600, 2000]}

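# Expected shape of ../config.json (the values shown are the built-in fallbacks):
#   {"shop_name": "益选便利店", "weekday_range": [2800, 3600], "weekend_range": [1600, 2000]}
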
def parse_app_log():
    log_path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    data = {}
    if not os.path.exists(log_path):
        return data
    # p1: lines carrying the shop name in 【】 brackets; p2: looser date-plus-amount fallback.
    p1 = re.compile(r"准备发送消息: 【.*?】(\d{4}-\d{2}-\d{2})的营业额:(\d+(?:\.\d+)?)")
    p2 = re.compile(r"准备发送消息: (\d{4}-\d{2}-\d{2})(\d+(?:\.\d+)?)")
    with open(log_path, "r", encoding="utf-8") as f:
        for line in f:
            m = p1.search(line) or p2.search(line)
            if m:
                d, amt = m.group(1), float(m.group(2))
                data[d] = amt
    return data

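# Example of a line the primary pattern (p1) accepts, with a hypothetical amount:
#   准备发送消息: 【益选便利店】2025-12-06的营业额:3180.5
# p2 is a looser fallback without the bracketed shop name; for duplicate dates,
# the last matching line in app.log wins.
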
def gen_amount_for_date(d: date, cfg: dict):
    wk = d.weekday()
    lo, hi = (cfg.get("weekend_range", [1600, 2000]) if wk >= 5 else cfg.get("weekday_range", [2800, 3600]))
    return round(random.uniform(lo, hi), 2)

def sum_period(dates_map, start: date, end: date, cfg):
    total = 0.0
    cur = start
    while cur <= end:
        key = cur.isoformat()
        total += dates_map.get(key, gen_amount_for_date(cur, cfg))
        cur += timedelta(days=1)
    return round(total, 2)

def get_periods(today: date):
    # All periods exclude today.
    yday = today - timedelta(days=1)
    day_before = today - timedelta(days=2)
    # This week: Monday through yesterday.
    monday = yday - timedelta(days=yday.weekday())
    week_start = monday
    week_end = yday
    # Last week: last Monday through last Sunday.
    last_week_end = week_start - timedelta(days=1)
    last_week_start = last_week_end - timedelta(days=6)
    # This month: the 1st of the current month through yesterday.
    month_start = yday.replace(day=1)
    month_end = yday
    return {
        "yesterday": yday,
        "day_before": day_before,
        "this_week": (week_start, week_end),
        "last_week": (last_week_start, last_week_end),
        "this_month": (month_start, month_end),
    }

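# Worked example (hypothetical date): if today is Wednesday 2025-12-10, then
# yesterday = 2025-12-09, this_week = 2025-12-08..2025-12-09,
# last_week = 2025-12-01..2025-12-07, and this_month = 2025-12-01..2025-12-09.
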
@app.route("/api/metrics")
def api_metrics():
    cfg = load_config()
    shop_name = cfg.get("shop_name", "益选便利店")
    today_local = datetime.now().date()
    periods = get_periods(today_local)
    yday = periods["yesterday"]
    day_before = periods["day_before"]
    tw_s, tw_e = periods["this_week"]
    lw_s, lw_e = periods["last_week"]
    tm_s, tm_e = periods["this_month"]
    # The database is the single source of truth.
    def amt_for(d: date):
        r = DailyRevenue.query.filter_by(date=d).first()
        return r.amount if (r and r.is_final) else None
    # Range totals count finalized dates only.
    def sum_final(start: date, end: date):
        cur, total = start, 0.0
        while cur <= end:
            r = DailyRevenue.query.filter_by(date=cur).first()
            if r and r.is_final:
                total += r.amount
            cur += timedelta(days=1)
        return round(total, 2)
    weekdays = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
    out = {
        "shop_name": shop_name,
        "server_now": datetime.now().isoformat(timespec='seconds'),
        "cutoff_hour": 23,
        "today": {"date": today_local.isoformat(), "weekday": weekdays[today_local.weekday()], "amount": amt_for(today_local)},
        "yesterday": {"date": yday.isoformat(), "amount": amt_for(yday)},
        "day_before": {"date": day_before.isoformat(), "amount": amt_for(day_before)},
        "this_week": {"start": tw_s.isoformat(), "end": tw_e.isoformat(), "total": sum_final(tw_s, tw_e)},
        "last_week": {"start": lw_s.isoformat(), "end": lw_e.isoformat(), "total": sum_final(lw_s, lw_e)},
        "this_month": {"start": tm_s.isoformat(), "end": tm_e.isoformat(), "total": sum_final(tm_s, tm_e)},
    }
    return jsonify(out)

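# Response shape of /api/metrics above (values are illustrative, not real data):
# {
#   "shop_name": "益选便利店", "server_now": "2025-12-10T14:03:00", "cutoff_hour": 23,
#   "today":      {"date": "2025-12-10", "weekday": "周三", "amount": null},
#   "yesterday":  {"date": "2025-12-09", "amount": 3120.4},
#   "day_before": {"date": "2025-12-08", "amount": 2987.0},
#   "this_week":  {"start": "2025-12-08", "end": "2025-12-09", "total": 6107.4},
#   "last_week":  {"start": "2025-12-01", "end": "2025-12-07", "total": 21550.3},
#   "this_month": {"start": "2025-12-01", "end": "2025-12-09", "total": 27657.7}
# }
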
@app.route("/api/series7")
def api_series7():
    today_local = datetime.now().date()
    days = int(request.args.get('days', 7))
    days = max(7, min(days, 90))
    # End date: include today only if today's figure is already finalized, otherwise stop at yesterday.
    end = today_local
    r_today = DailyRevenue.query.filter_by(date=today_local).first()
    if not (r_today and r_today.is_final):
        end = today_local - timedelta(days=1)
    start = end - timedelta(days=days - 1)
    cfg = load_config()
    series = []
    cur = start
    while cur <= end:
        r = DailyRevenue.query.filter_by(date=cur).first()
        if r and r.is_final:
            amt = r.amount
            est = False
        else:
            amt = gen_amount_for_date(cur, cfg)
            est = True
        series.append({"date": cur.isoformat(), "amount": amt, "estimated": est})
        cur += timedelta(days=1)
    return jsonify(series)

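# Example call for the series endpoint above: GET /api/series7?days=30 returns a list of
# {"date": "...", "amount": <float>, "estimated": <bool>} items, where estimated=true marks
# days filled from the configured random ranges rather than from finalized DB rows.
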
@app.route("/api/revenue")
def api_revenue():
    """Query historical revenue."""
    days = int(request.args.get("days", 30))
    start = datetime.utcnow().date() - timedelta(days=days)
    rows = DailyRevenue.query.filter(DailyRevenue.date >= start).order_by(DailyRevenue.date.desc()).all()
    return jsonify([{"date": r.date.isoformat(), "amount": r.amount} for r in rows])

@app.route("/api/export")
def api_export():
    """CSV export of finalized rows."""
    rows = DailyRevenue.query.order_by(DailyRevenue.date.desc()).all()
    csv = "date,amount\n" + "\n".join(f"{r.date},{r.amount}" for r in rows if r.is_final)
    return csv, 200, {"Content-Type": "text/csv; charset=utf-8", "Content-Disposition": "attachment; filename=revenue.csv"}

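# Sample of the exported CSV (illustrative values), newest date first:
#   date,amount
#   2025-12-09,3120.4
#   2025-12-08,2987.0
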
@app.route("/")
def index():
    """SPA entry point."""
    return send_from_directory(app.static_folder, "index.html")

@app.route("/admin")
def admin_page():
    return send_from_directory(app.static_folder, "admin.html")

@app.route('/api/admin/turnover', methods=['PUT'])
def admin_correct():
    # Optional shared-secret check: enforced only when ADMIN_TOKEN is set in the environment.
    token = os.getenv('ADMIN_TOKEN')
    if token and request.headers.get('X-Admin-Token') != token:
        return jsonify({"error": "unauthorized"}), 401
    payload = request.get_json(force=True)
    d = datetime.strptime(payload['date'], '%Y-%m-%d').date()
    new_amt = float(payload['amount'])
    reason = payload.get('reason', 'manual_correct')
    actor = payload.get('actor', 'admin')
    r = DailyRevenue.query.filter_by(date=d).first()
    old = r.amount if r else None
    if r:
        r.amount = new_amt
        r.is_final = True
        r.source = 'correct'
    else:
        r = DailyRevenue(date=d, amount=new_amt, is_final=True, source='correct')
        db.session.add(r)
    db.session.add(AuditLog(date=d, old_amount=old, new_amount=new_amt, reason=reason, actor=actor, type='correct'))
    db.session.commit()
    _append_log_line(d.isoformat(), new_amt, load_config().get('shop_name', '益选便利店'))
    return jsonify({"ok": True})

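# Example correction request for the endpoint above (hypothetical values; the
# X-Admin-Token header is only needed when ADMIN_TOKEN is configured):
#   curl -X PUT http://localhost:5000/api/admin/turnover \
#        -H "Content-Type: application/json" -H "X-Admin-Token: secret" \
#        -d '{"date": "2025-12-08", "amount": 3050.0, "reason": "till recount", "actor": "admin"}'
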
def sync_log_to_db():
    """On startup, import rows missing from the DB out of app.log (dates before today only)."""
    log_map = parse_app_log()
    today_local = datetime.now().date()
    for ds, amt in log_map.items():
        d = datetime.strptime(ds, '%Y-%m-%d').date()
        if d >= today_local:
            continue
        r = DailyRevenue.query.filter_by(date=d).first()
        if not r:
            db.session.add(DailyRevenue(date=d, amount=amt, is_final=True, source='import_log'))
            db.session.add(AuditLog(date=d, old_amount=None, new_amount=amt, reason='import_log', actor='system', type='import_log'))
    db.session.commit()

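# Server-sent events: the stream below pushes a heartbeat roughly every 30 seconds and a
# force_refresh hint shortly after the 23:00 cutoff has produced the day's figure. A browser
# client would typically subscribe with EventSource('/api/events') and reload its data when
# a force_refresh message arrives (client-side behaviour assumed, not shown in this file).
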
@app.route('/api/events')
def sse_events():
    def event_stream():
        while True:
            now = datetime.now()
            payload = {"type": "tick", "server_now": now.isoformat(timespec='seconds')}
            yield f"data: {json.dumps(payload)}\n\n"
            if now.hour == 23 and now.minute == 1:
                yield "data: {\"type\": \"force_refresh\"}\n\n"
            time.sleep(30)
    return Response(stream_with_context(event_stream()), mimetype='text/event-stream')

if __name__ == "__main__":
    local_tz = get_localzone()
    scheduler = BackgroundScheduler(timezone=local_tz)
    scheduler.add_job(daily_job, "cron", hour=23, minute=0)
    scheduler.start()
    with app.app_context():
        sync_log_to_db()
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")))