Files
SaleShow/automation/scheduler.py
T

132 lines
3.7 KiB
Python

"""
定时调度模块
使用 APScheduler 实现每日自动下载前一天的销售数据
"""
import asyncio
import logging
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
logger = logging.getLogger(__name__)
scheduler = None
def auto_download_job():
"""
定时任务:自动下载前一天的销售数据
每日凌晨 1 点执行
"""
logger.info("========== 定时任务触发:自动下载前一天数据 ==========")
try:
from config import Config
from automation.secsion import SecsionDownloader
from automation.uploader import import_excel_file, cleanup_download
# 获取凭据
creds = Config.get_secsion_credentials()
if not creds:
logger.error("未配置 secsion.com 登录凭据,跳过自动下载")
return
username, password = creds
# 计算前一天日期
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
logger.info(f"下载日期: {yesterday}")
# 获取店铺 ID
shop_id = Config.get_shop_id()
# 执行下载
downloader = SecsionDownloader(username, password, download_dir='downloads', shop_id=shop_id)
file_path = asyncio.run(downloader.download_report(yesterday, yesterday))
if not file_path:
logger.error("自动下载失败:未获取到文件")
return
# 导入到 SaleShow
imported_name = import_excel_file(file_path, upload_dir='uploads')
if imported_name:
logger.info(f"自动导入成功: {imported_name}")
else:
logger.error("自动导入失败")
# 清理下载的临时文件
cleanup_download(file_path)
logger.info("========== 定时任务完成 ==========")
except Exception as e:
logger.error(f"定时任务执行异常: {e}", exc_info=True)
def init_scheduler(app=None):
"""
初始化定时调度器
Args:
app: Flask app 实例(可选,用于获取配置)
Returns:
BackgroundScheduler: 调度器实例
"""
global scheduler
try:
from config import Config
schedule_config = Config.get_schedule_config()
except Exception:
schedule_config = {'enabled': True, 'hour': 1, 'minute': 0}
if not schedule_config.get('enabled', True):
logger.info("定时任务已禁用")
return None
hour = schedule_config.get('hour', 1)
minute = schedule_config.get('minute', 0)
scheduler = BackgroundScheduler()
scheduler.add_job(
func=auto_download_job,
trigger=CronTrigger(hour=hour, minute=minute),
id='daily_download',
name='每日自动下载销售数据',
replace_existing=True,
misfire_grace_time=3600 # 允许 1 小时的延迟执行
)
scheduler.start()
logger.info(f"定时任务已启动:每日 {hour:02d}:{minute:02d} 自动下载前一天数据")
return scheduler
def get_scheduler_status():
"""获取调度器状态"""
if scheduler is None:
return {'running': False, 'jobs': []}
jobs = []
for job in scheduler.get_jobs():
jobs.append({
'id': job.id,
'name': job.name,
'next_run': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if job.next_run_time else None
})
return {
'running': scheduler.running,
'jobs': jobs
}
def shutdown_scheduler():
"""关闭调度器"""
global scheduler
if scheduler and scheduler.running:
scheduler.shutdown(wait=False)
logger.info("定时任务已关闭")