588 lines
21 KiB
Python
588 lines
21 KiB
Python
from flask import Flask, render_template, request, jsonify, send_from_directory
|
|
import pandas as pd
|
|
import os
|
|
from werkzeug.utils import secure_filename
|
|
import json
|
|
from datetime import datetime
|
|
import glob
|
|
import time
|
|
import asyncio
|
|
import threading
|
|
import logging
|
|
|
|
from config import Config
|
|
from automation.uploader import import_excel_file, cleanup_download
|
|
from automation.scheduler import init_scheduler, get_scheduler_status, shutdown_scheduler
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
app.config['UPLOAD_FOLDER'] = 'uploads'
|
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
|
|
|
|
# 确保上传文件夹存在
|
|
if not os.path.exists(app.config['UPLOAD_FOLDER']):
|
|
os.makedirs(app.config['UPLOAD_FOLDER'])
|
|
|
|
ALLOWED_EXTENSIONS = {'xlsx', 'xls'}
|
|
|
|
def allowed_file(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
@app.route('/files')
|
|
def get_files():
|
|
"""获取已上传的文件列表"""
|
|
try:
|
|
upload_folder = app.config['UPLOAD_FOLDER']
|
|
if not os.path.exists(upload_folder):
|
|
return jsonify({'files': []})
|
|
|
|
files = []
|
|
# 使用glob获取所有Excel文件
|
|
all_files = glob.glob(os.path.join(upload_folder, '*.xlsx')) + glob.glob(os.path.join(upload_folder, '*.xls'))
|
|
|
|
for filepath in all_files:
|
|
if not os.path.isfile(filepath):
|
|
continue
|
|
|
|
filename = os.path.basename(filepath)
|
|
# 解析时间戳和原始文件名
|
|
try:
|
|
# 尝试解析格式: YYYYMMDD_HHMMSS_original_filename
|
|
parts = filename.split('_', 2)
|
|
if len(parts) >= 3:
|
|
timestamp_str = parts[0] + parts[1]
|
|
upload_time = datetime.strptime(timestamp_str, '%Y%m%d%H%M%S')
|
|
original_name = parts[2]
|
|
else:
|
|
upload_time = datetime.fromtimestamp(os.path.getmtime(filepath))
|
|
original_name = filename
|
|
except:
|
|
upload_time = datetime.fromtimestamp(os.path.getmtime(filepath))
|
|
original_name = filename
|
|
|
|
file_size = os.path.getsize(filepath)
|
|
|
|
files.append({
|
|
'filename': filename,
|
|
'original_name': original_name,
|
|
'upload_time': upload_time.strftime('%Y-%m-%d %H:%M:%S'),
|
|
'file_size': file_size,
|
|
'file_size_human': f"{file_size / 1024:.1f} KB" if file_size < 1024*1024 else f"{file_size / (1024*1024):.1f} MB"
|
|
})
|
|
|
|
files.sort(key=lambda x: x['upload_time'], reverse=True)
|
|
return jsonify({'files': files})
|
|
except Exception as e:
|
|
return jsonify({'error': f'获取文件列表失败: {str(e)}'}), 500
|
|
|
|
@app.route('/load/<filename>')
|
|
def load_file(filename):
|
|
"""加载指定的文件"""
|
|
try:
|
|
upload_folder = app.config['UPLOAD_FOLDER']
|
|
filepath = os.path.join(upload_folder, filename)
|
|
|
|
if not os.path.exists(filepath):
|
|
return jsonify({'error': '文件不存在'}), 404
|
|
|
|
if not allowed_file(filename):
|
|
return jsonify({'error': '不支持的文件格式'}), 400
|
|
|
|
# 动态查找表头
|
|
header_row = find_header_row(filepath)
|
|
df = pd.read_excel(filepath, header=header_row)
|
|
sales_data = process_sales_data(df)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'filename': filename,
|
|
'data': sales_data
|
|
})
|
|
except Exception as e:
|
|
return jsonify({'error': f'文件加载错误: {str(e)}'}), 500
|
|
|
|
@app.route('/upload', methods=['POST'])
|
|
def upload_file():
|
|
if 'file' not in request.files:
|
|
return jsonify({'error': '没有选择文件'}), 400
|
|
|
|
file = request.files['file']
|
|
if file.filename == '':
|
|
return jsonify({'error': '没有选择文件'}), 400
|
|
|
|
if file and allowed_file(file.filename):
|
|
filename = secure_filename(file.filename)
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_')
|
|
filename = timestamp + filename
|
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
|
|
file.save(filepath)
|
|
|
|
try:
|
|
# 动态查找表头
|
|
header_row = find_header_row(filepath)
|
|
df = pd.read_excel(filepath, header=header_row)
|
|
sales_data = process_sales_data(df)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'filename': filename,
|
|
'data': sales_data
|
|
})
|
|
except Exception as e:
|
|
return jsonify({'error': f'文件处理错误: {str(e)}'}), 500
|
|
|
|
return jsonify({'error': '不支持的文件格式'}), 400
|
|
|
|
@app.route('/delete/<filename>', methods=['POST'])
|
|
def delete_file(filename):
|
|
"""删除指定的单个文件"""
|
|
try:
|
|
filename = secure_filename(filename)
|
|
upload_folder = app.config['UPLOAD_FOLDER']
|
|
filepath = os.path.join(upload_folder, filename)
|
|
|
|
if os.path.exists(filepath):
|
|
os.remove(filepath)
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'文件 {filename} 已成功删除'
|
|
})
|
|
else:
|
|
return jsonify({'error': '文件不存在'}), 404
|
|
except Exception as e:
|
|
return jsonify({'error': f'删除文件失败: {str(e)}'}), 500
|
|
|
|
@app.route('/cleanup', methods=['POST'])
|
|
def cleanup_files():
|
|
"""清理上传的文件(立即清理)"""
|
|
try:
|
|
upload_folder = app.config['UPLOAD_FOLDER']
|
|
if not os.path.exists(upload_folder):
|
|
return jsonify({'success': True, 'message': '无需清理'})
|
|
|
|
files = os.listdir(upload_folder)
|
|
deleted_count = 0
|
|
errors = []
|
|
|
|
for filename in files:
|
|
file_path = os.path.join(upload_folder, filename)
|
|
if os.path.isfile(file_path): # Remove extension check to clean everything
|
|
try:
|
|
os.remove(file_path)
|
|
deleted_count += 1
|
|
except PermissionError:
|
|
errors.append(f"{filename} 正在被占用,无法删除")
|
|
except Exception as e:
|
|
errors.append(f"{filename} 删除失败: {str(e)}")
|
|
|
|
message = f'成功清理 {deleted_count} 个文件'
|
|
if errors:
|
|
message += f'。主要错误: {"; ".join(errors[:3])}'
|
|
if len(errors) > 3:
|
|
message += f" 等共{len(errors)}个错误"
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': message,
|
|
'deleted_count': deleted_count
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': f'清理文件失败: {str(e)}'}), 500
|
|
|
|
# ============ 自动化相关路由 ============
|
|
|
|
# 全局下载任务状态
|
|
download_status = {
|
|
'running': False,
|
|
'message': '',
|
|
'last_run': None,
|
|
'last_file': None
|
|
}
|
|
|
|
|
|
@app.route('/settings')
|
|
def settings_page():
|
|
"""设置页面"""
|
|
return render_template('settings.html')
|
|
|
|
|
|
@app.route('/api/settings', methods=['GET'])
|
|
def get_settings():
|
|
"""获取配置"""
|
|
try:
|
|
return jsonify({'success': True, 'data': Config.get_all_config()})
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/settings', methods=['POST'])
|
|
def save_settings():
|
|
"""保存配置"""
|
|
try:
|
|
data = request.get_json()
|
|
|
|
# 保存凭据
|
|
if 'secsion' in data:
|
|
secsion = data['secsion']
|
|
username = secsion.get('username', '').strip()
|
|
password = secsion.get('password', '').strip()
|
|
shop_id = secsion.get('shop_id', '').strip()
|
|
if username and password and password != '******':
|
|
Config.save_secsion_credentials(username, password)
|
|
if shop_id is not None:
|
|
Config.save_shop_id(shop_id)
|
|
|
|
# 保存调度配置
|
|
if 'scheduler' in data:
|
|
sched = data['scheduler']
|
|
Config.save_schedule_config(
|
|
enabled=sched.get('enabled', True),
|
|
hour=sched.get('hour', 1),
|
|
minute=sched.get('minute', 0)
|
|
)
|
|
|
|
return jsonify({'success': True, 'message': '配置已保存'})
|
|
except Exception as e:
|
|
return jsonify({'error': f'保存配置失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/auto-download', methods=['POST'])
|
|
def auto_download():
|
|
"""触发自动下载"""
|
|
global download_status
|
|
|
|
if download_status['running']:
|
|
return jsonify({'error': '已有下载任务正在执行,请稍候'}), 409
|
|
|
|
try:
|
|
data = request.get_json() or {}
|
|
start_date = data.get('start_date')
|
|
end_date = data.get('end_date', start_date)
|
|
|
|
if not start_date:
|
|
from datetime import timedelta
|
|
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
|
start_date = yesterday
|
|
end_date = yesterday
|
|
|
|
# 检查凭据
|
|
creds = Config.get_secsion_credentials()
|
|
if not creds:
|
|
return jsonify({'error': '未配置 secsion.com 登录凭据,请先在设置页面配置'}), 400
|
|
|
|
username, password = creds
|
|
|
|
# 在后台线程执行下载
|
|
def run_download():
|
|
global download_status
|
|
download_status['running'] = True
|
|
download_status['message'] = f'正在下载 {start_date} ~ {end_date} 的数据...'
|
|
|
|
try:
|
|
from automation.secsion import SecsionDownloader
|
|
|
|
shop_id = Config.get_shop_id()
|
|
downloader = SecsionDownloader(username, password, download_dir='downloads', shop_id=shop_id)
|
|
file_path = asyncio.run(downloader.download_report(start_date, end_date))
|
|
|
|
if file_path:
|
|
imported_name = import_excel_file(file_path, upload_dir='uploads')
|
|
cleanup_download(file_path)
|
|
|
|
if imported_name:
|
|
download_status['message'] = f'下载完成: {imported_name}'
|
|
download_status['last_file'] = imported_name
|
|
logger.info(f"自动下载并导入成功: {imported_name}")
|
|
else:
|
|
download_status['message'] = '下载成功但导入失败'
|
|
else:
|
|
download_status['message'] = '下载失败:未获取到文件'
|
|
|
|
except Exception as e:
|
|
download_status['message'] = f'下载异常: {str(e)}'
|
|
logger.error(f"自动下载异常: {e}", exc_info=True)
|
|
finally:
|
|
download_status['running'] = False
|
|
download_status['last_run'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
thread = threading.Thread(target=run_download, daemon=True)
|
|
thread.start()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f'已开始下载 {start_date} ~ {end_date} 的数据'
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': f'启动下载失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/auto-download/status', methods=['GET'])
|
|
def get_download_status():
|
|
"""获取下载任务状态"""
|
|
return jsonify({
|
|
'success': True,
|
|
'status': download_status
|
|
})
|
|
|
|
|
|
@app.route('/api/scheduler/status', methods=['GET'])
|
|
def get_scheduler():
|
|
"""获取定时任务状态"""
|
|
return jsonify({
|
|
'success': True,
|
|
'status': get_scheduler_status()
|
|
})
|
|
|
|
|
|
# ============ 数据处理函数 ============
|
|
|
|
def find_header_row(filepath):
|
|
"""查找表头所在的行索引"""
|
|
try:
|
|
# 只读取前20行,不带表头
|
|
df_temp = pd.read_excel(filepath, header=None, nrows=20)
|
|
|
|
keywords = ['时间', '日期', '商品', '品名', '数量', '金额', '总价', 'Date', 'Product', 'Qty', 'Amount']
|
|
|
|
for index, row in df_temp.iterrows():
|
|
# 将行转换为字符串列表
|
|
row_str = " ".join([str(val) for val in row.values])
|
|
# 统计包含的关键字数量
|
|
match_count = sum(1 for keyword in keywords if keyword in row_str)
|
|
|
|
# 如果一行包含至少2个关键字,认为是表头
|
|
if match_count >= 2:
|
|
return index
|
|
return 0 # 默认第一行
|
|
except:
|
|
return 0
|
|
|
|
def process_sales_data(df):
|
|
"""处理销售数据,使用Pandas向量化操作"""
|
|
try:
|
|
# 1. 智能识别列名
|
|
cols = df.columns.tolist()
|
|
col_map = {}
|
|
|
|
# 优先级关键字
|
|
keywords = {
|
|
'date': ['时间', '日期', 'Time', 'Date'],
|
|
'product': ['商品', '品名', '详情', 'Product', 'Name', 'Description'],
|
|
'quantity': ['数量', '件数', 'Quantity', 'Qty', 'Count'],
|
|
'amount': ['金额', '总价', 'Amount', 'Price', 'Total'],
|
|
'code': ['编码', '货号', '代码', 'Code', 'No']
|
|
}
|
|
|
|
for key, priority_words in keywords.items():
|
|
for word in priority_words:
|
|
found = False
|
|
for col in cols:
|
|
if word in str(col):
|
|
col_map[key] = col
|
|
found = True
|
|
break # Found highest priority match
|
|
if found: break
|
|
|
|
# 兼容旧逻辑的后备方案:按索引
|
|
if 'date' not in col_map and len(cols) > 1: col_map['date'] = cols[1]
|
|
if 'product' not in col_map and len(cols) > 2: col_map['product'] = cols[2]
|
|
if 'quantity' not in col_map and len(cols) > 3: col_map['quantity'] = cols[3]
|
|
if 'amount' not in col_map and len(cols) > 4: col_map['amount'] = cols[4]
|
|
if 'code' not in col_map and len(cols) > 0: col_map['code'] = cols[0]
|
|
|
|
if not all(k in col_map for k in ['date', 'product', 'quantity', 'amount']):
|
|
# 尝试更宽迷糊的匹配
|
|
if len(cols) >= 5:
|
|
col_map['code'] = cols[0]
|
|
col_map['date'] = cols[1]
|
|
col_map['product'] = cols[2]
|
|
col_map['quantity'] = cols[3]
|
|
col_map['amount'] = cols[4]
|
|
else:
|
|
raise Exception("无法识别必要的列(时间、商品、数量、金额),请检查Excel格式或列名")
|
|
|
|
# 2. 重命名列以便处理
|
|
rename_dict = {
|
|
col_map['date']: 'date',
|
|
col_map['product']: 'product',
|
|
col_map['quantity']: 'quantity',
|
|
col_map['amount']: 'amount'
|
|
}
|
|
if 'code' in col_map:
|
|
rename_dict[col_map['code']] = 'code'
|
|
|
|
df = df.rename(columns=rename_dict)
|
|
|
|
# 确保code列存在
|
|
if 'code' not in df.columns:
|
|
df['code'] = ''
|
|
|
|
# 3. 数据清洗和类型转换
|
|
for col in ['quantity', 'amount']:
|
|
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
|
|
|
|
# 4. 日期处理
|
|
df['parsed_date'] = pd.to_datetime(df['date'], errors='coerce')
|
|
|
|
# 5. 基于"ID块"的解析逻辑 (State Machine)
|
|
|
|
processed_data = []
|
|
daily_summary = {}
|
|
|
|
def get_day_entry(date_str):
|
|
if date_str not in daily_summary:
|
|
daily_summary[date_str] = {
|
|
'date': date_str,
|
|
'total_quantity': 0,
|
|
'total_amount': 0,
|
|
'products': [],
|
|
'summary_info': None
|
|
}
|
|
return daily_summary[date_str]
|
|
|
|
# 状态变量
|
|
current_context = {
|
|
'date_str': None,
|
|
'code': None,
|
|
'header_quantity': 0, # Header行可能包含的总数(参考用)
|
|
'header_amount': 0
|
|
}
|
|
|
|
for index, row in df.iterrows():
|
|
# 检查当前行是否有ID (Code)
|
|
has_code = pd.notna(row['code']) and str(row['code']).strip() != ''
|
|
|
|
# 检查当前行是否有商品名
|
|
has_product = pd.notna(row['product']) and str(row['product']).strip() != ''
|
|
|
|
# --- 状态更新逻辑 ---
|
|
if has_code:
|
|
# 这是一个新的"块"的开始 (Header行)
|
|
if pd.notna(row['parsed_date']):
|
|
current_context['date_str'] = row['parsed_date'].strftime('%Y-%m-%d %H:%M:%S')
|
|
else:
|
|
# 如果有Code但没日期,可能沿用上一个?或者这还是同一个块?
|
|
# 假设Code行必须有日期,如果没有,可能是数据问题,这里保留上一个日期比较安全
|
|
pass
|
|
|
|
current_context['code'] = str(row['code'])
|
|
current_context['header_quantity'] = float(row['quantity'])
|
|
current_context['header_amount'] = float(row['amount'])
|
|
|
|
# 如果这个Header行本身没有任何明细行为(Quantity/Amount都在Header上),
|
|
# 层级表里,Header只有总汇。扁平表里,Header也是明细。
|
|
# 通过 has_product 区分:
|
|
# 扁平表Row: Code(Yes) + Product(Yes)
|
|
# 层级表Header: Code(Yes) + Product(No)
|
|
|
|
# --- 记录处理逻辑 ---
|
|
|
|
# 如果没有有效日期上下文,无法归档,跳过
|
|
if not current_context['date_str']:
|
|
continue
|
|
|
|
date_str = current_context['date_str']
|
|
code = current_context['code']
|
|
|
|
if has_product:
|
|
# -> 明细记录 (可能是扁平表的当前行,或者是层级表的子行)
|
|
product_name = str(row['product']).strip()
|
|
quantity = float(row['quantity'])
|
|
amount = float(row['amount'])
|
|
|
|
# 计算单价
|
|
price = amount / quantity if quantity > 0 else 0
|
|
|
|
# 更新统计
|
|
entry = get_day_entry(date_str)
|
|
entry['total_quantity'] += quantity
|
|
entry['total_amount'] += amount
|
|
|
|
product_info = {
|
|
'product': product_name,
|
|
'quantity': quantity,
|
|
'amount': amount,
|
|
'price': price,
|
|
'code': code
|
|
}
|
|
entry['products'].append(product_info)
|
|
|
|
processed_data.append({
|
|
'date': date_str,
|
|
'product': product_name,
|
|
'quantity': quantity,
|
|
'amount': amount,
|
|
'price': price,
|
|
'is_summary': False,
|
|
'code': code
|
|
})
|
|
|
|
elif has_code and not has_product:
|
|
# -> 纯Header行 (层级表结构)
|
|
# 我们记录它的汇总信息作为参考,但不计入 daily_summary 的累加(除非完全没有明细)
|
|
# 但为了显示在列表中(如果需要显示总汇行),我们可以加一个特殊条目
|
|
|
|
# 只有当包含数值时才记录
|
|
if row['quantity'] > 0 or row['amount'] > 0:
|
|
entry = get_day_entry(date_str)
|
|
entry['summary_info'] = {
|
|
'total_quantity': float(row['quantity']),
|
|
'total_amount': float(row['amount']),
|
|
'code': code
|
|
}
|
|
|
|
processed_data.append({
|
|
'date': date_str,
|
|
'product': '【时间段总计】',
|
|
'quantity': float(row['quantity']),
|
|
'amount': float(row['amount']),
|
|
'is_summary': True,
|
|
'code': code
|
|
})
|
|
|
|
# 后处理:如果某些天完全没有明细行,但有summary_info,则使用summary_info填充total
|
|
for date_str, entry in daily_summary.items():
|
|
if entry['total_quantity'] == 0 and entry['total_amount'] == 0 and entry['summary_info']:
|
|
entry['total_quantity'] = entry['summary_info']['total_quantity']
|
|
entry['total_amount'] = entry['summary_info']['total_amount']
|
|
|
|
return {
|
|
'columns': {
|
|
'code': col_map.get('code', ''),
|
|
'date': col_map['date'],
|
|
'product': col_map['product'],
|
|
'quantity': col_map['quantity'],
|
|
'amount': col_map['amount']
|
|
},
|
|
'raw_data': processed_data,
|
|
'daily_summary': list(daily_summary.values())
|
|
}
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise Exception(f"数据处理失败: {str(e)}")
|
|
|
|
if __name__ == '__main__':
|
|
port = int(os.environ.get('PORT', 5000))
|
|
# 生产环境建议关闭 debug
|
|
debug_mode = os.environ.get('FLASK_DEBUG', 'True').lower() == 'true'
|
|
|
|
# 初始化定时任务调度器
|
|
try:
|
|
init_scheduler(app)
|
|
except Exception as e:
|
|
logger.warning(f"定时任务调度器初始化失败: {e}")
|
|
|
|
app.run(debug=debug_mode, host='0.0.0.0', port=port) |