PushToZhaoShang/backend/app.py
houhuan fb546861b5 fix: 修正每日营收任务逻辑并更新配置
修复daily_job函数中的逻辑错误,确保正确检查和处理已存在的营收记录
将config.json中的cutoff_hour和cutoff_time更新为00:00
2025-12-10 01:01:05 +08:00

677 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify, request, send_from_directory, Response
from flask import stream_with_context
from dotenv import load_dotenv
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from apscheduler.schedulers.background import BackgroundScheduler
from tzlocal import get_localzone
from sqlalchemy import text
from datetime import datetime, timedelta, date
import re
import random
import os
import json
import time
import csv
import io
import requests
import hmac
import hashlib
import base64
# Load environment variables from a local .env file before any config is read.
load_dotenv()
# The SPA frontend lives one directory up and is served under /static.
app = Flask(__name__, static_folder="../frontend", static_url_path="/static")
CORS(app)
# Project root (parent of this backend/ directory); the SQLite DB lives in <root>/data.
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
db_path = os.path.join(base_dir, "data", "data.db")
db_url_env = os.getenv('DATABASE_URL')
if db_url_env:
    # An explicit DATABASE_URL (e.g. Postgres) takes precedence over the SQLite default.
    app.config['SQLALCHEMY_DATABASE_URI'] = db_url_env
else:
    if os.name == 'nt':
        # Windows absolute paths use the three-slash form.
        app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{db_path}"
    else:
        # POSIX absolute paths need four slashes: sqlite:////abs/path.
        app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:////{db_path}"
def _ensure_sqlite_dir():
    """Create the data/ directory so SQLite can create the DB file on first run."""
    d = os.path.dirname(db_path)
    if not os.path.exists(d):
        os.makedirs(d, exist_ok=True)
_ensure_sqlite_dir()
db = SQLAlchemy(app)
# Assigned in the __main__ block; module-level so admin routes can reschedule jobs.
scheduler = None
class DailyRevenue(db.Model):
    """One row of shop turnover per calendar day."""
    __tablename__ = 'daily_revenue'
    id = db.Column(db.Integer, primary_key=True)
    # Unique: at most one record per calendar day.
    date = db.Column(db.Date, nullable=False, unique=True)
    amount = db.Column(db.Float, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Once finalized, the amount is treated as authoritative by the reporting endpoints.
    is_final = db.Column(db.Boolean, default=False, nullable=False)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Provenance of the value: 'generator' / 'correct' / 'import_csv' / 'import_log'.
    source = db.Column(db.String(20))
    note = db.Column(db.Text)
class AuditLog(db.Model):
    """Append-only trail of every change made to daily revenue figures."""
    __tablename__ = 'audit_log'
    id = db.Column(db.Integer, primary_key=True)
    date = db.Column(db.Date, nullable=False)
    # old_amount is None when the row was newly created.
    old_amount = db.Column(db.Float)
    new_amount = db.Column(db.Float)
    reason = db.Column(db.Text)
    actor = db.Column(db.String(50))
    type = db.Column(db.String(20))  # generate/correct/import_log
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
with app.app_context():
    db.create_all()
    # Lightweight SQLite migration: add columns introduced after the first release
    # (ALTER TABLE only runs when the column is missing).
    cols = [row[1] for row in db.session.execute(text('PRAGMA table_info(daily_revenue)')).fetchall()]
    if 'is_final' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN is_final INTEGER NOT NULL DEFAULT 0'))
    if 'updated_at' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN updated_at DATETIME'))
    if 'source' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN source TEXT'))
    if 'note' not in cols:
        db.session.execute(text('ALTER TABLE daily_revenue ADD COLUMN note TEXT'))
    db.session.commit()
def push_feishu(date_str: str, amount: float, reason: str):
    """No-op placeholder for the Feishu push.

    A full implementation with the same signature appears at the bottom of
    this module and rebinds this name when that code executes.
    NOTE(review): when launched via `python app.py`, app.run() blocks before
    the later definition runs, so this stub stays active — confirm intent.
    """
    return
def generate_mock_revenue():
    """Produce a mock daily turnover figure (legacy behaviour preserved).

    Draws a base value in [8000, 15000] and scales it by a trend factor
    in [0.95, 1.05], rounded to two decimals.
    """
    baseline = random.uniform(8000, 15000)
    drift = random.uniform(0.95, 1.05)
    return round(baseline * drift, 2)
def _append_log_line(date_str: str, amount: float, shop_name: str):
    """Append one revenue announcement line to ../app.log and echo it to stdout.

    The exact line format is load-bearing: parse_app_log() reads it back.
    Printing is best-effort (ignored if stdout is unavailable).
    """
    message = f"准备发送消息: 【{shop_name}{date_str}的营业额:{amount}"
    path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    with open(path, 'a', encoding='utf-8') as fh:
        fh.write(message + "\n")
    try:
        print(message, flush=True)
    except Exception:
        pass
def daily_job(target_date=None):
    """Scheduled task: generate and finalize the revenue row for a date (default: today).

    If a row already exists it is finalized in place without changing its
    amount; otherwise a fresh amount is generated from the configured ranges
    and stored as final with an audit entry. Every path sends a Feishu push.
    """
    with app.app_context():
        cfg = load_config()
        shop_name = cfg.get("shop_name", "益选便利店")
        if target_date is None:
            target_date = datetime.now().date()
        existing = DailyRevenue.query.filter_by(date=target_date).first()
        if existing:
            if not existing.is_final:
                # Finalize a draft row as-is; keep its original source if set.
                existing.is_final = True
                existing.source = existing.source or 'generator'
                db.session.commit()
                push_feishu(target_date.isoformat(), existing.amount, "daily_finalize")
            else:
                # Already final: just re-announce the existing figure.
                push_feishu(target_date.isoformat(), existing.amount, "daily_exists")
            return
        # No row yet: generate, persist as final, and record the generation.
        amount = gen_amount_for_date(target_date, cfg)
        rev = DailyRevenue(date=target_date, amount=amount, is_final=True, source='generator')
        db.session.add(rev)
        db.session.add(AuditLog(date=target_date, old_amount=None, new_amount=amount, reason='daily_generate', actor='system', type='generate'))
        db.session.commit()
        _append_log_line(target_date.isoformat(), amount, shop_name)
        push_feishu(target_date.isoformat(), amount, "daily_generate")
def settle_today_if_due():
    """Run daily_job() immediately if local time has reached the configured cutoff.

    cutoff_time "HH:MM" in config takes precedence over cutoff_hour; any
    invalid value falls back to 23:00.
    """
    cfg = load_config()
    hour = cfg.get("cutoff_hour", 23)
    minute = 0
    raw = cfg.get("cutoff_time")
    if isinstance(raw, str) and re.match(r'^\d{1,2}:\d{2}$', raw):
        try:
            h_part, m_part = raw.split(':')
            hour = int(h_part)
            minute = int(m_part)
        except Exception:
            pass
    try:
        hour = int(hour)
    except Exception:
        hour = 23
    if not 0 <= hour <= 23:
        hour = 23
    if not 0 <= minute <= 59:
        minute = 0
    now = datetime.now(get_localzone())
    # Tuple comparison == "now is at or past HH:MM".
    if (now.hour, now.minute) >= (hour, minute):
        daily_job()
def settle_past_days():
    """On startup, backfill any of the previous 3 days that never got finalized.

    Guards against days missed while the server was down.
    """
    with app.app_context():
        today = datetime.now(get_localzone()).date()
        for offset in (1, 2, 3):
            day = today - timedelta(days=offset)
            row = DailyRevenue.query.filter_by(date=day).first()
            if row is None or not row.is_final:
                print(f"补录数据: {day}")
                daily_job(day)
# ---- 日志解析与聚合 ----
def load_config():
    """Load ../config.json; on any failure return the built-in defaults."""
    path = os.path.join(os.path.dirname(__file__), "..", "config.json")
    defaults = {"shop_name": "益选便利店", "weekday_range": [2800, 3600], "weekend_range": [1600, 2000]}
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        return defaults
def parse_app_log():
    """Parse ../app.log into {ISO-date: amount} from announcement lines.

    Two historical line formats are recognized; the first matching pattern
    wins per line, and later lines for the same date overwrite earlier ones.
    Returns an empty dict when the log file does not exist.
    """
    path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    result = {}
    if not os.path.exists(path):
        return result
    patterns = (
        re.compile(r"准备发送消息: 【.*?】(\d{4}-\d{2}-\d{2})的营业额:(\d+(?:\.\d+)?)"),
        re.compile(r"准备发送消息: (\d{4}-\d{2}-\d{2})(\d+(?:\.\d+)?)"),
    )
    with open(path, "r", encoding="utf-8") as fh:
        for raw_line in fh:
            for pat in patterns:
                m = pat.search(raw_line)
                if m:
                    result[m.group(1)] = float(m.group(2))
                    break
    return result
def gen_amount_for_date(d: date, cfg: dict):
    """Draw a random amount from the weekend or weekday range configured in cfg.

    Saturday/Sunday (weekday >= 5) use weekend_range; all other days use
    weekday_range. Result is rounded to two decimals.
    """
    if d.weekday() >= 5:
        lo, hi = cfg.get("weekend_range", [1600, 2000])
    else:
        lo, hi = cfg.get("weekday_range", [2800, 3600])
    return round(random.uniform(lo, hi), 2)
def sum_period(dates_map, start: date, end: date, cfg):
    """Sum amounts over the inclusive [start, end] range.

    Days missing from dates_map fall back to a freshly generated value
    (note: the fallback is evaluated for every day, matching dict.get
    semantics). Result rounded to two decimals.
    """
    total = 0.0
    one_day = timedelta(days=1)
    day = start
    while day <= end:
        total += dates_map.get(day.isoformat(), gen_amount_for_date(day, cfg))
        day += one_day
    return round(total, 2)
def get_periods(today: date):
    """Compute reporting windows relative to *today*; today itself is excluded.

    Returns yesterday, the day before, this week (Monday..yesterday), last
    week (full Monday..Sunday) and this month (1st..yesterday) as dates or
    (start, end) tuples.
    """
    one_day = timedelta(days=1)
    yday = today - one_day
    # Most recent Monday on or before yesterday.
    week_start = yday - timedelta(days=yday.weekday())
    # Last week is the complete Monday..Sunday span just before this week.
    last_week_end = week_start - one_day
    return {
        "yesterday": yday,
        "day_before": today - timedelta(days=2),
        "this_week": (week_start, yday),
        "last_week": (last_week_end - timedelta(days=6), last_week_end),
        "this_month": (yday.replace(day=1), yday),
    }
@app.route("/api/metrics")
def api_metrics():
    """Dashboard metrics: today/yesterday/day-before amounts plus week and month totals.

    Amounts come exclusively from finalized DB rows. Today's amount is only
    exposed once the configured cutoff time has passed.
    """
    cfg = load_config()
    shop_name = cfg.get("shop_name", "益选便利店")
    local_tz = get_localzone()
    now_local = datetime.now(local_tz)
    today_local = now_local.date()
    periods = get_periods(today_local)
    yday = periods["yesterday"]
    day_before = periods["day_before"]
    tw_s, tw_e = periods["this_week"]
    lw_s, lw_e = periods["last_week"]
    tm_s, tm_e = periods["this_month"]
    # The DB is the single source of truth; unfinalized rows read as None.
    def amt_for(d: date):
        r = DailyRevenue.query.filter_by(date=d).first()
        return r.amount if (r and r.is_final) else None
    # Range totals count finalized days only.
    def sum_final(start: date, end: date):
        cur, total = start, 0.0
        while cur <= end:
            r = DailyRevenue.query.filter_by(date=cur).first()
            if r and r.is_final:
                total += r.amount
            cur += timedelta(days=1)
        return round(total, 2)
    weekdays = ['周一','周二','周三','周四','周五','周六','周日']
    def get_cutoff_hm():
        # cutoff_time "HH:MM" overrides cutoff_hour; invalid values fall back to 23:00.
        ct = cfg.get("cutoff_time")
        h = cfg.get("cutoff_hour", 23)
        m = 0
        if isinstance(ct, str) and re.match(r'^\d{1,2}:\d{2}$', ct):
            try:
                p = ct.split(':')
                h = int(p[0]); m = int(p[1])
            except Exception:
                pass
        try:
            h = int(h)
        except Exception:
            h = 23
        if h < 0 or h > 23:
            h = 23
        if m < 0 or m > 59:
            m = 0
        return h, m
    ch, cm = get_cutoff_hm()
    # Today's figure is hidden until the cutoff moment has been reached.
    show_today = (now_local.hour > ch) or (now_local.hour == ch and now_local.minute >= cm)
    out = {
        "shop_name": shop_name,
        "server_now": now_local.isoformat(timespec='seconds'),
        "cutoff_hour": ch,
        "cutoff_time": f"{ch:02d}:{cm:02d}",
        "today": {"date": today_local.isoformat(), "weekday": weekdays[today_local.weekday()], "amount": (amt_for(today_local) if show_today else None)},
        "yesterday": {"date": yday.isoformat(), "amount": amt_for(yday)},
        "day_before": {"date": day_before.isoformat(), "amount": amt_for(day_before)},
        "this_week": {"start": tw_s.isoformat(), "end": tw_e.isoformat(), "total": sum_final(tw_s, tw_e)},
        "last_week": {"start": lw_s.isoformat(), "end": lw_e.isoformat(), "total": sum_final(lw_s, lw_e)},
        "this_month": {"start": tm_s.isoformat(), "end": tm_e.isoformat(), "total": sum_final(tm_s, tm_e)},
    }
    return jsonify(out)
@app.route("/api/series7")
def api_series7():
    """Daily series for the chart: the trailing `days` days (clamped to 7..90).

    Finalized DB rows are returned as-is; days without a finalized row are
    filled with generated estimates flagged `estimated: true`. Today is
    included only when the cutoff has passed AND today's row is final.
    """
    local_tz = get_localzone()
    now_local = datetime.now(local_tz)
    today_local = now_local.date()
    days = int(request.args.get('days', 7))
    days = max(7, min(days, 90))
    # End date: include today only if past cutoff and today's row is finalized.
    end = today_local
    r_today = DailyRevenue.query.filter_by(date=today_local).first()
    cfg = load_config()
    ct = cfg.get("cutoff_time")
    ch = cfg.get("cutoff_hour", 23)
    cm = 0
    # "HH:MM" cutoff_time overrides cutoff_hour; invalid values fall back to 23:00.
    if isinstance(ct, str) and re.match(r'^\d{1,2}:\d{2}$', ct):
        try:
            p = ct.split(':')
            ch = int(p[0]); cm = int(p[1])
        except Exception:
            pass
    try:
        ch = int(ch)
    except Exception:
        ch = 23
    if ch < 0 or ch > 23:
        ch = 23
    if cm < 0 or cm > 59:
        cm = 0
    show_today = (now_local.hour > ch) or (now_local.hour == ch and now_local.minute >= cm)
    if not (show_today and r_today and r_today.is_final):
        end = today_local - timedelta(days=1)
    start = end - timedelta(days=days-1)
    series = []
    cur = start
    while cur <= end:
        r = DailyRevenue.query.filter_by(date=cur).first()
        if r and r.is_final:
            amt = r.amount
            est = False
        else:
            # No finalized row: synthesize an estimate so the chart has no gaps.
            amt = gen_amount_for_date(cur, cfg)
            est = True
        series.append({"date": cur.isoformat(), "amount": amt, "estimated": est})
        cur += timedelta(days=1)
    return jsonify(series)
@app.route('/api/admin/reload_cutoff', methods=['POST'])
def admin_reload_cutoff():
    """Re-read the cutoff from config.json, reschedule the cron job, settle if due.

    Requires the X-Admin-Token header when ADMIN_TOKEN is set in the env.
    """
    token = os.getenv('ADMIN_TOKEN')
    if token and request.headers.get('X-Admin-Token') != token:
        return jsonify({"error": "unauthorized"}), 401
    cfg = load_config()
    ct = cfg.get("cutoff_time")
    ch = cfg.get("cutoff_hour", 23)
    cm = 0
    # Same parsing rules as elsewhere: "HH:MM" wins, invalid falls back to 23:00.
    if isinstance(ct, str) and re.match(r'^\d{1,2}:\d{2}$', ct):
        try:
            p = ct.split(':')
            ch = int(p[0]); cm = int(p[1])
        except Exception:
            pass
    try:
        ch = int(ch)
    except Exception:
        ch = 23
    if ch < 0 or ch > 23:
        ch = 23
    if cm < 0 or cm > 59:
        cm = 0
    global scheduler
    if scheduler:
        try:
            # Replace the existing "daily" cron job with the new time.
            scheduler.add_job(daily_job, "cron", hour=ch, minute=cm, id="daily", replace_existing=True)
        except Exception:
            pass
    # If the new cutoff has already passed today, settle immediately.
    settle_today_if_due()
    return jsonify({"ok": True, "cutoff_time": f"{ch:02d}:{cm:02d}"})
@app.route("/api/revenue")
def api_revenue():
    """Return daily revenue rows for the trailing `days` window (default 30)."""
    window = int(request.args.get("days", 30))
    # NOTE(review): uses the UTC date while other endpoints use the local
    # timezone — confirm this asymmetry is intentional.
    since = datetime.utcnow().date() - timedelta(days=window)
    rows = DailyRevenue.query.filter(DailyRevenue.date >= since).order_by(DailyRevenue.date.desc()).all()
    return jsonify([{"date": row.date.isoformat(), "amount": row.amount} for row in rows])
@app.route("/api/audit")
def api_audit():
    """Return the most recent audit-log entries (default 20, via ?limit=)."""
    count = int(request.args.get("limit", 20))
    entries = AuditLog.query.order_by(AuditLog.created_at.desc()).limit(count).all()

    def _serialize(entry):
        return {
            "date": entry.date.isoformat(),
            "old_amount": entry.old_amount,
            "new_amount": entry.new_amount,
            "reason": entry.reason,
            "actor": entry.actor,
            "type": entry.type,
            "created_at": entry.created_at.isoformat(timespec='seconds'),
        }

    return jsonify([_serialize(entry) for entry in entries])
@app.route("/api/health")
def api_health():
    """Lightweight status probe: server time, cutoff hour and today's record state."""
    cfg = load_config()
    today = datetime.now().date()
    row = DailyRevenue.query.filter_by(date=today).first()
    return jsonify({
        "server_now": datetime.now().isoformat(timespec='seconds'),
        "cutoff_hour": int(cfg.get("cutoff_hour", 23)),
        "today_finalized": bool(row and row.is_final),
        "today_amount": (row.amount if row else None),
    })
@app.route("/api/export")
def api_export():
    """Export all finalized daily revenue rows as a CSV attachment.

    Only rows with is_final=True are included; draft rows are skipped.
    Returns (body, status, headers) so Flask builds the download response.
    """
    rows = DailyRevenue.query.order_by(DailyRevenue.date.desc()).all()
    # Renamed local from `csv`: the old name shadowed the csv module imported
    # at the top of the file.
    csv_body = "date,amount\n" + "\n".join(f"{r.date},{r.amount}" for r in rows if r.is_final)
    headers = {
        "Content-Type": "text/csv; charset=utf-8",
        "Content-Disposition": "attachment; filename=revenue.csv",
    }
    return csv_body, 200, headers
@app.route("/")
def index():
    """Serve the single-page application entry point (index.html)."""
    entry_file = "index.html"
    return send_from_directory(app.static_folder, entry_file)
@app.route("/admin")
def admin_page():
    """Serve the static admin console page."""
    page_file = "admin.html"
    return send_from_directory(app.static_folder, page_file)
@app.route('/api/admin/turnover', methods=['PUT'])
def admin_correct():
    """Manually set or correct a day's turnover; marks it final and audit-logs it.

    Expects JSON {date: "YYYY-MM-DD", amount, reason?, actor?}. Requires the
    X-Admin-Token header when ADMIN_TOKEN is set in the env.
    """
    token = os.getenv('ADMIN_TOKEN')
    if token and request.headers.get('X-Admin-Token') != token:
        return jsonify({"error": "unauthorized"}), 401
    payload = request.get_json(force=True)
    d = datetime.strptime(payload['date'], '%Y-%m-%d').date()
    new_amt = float(payload['amount'])
    reason = payload.get('reason', 'manual_correct')
    actor = payload.get('actor', 'admin')
    r = DailyRevenue.query.filter_by(date=d).first()
    old = r.amount if r else None
    if r:
        # Overwrite in place; corrections always finalize the row.
        r.amount = new_amt
        r.is_final = True
        r.source = 'correct'
    else:
        r = DailyRevenue(date=d, amount=new_amt, is_final=True, source='correct')
        db.session.add(r)
    db.session.add(AuditLog(date=d, old_amount=old, new_amount=new_amt, reason=reason, actor=actor, type='correct'))
    db.session.commit()
    # Mirror the correction into app.log and notify Feishu.
    _append_log_line(d.isoformat(), new_amt, load_config().get('shop_name', '益选便利店'))
    push_feishu(d.isoformat(), new_amt, reason)
    return jsonify({"ok": True})
@app.route('/api/admin/test_push', methods=['POST'])
def admin_test_push():
    """Trigger a test Feishu push with optional date/amount/reason overrides."""
    expected = os.getenv('ADMIN_TOKEN')
    if expected and request.headers.get('X-Admin-Token') != expected:
        return jsonify({"error": "unauthorized"}), 401
    body = request.get_json(silent=True) or {}
    ds = body.get('date') or datetime.now().date().isoformat()
    amt = float(body.get('amount') or 1234.56)
    reason = body.get('reason') or 'manual_test'
    push_feishu(ds, amt, reason)
    return jsonify({"ok": True, "pushed": {"date": ds, "amount": amt, "reason": reason}})
def import_csv_text(text: str, actor: str = 'admin'):
    """Import "date,amount" CSV rows; returns the number of rows imported.

    Skips blank rows, a "date" header row, malformed rows and future dates.
    Existing rows are overwritten and marked final; each import is
    audit-logged and pushed to Feishu. One commit at the end.

    NOTE(review): the parameter name `text` shadows sqlalchemy.text imported
    at module level (unused here, but easy to trip over).
    """
    buf = io.StringIO(text)
    reader = csv.reader(buf)
    imported = 0
    now = datetime.now().date()
    for row in reader:
        if not row:
            continue
        if row[0] == 'date':
            # Header row.
            continue
        try:
            d = datetime.strptime(row[0].strip(), '%Y-%m-%d').date()
            if d > now:
                # Never import future dates.
                continue
            amt = float(row[1].strip())
        except Exception:
            # Malformed row: skip silently (best-effort import).
            continue
        r = DailyRevenue.query.filter_by(date=d).first()
        old = r.amount if r else None
        if r:
            r.amount = amt
            r.is_final = True
            r.source = 'import_csv'
        else:
            r = DailyRevenue(date=d, amount=amt, is_final=True, source='import_csv')
            db.session.add(r)
        db.session.add(AuditLog(date=d, old_amount=old, new_amount=amt, reason='import_csv', actor=actor, type='import_log'))
        push_feishu(d.isoformat(), amt, 'import_csv')
        imported += 1
    db.session.commit()
    return imported
@app.route('/api/admin/import', methods=['POST'])
def admin_import():
    """Bulk-import revenue rows from a CSV request body (admin token required)."""
    expected = os.getenv('ADMIN_TOKEN')
    if expected and request.headers.get('X-Admin-Token') != expected:
        return jsonify({"error": "unauthorized"}), 401
    raw = request.get_data(as_text=True)
    if not raw:
        return jsonify({"error": "empty"}), 400
    count = import_csv_text(raw, actor='admin')
    return jsonify({"ok": True, "imported": count})
@app.route('/api/admin/logs', methods=['GET'])
def admin_logs():
    """Return the last N lines of app.log (default 200, via ?lines=)."""
    expected = os.getenv('ADMIN_TOKEN')
    if expected and request.headers.get('X-Admin-Token') != expected:
        return jsonify({"error": "unauthorized"}), 401
    count = request.args.get('lines', default=200, type=int)
    path = os.path.join(os.path.dirname(__file__), "..", "app.log")
    if not os.path.exists(path):
        return jsonify({"lines": []})
    try:
        with open(path, 'r', encoding='utf-8') as fh:
            all_lines = fh.readlines()
        selected = all_lines[-count:] if count > 0 else all_lines
        return jsonify({"lines": [s.rstrip('\n') for s in selected]})
    except Exception:
        # Best-effort endpoint: unreadable log reads as empty.
        return jsonify({"lines": []})
def auto_import_csv_on_start():
    """On startup, import /app/data/import.csv unless AUTO_IMPORT_ON_START=0.

    Rows are imported with actor='bootstrap' so the audit trail distinguishes
    them from manual admin imports. Silently does nothing if the file is absent.
    """
    with app.app_context():
        flag = os.getenv('AUTO_IMPORT_ON_START', '1')
        if str(flag) == '0':
            return
        # NOTE: container-specific path; non-container deployments won't have /app.
        p = os.path.join("/app", "data", "import.csv")
        if os.path.exists(p):
            with open(p, "r", encoding="utf-8") as f:
                # Renamed from `text`: the old local shadowed sqlalchemy.text,
                # which is imported at module level.
                csv_text = f.read()
            import_csv_text(csv_text, actor='bootstrap')
def sync_log_to_db():
    """On startup, backfill DB rows from app.log entries (dates before today only)."""
    log_map = parse_app_log()
    local_tz = get_localzone()
    today_local = datetime.now(local_tz).date()
    for ds, amt in log_map.items():
        d = datetime.strptime(ds, '%Y-%m-%d').date()
        if d >= today_local:
            # Today and future entries are left to the scheduler.
            continue
        r = DailyRevenue.query.filter_by(date=d).first()
        if not r:
            # Only fill gaps; never overwrite an existing DB row from the log.
            db.session.add(DailyRevenue(date=d, amount=amt, is_final=True, source='import_log'))
            db.session.add(AuditLog(date=d, old_amount=None, new_amount=amt, reason='import_log', actor='system', type='import_log'))
    db.session.commit()
if __name__ == "__main__":
    # Schedule the daily finalization job at the configured cutoff (local timezone).
    local_tz = get_localzone()
    scheduler = BackgroundScheduler(timezone=local_tz)
    cfg = load_config()
    ct = cfg.get("cutoff_time")
    ch = cfg.get("cutoff_hour", 23)
    cm = 0
    # "HH:MM" cutoff_time takes precedence; invalid values fall back to 23:00.
    if isinstance(ct, str) and re.match(r'^\d{1,2}:\d{2}$', ct):
        try:
            p = ct.split(':')
            ch = int(p[0]); cm = int(p[1])
        except Exception:
            pass
    try:
        ch = int(ch)
    except Exception:
        ch = 23
    if ch < 0 or ch > 23:
        ch = 23
    if cm < 0 or cm > 59:
        cm = 0
    scheduler.add_job(daily_job, "cron", hour=ch, minute=cm, id="daily", replace_existing=True)
    scheduler.start()
    with app.app_context():
        # Startup reconciliation: log backfill, optional CSV import, missed days.
        sync_log_to_db()
        auto_import_csv_on_start()
        settle_past_days()
        settle_today_if_due()
    # NOTE(review): app.run() blocks here, so the /api/events route and the full
    # push_feishu defined after this block never execute in this run mode —
    # confirm whether that ordering is intentional.
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "5000")))
@app.route('/api/events')
def sse_events():
    """Server-sent events: a tick every 30 seconds plus a force_refresh near the hour."""
    def _stream():
        while True:
            now = datetime.now(get_localzone())
            tick = {"type": "tick", "server_now": now.isoformat(timespec='seconds')}
            yield f"data: {json.dumps(tick)}\n\n"
            # In the first two minutes of each hour: try to settle, then tell
            # connected clients to reload their data.
            if now.minute in (0, 1):
                settle_today_if_due()
                yield "data: {\"type\": \"force_refresh\"}\n\n"
            time.sleep(30)
    return Response(stream_with_context(_stream()), mimetype='text/event-stream')
def push_feishu(date_str: str, amount: float, reason: str):
    """Push a revenue notification to the configured Feishu webhook.

    Tries an interactive card first (Feishu URLs only); on failure falls back
    to a rich-text "post" message, then to plain text. Every attempt and any
    exception is appended to app.log. Does nothing when no webhook URL is
    configured. Optionally signs the payload with feishu_secret.
    """
    cfg = load_config()
    url = cfg.get("feishu_webhook_url")
    if not url:
        return
    shop = cfg.get("shop_name", "益选便利店")
    try:
        d = datetime.strptime(date_str, '%Y-%m-%d').date()
    except Exception:
        # Unparseable date: fall back to today for the comparison baseline.
        d = datetime.now().date()
    # Compare against yesterday's finalized amount when available.
    y = d - timedelta(days=1)
    r = DailyRevenue.query.filter_by(date=y).first()
    y_amt = (r.amount if (r and r.is_final) else None)
    arrow = ''
    diff_str = ''
    pct_str = ''
    if isinstance(y_amt, (int, float)):
        diff = amount - y_amt
        arrow = '🔺' if diff >= 0 else '🔻'
        diff_str = f"{'+' if diff >= 0 else '-'}{abs(diff):.2f}"
        if y_amt != 0:
            pct = abs(diff / y_amt) * 100.0
            pct_str = f"({'+' if diff >= 0 else '-'}{pct:.2f}%)"
    today_line = f"**今日**:¥{amount:.2f}"
    if isinstance(y_amt, (int, float)):
        today_line += f" {arrow} {diff_str} {pct_str}".strip()
    y_line = f"**昨日**{('暂无数据' if y_amt is None else '¥' + format(y_amt, '.2f'))}"
    # Interactive card payload (primary attempt).
    card = {
        "config": {"wide_screen_mode": True},
        "elements": [
            {"tag": "div", "text": {"tag": "lark_md", "content": f"📊 **{shop}** 营业额通知"}},
            {"tag": "hr"},
            {"tag": "div", "text": {"tag": "lark_md", "content": f"**日期**{date_str}"}},
            {"tag": "div", "text": {"tag": "lark_md", "content": today_line}},
            {"tag": "div", "text": {"tag": "lark_md", "content": y_line}},
            {"tag": "note", "elements": [
                {"tag": "plain_text", "content": f"原因:{reason} | 时间:{datetime.now().isoformat(timespec='seconds')}"}
            ]}
        ]
    }
    payload = {"msg_type": "interactive", "card": card}
    # Optional webhook signing: HMAC-SHA256 over "timestamp\nsecret", base64-encoded.
    secret = cfg.get("feishu_secret")
    if secret:
        ts = str(int(time.time()))
        sign_src = ts + "\n" + secret
        sign = base64.b64encode(hmac.new(secret.encode(), sign_src.encode(), digestmod=hashlib.sha256).digest()).decode()
        payload.update({"timestamp": ts, "sign": sign})
    def _log(s: str):
        # Mirror every push attempt to app.log and stdout (best effort).
        p = os.path.join(os.path.dirname(__file__), "..", "app.log")
        with open(p, 'a', encoding='utf-8') as f:
            f.write(s + "\n")
        try:
            print(s, flush=True)
        except Exception:
            pass
    def _post_json(u: str, payload_obj: dict):
        # ensure_ascii=False keeps the Chinese text readable in the webhook body.
        body = json.dumps(payload_obj, ensure_ascii=False).encode('utf-8')
        return requests.post(u, data=body, headers={'Content-Type': 'application/json; charset=utf-8'}, timeout=5)
    try:
        is_feishu = ('open.feishu.cn' in url)
        ok = False
        if is_feishu:
            # Attempt 1: interactive card (Feishu-hosted webhooks only).
            resp = _post_json(url, payload)
            ok = (200 <= resp.status_code < 300)
            _log(f"飞书推送卡片{'成功' if ok else '失败'}: status={resp.status_code} body={resp.text[:500]}")
        if not ok:
            # Attempt 2: rich-text "post" message.
            post_payload = {
                "msg_type": "post",
                "content": {
                    "post": {
                        "zh_cn": {
                            "title": f"{shop} 营业额通知",
                            "content": [[
                                {"tag":"text","text": f"日期:{date_str}\n"},
                                {"tag":"text","text": f"今日:¥{amount:.2f}"},
                                *( [{"tag":"text","text": f" {arrow} {diff_str} {pct_str}"}] if isinstance(y_amt,(int,float)) else [] ),
                            ],[
                                {"tag":"text","text": f"原因:{reason}"}
                            ]]
                        }
                    }
                }
            }
            resp_post = _post_json(url, post_payload)
            ok = (200 <= resp_post.status_code < 300)
            _log(f"飞书推送POST{'成功' if ok else '失败'}: status={resp_post.status_code} body={resp_post.text[:500]}")
            if not ok:
                # Attempt 3: plain-text message as the last resort.
                # NOTE(review): this local `text` also shadows sqlalchemy.text.
                text = f"{shop}\n日期:{date_str}\n今日:¥{amount:.2f}"
                if isinstance(y_amt, (int, float)):
                    text += f" {arrow} {diff_str} {pct_str}".strip()
                text += f"\n原因:{reason}"
                resp2 = _post_json(url, {"msg_type":"text","content":{"text": text}})
                _log(f"飞书推送文本{'成功' if (200 <= resp2.status_code < 300) else '失败'}: status={resp2.status_code} body={resp2.text[:500]}")
    except Exception as e:
        _log(f"飞书推送异常: {str(e)[:200]}")