PushToZhaoShang/backend/app.py

529 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request, send_from_directory, Response
from flask import stream_with_context
from dotenv import load_dotenv
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from apscheduler.schedulers.background import BackgroundScheduler
from tzlocal import get_localzone
from sqlalchemy import text
from datetime import datetime, timedelta, date
import re
import random
import os
import json
import time
import csv
import io
import requests
import hmac
import hashlib
import base64
load_dotenv()
app = Flask(__name__, static_folder="../frontend", static_url_path="/static")
CORS(app)
# Force an absolute path to avoid Windows relative-path problems with SQLite URLs.
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
db_path = os.path.join(base_dir, "data", "data.db")
if os.name == 'nt':
    # Windows: backslashes need no escaping here; SQLAlchemy accepts
    # sqlite:///C:\path\to\file (three slashes + drive letter).
    app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{db_path}"
else:
    # POSIX: four slashes denote an absolute filesystem path.
    app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:////{db_path}"
def _ensure_sqlite_dir(url):
    """Create the data directory for the SQLite file if it does not exist.

    NOTE(review): the `url` parameter is ignored; the directory is derived
    from the module-level `db_path` computed above — confirm intent.
    """
    d = os.path.dirname(db_path)
    if not os.path.exists(d):
        os.makedirs(d, exist_ok=True)
_ensure_sqlite_dir(app.config['SQLALCHEMY_DATABASE_URI'])
db = SQLAlchemy(app)
class DailyRevenue(db.Model):
    """One row per calendar day of revenue (generated, imported, or corrected)."""
    __tablename__ = 'daily_revenue'
    id = db.Column(db.Integer, primary_key=True)
    # Business date; unique — exactly one row per day.
    date = db.Column(db.Date, nullable=False, unique=True)
    amount = db.Column(db.Float, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # True once the day's figure has been settled; finalized rows are the only
    # ones counted by the reporting endpoints.
    is_final = db.Column(db.Boolean, default=False, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Origin tag written by the code paths in this file:
    # 'generator' / 'correct' / 'import_csv' / 'import_log'.
    source = db.Column(db.String(20))
    note = db.Column(db.Text)
class AuditLog(db.Model):
    """Append-only history of every write to DailyRevenue (who, what, why)."""
    __tablename__ = 'audit_log'
    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date, nullable=False)
    # Amount before the change; None when the row was created for the first time.
    old_amount = db.Column(db.Float)
    new_amount = db.Column(db.Float)
    reason = db.Column(db.Text)
    actor = db.Column(db.String(50))
    # Change kind: generate / correct / import_log (see the writers in this file).
    type = db.Column(db.String(20))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
with app.app_context():
    db.create_all()
    # Minimal SQLite "migration": columns added after the initial schema are
    # created here on first run (checked against PRAGMA table_info so the
    # ALTERs are only issued when the database predates them).
    cols = [row[1] for row in db.session.execute(text('PRAGMA table_info(daily_revenue)')).fetchall()]
    if 'is_final' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN is_final INTEGER NOT NULL DEFAULT 0'))
    if 'updated_at' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN updated_at DATETIME'))
    if 'source' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN source TEXT'))
    if 'note' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN note TEXT'))
    db.session.commit()
def push_feishu(date_str: str, amount: float, reason: str):
    """No-op placeholder for the Feishu push.

    NOTE(review): a full implementation with the same name appears near the
    bottom of this file. When the module is imported (e.g. under a WSGI
    server) the later definition replaces this stub; when the file is run as
    a script, app.run() blocks before the later definition executes, so the
    stub stays active — confirm which behavior is intended.
    """
    return
def generate_mock_revenue():
    """Generate a simulated daily revenue figure (original logic preserved).

    Draws a base amount in [8000, 15000], applies a small random trend
    factor in [0.95, 1.05], and rounds the product to two decimals.
    """
    baseline = random.uniform(8000, 15000)
    drift = random.uniform(0.95, 1.05)
    return round(baseline * drift, 2)
def _append_log_line(date_str: str, amount: float, shop_name: str):
    """Append a "message prepared" line to ../app.log and echo it to stdout.

    The exact line format matters: parse_app_log() later re-reads this file
    and extracts date/amount pairs from these lines via regex.
    NOTE(review): parse_app_log's primary pattern expects a closing 】 after
    the shop name, which this f-string does not appear to emit — confirm the
    intended format (characters may have been lost in transit).
    """
    log_path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    line = f"准备发送消息: 【{shop_name}{date_str}的营业额:{amount}"
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(line + "\n")
    try:
        # stdout echo is best-effort only (console encoding may fail, e.g. on Windows).
        print(line, flush=True)
    except Exception:
        pass
def daily_job(target_date=None):
    """Scheduled task: generate and finalize revenue for a date (default: today).

    If a row already exists but is not final, it is finalized in place and the
    notification is (re)sent; if it exists and is final, nothing happens.
    Otherwise a fresh amount is drawn from the configured ranges, persisted as
    final, audit-logged, appended to app.log, and pushed.
    """
    with app.app_context():
        cfg = load_config()
        shop_name = cfg.get("shop_name", "益选便利店")
        if target_date is None:
            target_date = datetime.now().date()
        existing = DailyRevenue.query.filter_by(date=target_date).first()
        if existing:
            if not existing.is_final:
                existing.is_final = True
                # Keep a previously recorded source tag; default to 'generator'.
                existing.source = existing.source or 'generator'
                db.session.commit()
                # Re-send the notification for the late finalization.
                push_feishu(target_date.isoformat(), existing.amount, "daily_finalize")
            return
        amount = gen_amount_for_date(target_date, cfg)
        rev = DailyRevenue(date=target_date, amount=amount, is_final=True, source='generator')
        db.session.add(rev)
        db.session.add(AuditLog(date=target_date, old_amount=None, new_amount=amount, reason='daily_generate', actor='system', type='generate'))
        db.session.commit()
        _append_log_line(target_date.isoformat(), amount, shop_name)
        push_feishu(target_date.isoformat(), amount, "daily_generate")
def settle_today_if_due():
    """Finalize today's revenue, but only once the configured cutoff hour has passed."""
    config = load_config()
    raw_cutoff = config.get("cutoff_hour", 23)
    try:
        cutoff_hour = int(raw_cutoff)
    except Exception:
        cutoff_hour = 23
    # Only attempt settlement when the local clock has reached the cutoff hour.
    if datetime.now().hour >= cutoff_hour:
        daily_job()
def settle_past_days():
    """Startup back-fill: finalize any of the previous 3 days that were missed.

    Guards against gaps caused by the server being down at settlement time.
    """
    with app.app_context():
        today = datetime.now().date()
        for offset in (1, 2, 3):
            day = today - timedelta(days=offset)
            row = DailyRevenue.query.filter_by(date=day).first()
            if row is None or not row.is_final:
                print(f"补录数据: {day}")
                daily_job(day)
# ---- Log parsing & aggregation ----
def load_config():
    """Read ../config.json; fall back to built-in defaults on any failure."""
    path = os.path.join(os.path.dirname(__file__), "..", "config.json")
    defaults = {
        "shop_name": "益选便利店",
        "weekday_range": [2800, 3600],
        "weekend_range": [1600, 2000],
    }
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        return defaults
def parse_app_log():
    """Scan ../app.log for previously announced revenue lines.

    Returns a dict mapping 'YYYY-MM-DD' -> amount; a later line for the same
    date overwrites an earlier one. A missing log file yields an empty dict.
    """
    log_path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    parsed = {}
    if not os.path.exists(log_path):
        return parsed
    primary = re.compile(r"准备发送消息: 【.*?】(\d{4}-\d{2}-\d{2})的营业额:(\d+(?:\.\d+)?)")
    fallback = re.compile(r"准备发送消息: (\d{4}-\d{2}-\d{2})(\d+(?:\.\d+)?)")
    with open(log_path, "r", encoding="utf-8") as fh:
        for raw_line in fh:
            match = primary.search(raw_line) or fallback.search(raw_line)
            if match:
                parsed[match.group(1)] = float(match.group(2))
    return parsed
def gen_amount_for_date(d: date, cfg: dict):
    """Draw a random amount for day *d* from the configured weekday/weekend range."""
    if d.weekday() >= 5:  # Saturday or Sunday
        low, high = cfg.get("weekend_range", [1600, 2000])
    else:
        low, high = cfg.get("weekday_range", [2800, 3600])
    return round(random.uniform(low, high), 2)
def sum_period(dates_map, start: date, end: date, cfg):
    """Sum amounts over [start, end]; a missing date falls back to a generated value."""
    total = 0.0
    day = start
    while day <= end:
        # dict.get evaluates the fallback eagerly, exactly like the original.
        total += dates_map.get(day.isoformat(), gen_amount_for_date(day, cfg))
        day += timedelta(days=1)
    return round(total, 2)
def get_periods(today: date):
    """Compute reporting periods relative to *today* (today itself excluded).

    Returns yesterday, the day before, this week (Monday..yesterday), last
    full week (Mon..Sun), and this month (1st..yesterday).
    """
    yesterday = today - timedelta(days=1)
    two_days_ago = today - timedelta(days=2)
    # Current week runs from the Monday of "yesterday" through yesterday.
    week_start = yesterday - timedelta(days=yesterday.weekday())
    # Previous full week: the 7 days ending the day before week_start.
    prev_week_end = week_start - timedelta(days=1)
    prev_week_start = prev_week_end - timedelta(days=6)
    # Current month: the 1st of yesterday's month through yesterday.
    month_start = yesterday.replace(day=1)
    return {
        "yesterday": yesterday,
        "day_before": two_days_ago,
        "this_week": (week_start, yesterday),
        "last_week": (prev_week_start, prev_week_end),
        "this_month": (month_start, yesterday),
    }
@app.route("/api/metrics")
def api_metrics():
cfg = load_config()
shop_name = cfg.get("shop_name", "益选便利店")
today_local = datetime.now().date()
periods = get_periods(today_local)
yday = periods["yesterday"]
day_before = periods["day_before"]
tw_s, tw_e = periods["this_week"]
lw_s, lw_e = periods["last_week"]
tm_s, tm_e = periods["this_month"]
# DB 为唯一来源
def amt_for(d: date):
r = DailyRevenue.query.filter_by(date=d).first()
return r.amount if (r and r.is_final) else None
# 周区间求和:仅统计已定版日期
def sum_final(start: date, end: date):
cur, total = start, 0.0
while cur <= end:
r = DailyRevenue.query.filter_by(date=cur).first()
if r and r.is_final:
total += r.amount
cur += timedelta(days=1)
return round(total, 2)
weekdays = ['周一','周二','周三','周四','周五','周六','周日']
out = {
"shop_name": shop_name,
"server_now": datetime.now().isoformat(timespec='seconds'),
"cutoff_hour": 23,
"today": {"date": today_local.isoformat(), "weekday": weekdays[today_local.weekday()], "amount": amt_for(today_local)},
"yesterday": {"date": yday.isoformat(), "amount": amt_for(yday)},
"day_before": {"date": day_before.isoformat(), "amount": amt_for(day_before)},
"this_week": {"start": tw_s.isoformat(), "end": tw_e.isoformat(), "total": sum_final(tw_s, tw_e)},
"last_week": {"start": lw_s.isoformat(), "end": lw_e.isoformat(), "total": sum_final(lw_s, lw_e)},
"this_month": {"start": tm_s.isoformat(), "end": tm_e.isoformat(), "total": sum_final(tm_s, tm_e)},
}
return jsonify(out)
@app.route("/api/series7")
def api_series7():
today_local = datetime.now().date()
days = int(request.args.get('days', 7))
days = max(7, min(days, 90))
# 结束日期:若今日已定版则含今日,否则到昨日
end = today_local
r_today = DailyRevenue.query.filter_by(date=today_local).first()
if not (r_today and r_today.is_final):
end = today_local - timedelta(days=1)
start = end - timedelta(days=days-1)
cfg = load_config()
series = []
cur = start
while cur <= end:
r = DailyRevenue.query.filter_by(date=cur).first()
if r and r.is_final:
amt = r.amount
est = False
else:
amt = gen_amount_for_date(cur, cfg)
est = True
series.append({"date": cur.isoformat(), "amount": amt, "estimated": est})
cur += timedelta(days=1)
return jsonify(series)
@app.route("/api/revenue")
def api_revenue():
"""查询历史营业额"""
days = int(request.args.get("days", 30))
start = datetime.utcnow().date() - timedelta(days=days)
rows = DailyRevenue.query.filter(DailyRevenue.date >= start).order_by(DailyRevenue.date.desc()).all()
return jsonify([{"date": r.date.isoformat(), "amount": r.amount} for r in rows])
@app.route("/api/audit")
def api_audit():
limit = int(request.args.get("limit", 20))
rows = AuditLog.query.order_by(AuditLog.created_at.desc()).limit(limit).all()
return jsonify([
{
"date": r.date.isoformat(),
"old_amount": r.old_amount,
"new_amount": r.new_amount,
"reason": r.reason,
"actor": r.actor,
"type": r.type,
"created_at": r.created_at.isoformat(timespec='seconds')
} for r in rows
])
@app.route("/api/health")
def api_health():
cfg = load_config()
cutoff = int(cfg.get("cutoff_hour", 23))
today_local = datetime.now().date()
r_today = DailyRevenue.query.filter_by(date=today_local).first()
return jsonify({
"server_now": datetime.now().isoformat(timespec='seconds'),
"cutoff_hour": cutoff,
"today_finalized": bool(r_today and r_today.is_final),
"today_amount": (r_today.amount if r_today else None)
})
@app.route("/api/export")
def api_export():
"""CSV导出"""
rows = DailyRevenue.query.order_by(DailyRevenue.date.desc()).all()
csv = "date,amount\n" + "\n".join([f"{r.date},{r.amount}" for r in rows if r.is_final])
return csv, 200, {"Content-Type": "text/csv; charset=utf-8", "Content-Disposition": "attachment; filename=revenue.csv"}
@app.route("/")
def index():
"""SPA入口"""
return send_from_directory(app.static_folder, "index.html")
@app.route("/admin")
def admin_page():
return send_from_directory(app.static_folder, "admin.html")
@app.route('/api/admin/turnover', methods=['PUT'])
def admin_correct():
    """Manually correct (or create) a day's revenue and mark it final.

    Auth: X-Admin-Token header must match ADMIN_TOKEN when that env var is set.
    Body: {"date": "YYYY-MM-DD", "amount": number, "reason"?: str, "actor"?: str}.
    Writes an audit row, appends the announcement line to app.log, and pushes
    a Feishu notification. Returns 400 on a malformed body (previously 500).
    """
    token = os.getenv('ADMIN_TOKEN')
    if token and request.headers.get('X-Admin-Token') != token:
        return jsonify({"error": "unauthorized"}), 401
    payload = request.get_json(force=True, silent=True) or {}
    try:
        d = datetime.strptime(payload['date'], '%Y-%m-%d').date()
        new_amt = float(payload['amount'])
    except (KeyError, TypeError, ValueError):
        # Missing/invalid fields are a client error, not a server crash.
        return jsonify({"error": "bad_request"}), 400
    reason = payload.get('reason', 'manual_correct')
    actor = payload.get('actor', 'admin')
    r = DailyRevenue.query.filter_by(date=d).first()
    old = r.amount if r else None
    if r:
        r.amount = new_amt
        r.is_final = True
        r.source = 'correct'
    else:
        r = DailyRevenue(date=d, amount=new_amt, is_final=True, source='correct')
        db.session.add(r)
    db.session.add(AuditLog(date=d, old_amount=old, new_amount=new_amt, reason=reason, actor=actor, type='correct'))
    db.session.commit()
    _append_log_line(d.isoformat(), new_amt, load_config().get('shop_name', '益选便利店'))
    push_feishu(d.isoformat(), new_amt, reason)
    return jsonify({"ok": True})
@app.route('/api/admin/test_push', methods=['POST'])
def admin_test_push():
    """Fire a test Feishu push; date/amount/reason may be overridden in the body."""
    token = os.getenv('ADMIN_TOKEN')
    if token and request.headers.get('X-Admin-Token') != token:
        return jsonify({"error": "unauthorized"}), 401
    body = request.get_json(silent=True) or {}
    push_date = body.get('date') or datetime.now().date().isoformat()
    push_amount = float(body.get('amount') or 1234.56)
    push_reason = body.get('reason') or 'manual_test'
    push_feishu(push_date, push_amount, push_reason)
    return jsonify({"ok": True, "pushed": {"date": push_date, "amount": push_amount, "reason": push_reason}})
def import_csv_text(text: str, actor: str = 'admin'):
    """Upsert date,amount CSV rows as finalized revenue; return the imported count.

    Skips empty rows, a 'date' header row, future dates, and any row that
    fails to parse. Each imported row gets an audit entry and a push; the
    session is committed once at the end.
    """
    reader = csv.reader(io.StringIO(text))
    count = 0
    today = datetime.now().date()
    for row in reader:
        # Short-circuit keeps row[0] safe on empty rows; header rows are skipped.
        if not row or row[0] == 'date':
            continue
        try:
            day = datetime.strptime(row[0].strip(), '%Y-%m-%d').date()
            if day > today:
                continue
            value = float(row[1].strip())
        except Exception:
            continue
        record = DailyRevenue.query.filter_by(date=day).first()
        previous = record.amount if record else None
        if record:
            record.amount = value
            record.is_final = True
            record.source = 'import_csv'
        else:
            record = DailyRevenue(date=day, amount=value, is_final=True, source='import_csv')
            db.session.add(record)
        db.session.add(AuditLog(date=day, old_amount=previous, new_amount=value, reason='import_csv', actor=actor, type='import_log'))
        push_feishu(day.isoformat(), value, 'import_csv')
        count += 1
    db.session.commit()
    return count
@app.route('/api/admin/import', methods=['POST'])
def admin_import():
    """Bulk import: accept raw CSV text as the request body and upsert it."""
    token = os.getenv('ADMIN_TOKEN')
    if token and request.headers.get('X-Admin-Token') != token:
        return jsonify({"error": "unauthorized"}), 401
    body = request.get_data(as_text=True)
    if not body:
        return jsonify({"error": "empty"}), 400
    count = import_csv_text(body, actor='admin')
    return jsonify({"ok": True, "imported": count})
def auto_import_csv_on_start():
    """On startup, import /app/data/import.csv if it exists.

    Disabled by setting AUTO_IMPORT_ON_START=0 in the environment.
    """
    with app.app_context():
        flag = os.getenv('AUTO_IMPORT_ON_START', '1')
        if str(flag) == '0':
            return
        # NOTE(review): this path is hard-coded for a container layout
        # (/app/data) while the rest of the file derives paths from __file__
        # — confirm it is intended to be Docker-only.
        p = os.path.join("/app", "data", "import.csv")
        if os.path.exists(p):
            with open(p, "r", encoding="utf-8") as f:
                # Renamed from `text`: the original local shadowed the
                # module-level sqlalchemy `text` import.
                content = f.read()
            import_csv_text(content, actor='bootstrap')
def sync_log_to_db():
    """Startup sync: copy log-derived amounts missing from the DB (days before today only)."""
    log_map = parse_app_log()
    today = datetime.now().date()
    for date_str, amount in log_map.items():
        day = datetime.strptime(date_str, '%Y-%m-%d').date()
        if day >= today:
            continue
        # Only fill gaps; existing rows (final or not) are left untouched.
        if DailyRevenue.query.filter_by(date=day).first() is None:
            db.session.add(DailyRevenue(date=day, amount=amount, is_final=True, source='import_log'))
            db.session.add(AuditLog(date=day, old_amount=None, new_amount=amount, reason='import_log', actor='system', type='import_log'))
    db.session.commit()
if __name__ == "__main__":
local_tz = get_localzone()
scheduler = BackgroundScheduler(timezone=local_tz)
try:
cutoff = int(load_config().get("cutoff_hour", 23))
except Exception:
cutoff = 23
scheduler.add_job(daily_job, "cron", hour=cutoff, minute=0)
scheduler.start()
with app.app_context():
sync_log_to_db()
auto_import_csv_on_start()
settle_past_days()
settle_today_if_due()
app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")))
@app.route('/api/events')
def sse_events():
    """Server-Sent Events stream: a "tick" every 30 s plus settlement triggers.

    During minutes 0 and 1 of each hour the stream also attempts today's
    settlement (a no-op before the cutoff hour) and asks clients to refresh.
    """
    def event_stream():
        while True:
            now = datetime.now()
            payload = {"type": "tick", "server_now": now.isoformat(timespec='seconds')}
            yield f"data: {json.dumps(payload)}\n\n"
            if now.minute in (0, 1):
                # Every connected client runs this; daily_job's existing-row
                # check keeps duplicate settlement attempts idempotent.
                settle_today_if_due()
                yield "data: {\"type\": \"force_refresh\"}\n\n"
            time.sleep(30)
    return Response(stream_with_context(event_stream()), mimetype='text/event-stream')
def push_feishu(date_str: str, amount: float, reason: str):
    """Push a revenue notification card to a Feishu (Lark) custom-bot webhook.

    Reads the webhook URL (feishu_webhook_url / webhook_url) and optional
    signing secret from config.json; silently returns when no URL is set.
    Sends an interactive card with the day's amount and the day-over-day
    delta vs. yesterday's finalized amount; falls back to a plain-text
    message if the card post is rejected. All outcomes go to ../app.log.
    """
    cfg = load_config()
    url = cfg.get("feishu_webhook_url") or cfg.get("webhook_url")
    if not url:
        return
    shop = cfg.get("shop_name", "益选便利店")
    try:
        d = datetime.strptime(date_str, '%Y-%m-%d').date()
    except Exception:
        d = datetime.now().date()
    # Day-over-day comparison against yesterday's finalized amount (if any).
    y = d - timedelta(days=1)
    r = DailyRevenue.query.filter_by(date=y).first()
    y_amt = (r.amount if (r and r.is_final) else None)
    arrow = ''
    diff_str = ''
    pct_str = ''
    if isinstance(y_amt, (int, float)):
        diff = amount - y_amt
        arrow = '🔺' if diff >= 0 else '🔻'
        diff_str = f"{'+' if diff >= 0 else '-'}{abs(diff):.2f}"
        if y_amt != 0:
            pct = abs(diff / y_amt) * 100.0
            pct_str = f"({'+' if diff >= 0 else '-'}{pct:.2f}%)"
    today_line = f"**今日**:¥{amount:.2f}"
    if isinstance(y_amt, (int, float)):
        today_line += f" {arrow} {diff_str} {pct_str}".strip()
    y_line = f"**昨日**{('暂无数据' if y_amt is None else '¥' + format(y_amt, '.2f'))}"
    card = {
        "config": {"wide_screen_mode": True},
        "elements": [
            {"tag": "div", "text": {"tag": "lark_md", "content": f"📊 **{shop}** 营业额通知"}},
            {"tag": "hr"},
            {"tag": "div", "text": {"tag": "lark_md", "content": f"**日期**{date_str}"}},
            {"tag": "div", "text": {"tag": "lark_md", "content": today_line}},
            {"tag": "div", "text": {"tag": "lark_md", "content": y_line}},
            {"tag": "note", "elements": [
                {"tag": "plain_text", "content": f"原因:{reason} | 时间:{datetime.now().isoformat(timespec='seconds')}"}
            ]}
        ]
    }
    payload = {"msg_type": "interactive", "card": card}
    secret = cfg.get("feishu_secret")
    if secret:
        ts = str(int(time.time()))
        # Feishu custom-bot signing: HMAC-SHA256 keyed with "timestamp\nsecret"
        # over an EMPTY message, then base64-encoded. (The previous code keyed
        # the HMAC with the secret and hashed the string instead, which Feishu
        # rejects as an invalid signature.)
        sign_src = ts + "\n" + secret
        sign = base64.b64encode(hmac.new(sign_src.encode('utf-8'), digestmod=hashlib.sha256).digest()).decode()
        payload.update({"timestamp": ts, "sign": sign})
    def _log(s: str):
        # Append to the shared app.log; stdout echo is best-effort only.
        p = os.path.join(os.path.dirname(__file__), "..", "app.log")
        with open(p, 'a', encoding='utf-8') as f:
            f.write(s + "\n")
        try:
            print(s, flush=True)
        except Exception:
            pass
    def _post_json(u: str, payload_obj: dict):
        # Serialize ourselves to keep non-ASCII characters unescaped.
        body = json.dumps(payload_obj, ensure_ascii=False).encode('utf-8')
        return requests.post(u, data=body, headers={'Content-Type': 'application/json; charset=utf-8'}, timeout=5)
    try:
        resp = _post_json(url, payload)
        ok = (200 <= resp.status_code < 300)
        _log(f"飞书推送卡片{'成功' if ok else '失败'}: status={resp.status_code} {resp.text[:200]}")
        if not ok:
            # Fallback plain-text message. Local renamed from `text` to avoid
            # shadowing the module-level sqlalchemy `text` import.
            fallback_text = f"📊 {shop}\n日期:{date_str}\n今日:¥{amount:.2f}"
            if isinstance(y_amt, (int, float)):
                fallback_text += f" {arrow} {diff_str} {pct_str}".strip()
            fallback_text += f"\n原因:{reason}"
            resp2 = _post_json(url, {"msg_type":"text","content":{"text": fallback_text}})
            _log(f"飞书推送文本{'成功' if (200 <= resp2.status_code < 300) else '失败'}: status={resp2.status_code} {resp2.text[:200]}")
    except Exception as e:
        _log(f"飞书推送异常: {str(e)[:200]}")