"""Revenue dashboard backend.

Exposes a small Flask API over a ``daily_revenue`` table, a scheduled job that
generates and finalizes the current day's figure at 23:00 local time, CSV /
log import helpers, and a server-sent-events heartbeat for the frontend.
"""
import csv
import hmac
import io
import json
import os
import random
import re
import time
from datetime import date, datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, request, send_from_directory
from flask import stream_with_context
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
from tzlocal import get_localzone

load_dotenv()

app = Flask(__name__, static_folder="../frontend", static_url_path="/static")
CORS(app)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv("DATABASE_URL", "sqlite:///data.db")

# Fallback shop name used whenever config.json is missing or has no "shop_name".
DEFAULT_SHOP_NAME = "益选便利店"

# Upper bound for the ``days`` query parameter of /api/revenue; keeps the
# date arithmetic far away from ``date.min`` (OverflowError).
MAX_REVENUE_DAYS = 36500

# Patterns recognised in app.log; group 1 is the ISO date, group 2 the amount.
_LOG_PATTERN_WITH_SHOP = re.compile(
    r"准备发送消息: 【.*?】(\d{4}-\d{2}-\d{2})的营业额:(\d+(?:\.\d+)?)"
)
_LOG_PATTERN_PLAIN = re.compile(
    r"准备发送消息: (\d{4}-\d{2}-\d{2}):(\d+(?:\.\d+)?)"
)

# Columns added after the first release, with the DDL that back-fills them
# into a pre-existing SQLite ``daily_revenue`` table.
_SQLITE_ADDED_COLUMNS = (
    ('is_final', 'ALTER TABLE daily_revenue ADD COLUMN is_final INTEGER NOT NULL DEFAULT 0'),
    ('updated_at', 'ALTER TABLE daily_revenue ADD COLUMN updated_at DATETIME'),
    ('source', 'ALTER TABLE daily_revenue ADD COLUMN source TEXT'),
    ('note', 'ALTER TABLE daily_revenue ADD COLUMN note TEXT'),
)


def _ensure_sqlite_dir(url):
    """Create the parent directory of a file-based SQLite database URL.

    Does nothing for non-SQLite URLs or SQLite URLs that are neither the
    ``sqlite:////abs/path`` nor the ``sqlite:///relative/path`` form.
    """
    if not url.startswith('sqlite:'):
        return
    if url.startswith('sqlite:////'):
        # Four slashes: absolute path.
        db_path = url.replace('sqlite:////', '/')
    elif url.startswith('sqlite:///'):
        # Three slashes: path resolved against the current working directory.
        db_path = os.path.join(os.getcwd(), url.replace('sqlite:///', ''))
    else:
        return
    parent = os.path.dirname(db_path)
    if parent and not os.path.exists(parent):
        os.makedirs(parent, exist_ok=True)


_ensure_sqlite_dir(app.config['SQLALCHEMY_DATABASE_URI'])

db = SQLAlchemy(app)


class DailyRevenue(db.Model):
    """One revenue figure per calendar day (``date`` is unique)."""

    __tablename__ = 'daily_revenue'

    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date, nullable=False, unique=True)
    amount = db.Column(db.Float, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Only finalized rows are reported by the metrics/series/export endpoints.
    is_final = db.Column(db.Boolean, default=False, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Written by this module as 'generator', 'correct', 'import_csv' or 'import_log'.
    source = db.Column(db.String(20))
    note = db.Column(db.Text)


class AuditLog(db.Model):
    """Append-only record of every generated, corrected or imported amount."""

    __tablename__ = 'audit_log'

    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date, nullable=False)
    old_amount = db.Column(db.Float)
    new_amount = db.Column(db.Float)
    reason = db.Column(db.Text)
    actor = db.Column(db.String(50))
    type = db.Column(db.String(20))  # generate/correct/import_log
    created_at = db.Column(db.DateTime, default=datetime.utcnow)


def _migrate_sqlite_columns():
    """Add columns missing from an older SQLite ``daily_revenue`` table.

    Must be called inside an application context. Uses ``PRAGMA table_info``,
    so it is only valid for SQLite databases.
    """
    rows = db.session.execute(text('PRAGMA table_info(daily_revenue)')).fetchall()
    existing = {row[1] for row in rows}  # row[1] is the column name
    for column, ddl in _SQLITE_ADDED_COLUMNS:
        if column not in existing:
            db.session.execute(text(ddl))
    db.session.commit()


with app.app_context():
    db.create_all()
    # SQLite-only dynamic column extension; PRAGMA would fail on other engines.
    if app.config['SQLALCHEMY_DATABASE_URI'].startswith('sqlite:'):
        _migrate_sqlite_columns()


def generate_mock_revenue():
    """Return a simulated revenue amount for today, rounded to 2 decimals."""
    base = random.uniform(8000, 15000)
    trend = random.uniform(0.95, 1.05)
    return round(base * trend, 2)


def _append_log_line(date_str: str, amount: float, shop_name: str):
    """Append one revenue message line to ``../app.log`` (UTF-8)."""
    log_path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    line = f"准备发送消息: 【{shop_name}】{date_str}的营业额:{amount}"
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(line + "\n")


def daily_job():
    """Scheduled job: generate and finalize today's figure (never twice).

    If a row for today already exists it is only marked final; otherwise a
    mock amount is generated, stored with an audit entry, and logged.
    """
    # The scheduler invokes this on a background thread, where no Flask
    # application context is active; Flask-SQLAlchemy needs one.
    with app.app_context():
        cfg = load_config()
        shop_name = cfg.get("shop_name", DEFAULT_SHOP_NAME)
        today = datetime.now().date()

        existing = DailyRevenue.query.filter_by(date=today).first()
        if existing:
            if not existing.is_final:
                existing.is_final = True
                existing.source = existing.source or 'generator'
                db.session.commit()
            return

        amount = generate_mock_revenue()
        db.session.add(DailyRevenue(date=today, amount=amount, is_final=True, source='generator'))
        db.session.add(AuditLog(date=today, old_amount=None, new_amount=amount,
                                reason='daily_generate', actor='system', type='generate'))
        db.session.commit()
        _append_log_line(today.isoformat(), amount, shop_name)


# ---- Log parsing and aggregation ----

def load_config():
    """Load ``../config.json``; fall back to built-in defaults on any problem.

    The fallback is used when the file is missing/unreadable, is not valid
    JSON, or does not contain a JSON object.
    """
    defaults = {
        "shop_name": DEFAULT_SHOP_NAME,
        "weekday_range": [2800, 3600],
        "weekend_range": [1600, 2000],
    }
    cfg_path = os.path.join(os.path.dirname(__file__), "..", "config.json")
    try:
        with open(cfg_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return defaults
    # Callers use ``cfg.get(...)``, so anything but a dict is unusable.
    return cfg if isinstance(cfg, dict) else defaults


def parse_app_log():
    """Parse ``../app.log`` into ``{iso_date_str: amount}``.

    Returns an empty dict when the log file does not exist. When a date
    appears more than once, the last occurrence in the file wins.
    """
    log_path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    data = {}
    if not os.path.exists(log_path):
        return data
    with open(log_path, "r", encoding="utf-8") as f:
        for line in f:
            m = _LOG_PATTERN_WITH_SHOP.search(line) or _LOG_PATTERN_PLAIN.search(line)
            if m:
                data[m.group(1)] = float(m.group(2))
    return data


def gen_amount_for_date(d: date, cfg: dict):
    """Return a random estimate for ``d`` from the weekday/weekend range in ``cfg``."""
    if d.weekday() >= 5:  # Saturday or Sunday
        lo, hi = cfg.get("weekend_range", [1600, 2000])
    else:
        lo, hi = cfg.get("weekday_range", [2800, 3600])
    return round(random.uniform(lo, hi), 2)


def sum_period(dates_map, start: date, end: date, cfg):
    """Sum amounts for ``start..end`` inclusive, estimating days missing from ``dates_map``.

    ``dates_map`` is keyed by ISO date strings. The result is rounded to 2 decimals.
    """
    total = 0.0
    cur = start
    while cur <= end:
        key = cur.isoformat()
        # Only draw a random estimate when the date is actually missing.
        if key in dates_map:
            total += dates_map[key]
        else:
            total += gen_amount_for_date(cur, cfg)
        cur += timedelta(days=1)
    return round(total, 2)


def get_periods(today: date):
    """Return the reporting periods relative to ``today`` (today itself excluded).

    Keys: ``yesterday`` and ``day_before`` (dates); ``this_week``,
    ``last_week`` and ``this_month`` (inclusive ``(start, end)`` tuples).
    """
    yday = today - timedelta(days=1)
    day_before = today - timedelta(days=2)
    # This week: Monday of yesterday's week through yesterday.
    week_start = yday - timedelta(days=yday.weekday())
    week_end = yday
    # Last week: the full Monday..Sunday immediately before ``week_start``.
    last_week_end = week_start - timedelta(days=1)
    last_week_start = last_week_end - timedelta(days=6)
    # This month: the 1st of yesterday's month through yesterday.
    month_start = yday.replace(day=1)
    month_end = yday
    return {
        "yesterday": yday,
        "day_before": day_before,
        "this_week": (week_start, week_end),
        "last_week": (last_week_start, last_week_end),
        "this_month": (month_start, month_end),
    }


def _final_amounts(start: date, end: date):
    """Return ``{date: amount}`` for finalized rows in ``start..end`` (one query)."""
    rows = DailyRevenue.query.filter(
        DailyRevenue.date >= start, DailyRevenue.date <= end
    ).all()
    return {r.date: r.amount for r in rows if r.is_final}


def _sum_final(finals, start: date, end: date):
    """Sum finalized amounts for ``start..end`` inclusive, rounded to 2 decimals."""
    total = 0.0
    cur = start
    while cur <= end:
        total += finals.get(cur, 0.0)
        cur += timedelta(days=1)
    return round(total, 2)


def _admin_authorized():
    """Check the ``X-Admin-Token`` header against the ``ADMIN_TOKEN`` env var.

    NOTE(review): when ``ADMIN_TOKEN`` is unset or empty every request is
    accepted; this preserves the existing behaviour but leaves the admin
    endpoints open in that configuration.
    """
    expected = os.getenv('ADMIN_TOKEN')
    if not expected:
        return True
    supplied = request.headers.get('X-Admin-Token', '')
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))


@app.route("/api/metrics")
def api_metrics():
    """Return headline figures: today, yesterday, day before, week and month totals.

    The database is the only data source; amounts of non-finalized or
    missing days are reported as ``null`` and contribute nothing to totals.
    """
    cfg = load_config()
    shop_name = cfg.get("shop_name", DEFAULT_SHOP_NAME)
    today_local = datetime.now().date()
    periods = get_periods(today_local)
    yday = periods["yesterday"]
    day_before = periods["day_before"]
    tw_s, tw_e = periods["this_week"]
    lw_s, lw_e = periods["last_week"]
    tm_s, tm_e = periods["this_month"]

    # One range query covering every date referenced below.
    earliest = min(lw_s, tm_s, day_before)
    finals = _final_amounts(earliest, today_local)

    weekdays = ['周一','周二','周三','周四','周五','周六','周日']
    out = {
        "shop_name": shop_name,
        "server_now": datetime.now().isoformat(timespec='seconds'),
        "cutoff_hour": 23,
        "today": {
            "date": today_local.isoformat(),
            "weekday": weekdays[today_local.weekday()],
            "amount": finals.get(today_local),
        },
        "yesterday": {"date": yday.isoformat(), "amount": finals.get(yday)},
        "day_before": {"date": day_before.isoformat(), "amount": finals.get(day_before)},
        "this_week": {"start": tw_s.isoformat(), "end": tw_e.isoformat(),
                      "total": _sum_final(finals, tw_s, tw_e)},
        "last_week": {"start": lw_s.isoformat(), "end": lw_e.isoformat(),
                      "total": _sum_final(finals, lw_s, lw_e)},
        "this_month": {"start": tm_s.isoformat(), "end": tm_e.isoformat(),
                       "total": _sum_final(finals, tm_s, tm_e)},
    }
    return jsonify(out)


@app.route("/api/series7")
def api_series7():
    """Return a daily series of the last ``days`` days (clamped to 7..90).

    Days without a finalized row get a random estimate and ``estimated: true``.
    A non-numeric ``days`` value falls back to 7 instead of failing.
    """
    today_local = datetime.now().date()
    days = request.args.get('days', 7, type=int)
    days = max(7, min(days, 90))

    # Fetch through today so we can tell whether today is finalized.
    finals = _final_amounts(today_local - timedelta(days=days), today_local)
    # End date: include today only if it is already finalized.
    end = today_local if today_local in finals else today_local - timedelta(days=1)
    start = end - timedelta(days=days - 1)

    cfg = load_config()
    series = []
    cur = start
    while cur <= end:
        if cur in finals:
            amt, est = finals[cur], False
        else:
            amt, est = gen_amount_for_date(cur, cfg), True
        series.append({"date": cur.isoformat(), "amount": amt, "estimated": est})
        cur += timedelta(days=1)
    return jsonify(series)


@app.route("/api/revenue")
def api_revenue():
    """Return stored revenue rows of the last ``days`` days, newest first.

    ``days`` defaults to 30, falls back to 30 when non-numeric, and is
    clamped to ``0..MAX_REVENUE_DAYS``.
    """
    days = request.args.get("days", 30, type=int)
    days = max(0, min(days, MAX_REVENUE_DAYS))
    # Rows are keyed by local calendar date everywhere else in this module.
    start = datetime.now().date() - timedelta(days=days)
    rows = (DailyRevenue.query
            .filter(DailyRevenue.date >= start)
            .order_by(DailyRevenue.date.desc())
            .all())
    return jsonify([{"date": r.date.isoformat(), "amount": r.amount} for r in rows])


@app.route("/api/export")
def api_export():
    """Export all finalized rows as a CSV attachment, newest first."""
    rows = DailyRevenue.query.order_by(DailyRevenue.date.desc()).all()
    body = "date,amount\n" + "\n".join(
        f"{r.date},{r.amount}" for r in rows if r.is_final
    )
    headers = {
        "Content-Type": "text/csv; charset=utf-8",
        "Content-Disposition": "attachment; filename=revenue.csv",
    }
    return body, 200, headers


@app.route("/")
def index():
    """Serve the SPA entry page."""
    return send_from_directory(app.static_folder, "index.html")


@app.route("/admin")
def admin_page():
    """Serve the admin page."""
    return send_from_directory(app.static_folder, "admin.html")


@app.route('/api/admin/turnover', methods=['PUT'])
def admin_correct():
    """Create or overwrite one day's amount and mark it final.

    Expects a JSON object with ``date`` (``YYYY-MM-DD``) and ``amount``;
    ``reason`` and ``actor`` are optional. Responds 401 on a bad token and
    400 on a malformed payload. Every change is written to the audit log
    and appended to app.log.
    """
    if not _admin_authorized():
        return jsonify({"error": "unauthorized"}), 401

    payload = request.get_json(force=True, silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "invalid_json"}), 400
    try:
        d = datetime.strptime(payload['date'], '%Y-%m-%d').date()
        new_amt = float(payload['amount'])
    except (KeyError, TypeError, ValueError):
        return jsonify({"error": "invalid_payload"}), 400
    reason = payload.get('reason', 'manual_correct')
    actor = payload.get('actor', 'admin')

    row = DailyRevenue.query.filter_by(date=d).first()
    old = row.amount if row else None
    if row:
        row.amount = new_amt
        row.is_final = True
        row.source = 'correct'
    else:
        db.session.add(DailyRevenue(date=d, amount=new_amt, is_final=True, source='correct'))
    db.session.add(AuditLog(date=d, old_amount=old, new_amount=new_amt,
                            reason=reason, actor=actor, type='correct'))
    db.session.commit()
    _append_log_line(d.isoformat(), new_amt, load_config().get('shop_name', DEFAULT_SHOP_NAME))
    return jsonify({"ok": True})


def import_csv_text(text: str, actor: str = 'admin'):
    """Import ``date,amount`` CSV text; return the number of rows applied.

    Header rows, empty rows, rows that fail to parse and rows dated in the
    future are skipped. Existing dates are overwritten; every applied row is
    marked final and gets an audit entry. Must run inside an app context.

    Note: the ``text`` parameter shadows SQLAlchemy's ``text`` inside this
    function only; the name is kept for backward compatibility.
    """
    # Tolerate a UTF-8 byte-order mark so the first row is not lost.
    reader = csv.reader(io.StringIO(text.lstrip('\ufeff')))
    imported = 0
    today = datetime.now().date()
    for row in reader:
        if not row or row[0] == 'date':
            continue
        try:
            d = datetime.strptime(row[0].strip(), '%Y-%m-%d').date()
            amt = float(row[1].strip())
        except (ValueError, IndexError):
            # Bad date/amount or a row with a single column: skip it.
            continue
        if d > today:
            continue
        # Queried per row on purpose: autoflush makes a date repeated within
        # the same CSV resolve to the row added earlier in this loop.
        existing = DailyRevenue.query.filter_by(date=d).first()
        old = existing.amount if existing else None
        if existing:
            existing.amount = amt
            existing.is_final = True
            existing.source = 'import_csv'
        else:
            db.session.add(DailyRevenue(date=d, amount=amt, is_final=True, source='import_csv'))
        db.session.add(AuditLog(date=d, old_amount=old, new_amount=amt,
                                reason='import_csv', actor=actor, type='import_log'))
        imported += 1
    db.session.commit()
    return imported


@app.route('/api/admin/import', methods=['POST'])
def admin_import():
    """Import the raw request body as CSV; 401 on bad token, 400 on empty body."""
    if not _admin_authorized():
        return jsonify({"error": "unauthorized"}), 401
    raw = request.get_data(as_text=True)
    if not raw:
        return jsonify({"error": "empty"}), 400
    imported = import_csv_text(raw, actor='admin')
    return jsonify({"ok": True, "imported": imported})


def auto_import_csv_on_start():
    """Import the first bootstrap CSV that exists, unless disabled.

    Set ``AUTO_IMPORT_ON_START=0`` to skip. Candidate files are tried in
    order and only the first existing one is imported.
    """
    with app.app_context():
        if str(os.getenv('AUTO_IMPORT_ON_START', '1')) == '0':
            return
        here = os.path.dirname(__file__)
        candidates = [
            os.path.join("/app", "data", "import.csv"),
            os.path.join(here, "..", "init", "revenue.csv"),
            os.path.join(here, "..", "init", "revenue.sample.csv"),
        ]
        for path in candidates:
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    content = f.read()
                import_csv_text(content, actor='bootstrap')
                break


def sync_log_to_db():
    """Copy app.log entries that are missing from the DB (dates before today only).

    Existing rows are never overwritten. Must run inside an app context.
    """
    today_local = datetime.now().date()
    for ds, amt in parse_app_log().items():
        try:
            d = datetime.strptime(ds, '%Y-%m-%d').date()
        except ValueError:
            # The log regex accepts any digits (e.g. month 13); skip those.
            continue
        if d >= today_local:
            continue
        if DailyRevenue.query.filter_by(date=d).first():
            continue
        db.session.add(DailyRevenue(date=d, amount=amt, is_final=True, source='import_log'))
        db.session.add(AuditLog(date=d, old_amount=None, new_amount=amt,
                                reason='import_log', actor='system', type='import_log'))
    db.session.commit()


# Declared before the entry-point guard: ``app.run`` blocks, so any route
# registered after it would not exist while the server is running.
@app.route('/api/events')
def sse_events():
    """Stream a server-sent ``tick`` event every 30 seconds.

    While the local time is 23:01 an additional ``force_refresh`` event is
    sent with each tick.
    """
    def event_stream():
        while True:
            now = datetime.now()
            payload = {"type": "tick", "server_now": now.isoformat(timespec='seconds')}
            yield f"data: {json.dumps(payload)}\n\n"
            if now.hour == 23 and now.minute == 1:
                yield "data: {\"type\": \"force_refresh\"}\n\n"
            time.sleep(30)

    return Response(stream_with_context(event_stream()), mimetype='text/event-stream')


if __name__ == "__main__":
    local_tz = get_localzone()
    scheduler = BackgroundScheduler(timezone=local_tz)
    scheduler.add_job(daily_job, "cron", hour=23, minute=0)
    scheduler.start()
    with app.app_context():
        sync_log_to_db()
    auto_import_csv_on_start()
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")))