# SaleShow/app.py
from flask import Flask, render_template, request, jsonify, send_from_directory
import pandas as pd
import os
from werkzeug.utils import secure_filename
import json
from datetime import datetime
import glob
import time
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Ensure the upload folder exists. exist_ok avoids the check-then-create
# race of the previous os.path.exists() guard.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# File extensions accepted for upload (Excel workbooks only).
ALLOWED_EXTENSIONS = {'xlsx', 'xls'}


def allowed_file(filename):
    """Return True when *filename* carries an allowed Excel extension."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
@app.route('/')
def index():
    """Serve the single-page front end."""
    return render_template('index.html')
@app.route('/files')
def get_files():
    """Return the list of uploaded Excel files as JSON.

    Each entry contains the stored filename, the original filename (parsed
    from the 'YYYYMMDD_HHMMSS_original' naming scheme when possible), the
    upload time and the file size. Entries are sorted newest first.
    """
    try:
        upload_folder = app.config['UPLOAD_FOLDER']
        if not os.path.exists(upload_folder):
            return jsonify({'files': []})
        files = []
        # Collect all Excel files via glob.
        all_files = glob.glob(os.path.join(upload_folder, '*.xlsx')) + glob.glob(os.path.join(upload_folder, '*.xls'))
        for filepath in all_files:
            if not os.path.isfile(filepath):
                continue
            filename = os.path.basename(filepath)
            # Parse the timestamp and original filename.
            try:
                # Expected format: YYYYMMDD_HHMMSS_original_filename
                parts = filename.split('_', 2)
                if len(parts) >= 3:
                    timestamp_str = parts[0] + parts[1]
                    upload_time = datetime.strptime(timestamp_str, '%Y%m%d%H%M%S')
                    original_name = parts[2]
                else:
                    upload_time = datetime.fromtimestamp(os.path.getmtime(filepath))
                    original_name = filename
            except (ValueError, OSError):
                # Narrowed from a bare except: ValueError covers a malformed
                # timestamp, OSError covers a stat failure on the file.
                upload_time = datetime.fromtimestamp(os.path.getmtime(filepath))
                original_name = filename
            file_size = os.path.getsize(filepath)
            files.append({
                'filename': filename,
                'original_name': original_name,
                'upload_time': upload_time.strftime('%Y-%m-%d %H:%M:%S'),
                'file_size': file_size,
                'file_size_human': f"{file_size / 1024:.1f} KB" if file_size < 1024*1024 else f"{file_size / (1024*1024):.1f} MB"
            })
        # The formatted timestamp sorts correctly as a string (fixed-width,
        # most-significant field first).
        files.sort(key=lambda x: x['upload_time'], reverse=True)
        return jsonify({'files': files})
    except Exception as e:
        return jsonify({'error': f'获取文件列表失败: {str(e)}'}), 500
@app.route('/load/<filename>')
def load_file(filename):
    """Load a previously uploaded Excel file and return its parsed data."""
    try:
        # Sanitize the user-supplied name before building a filesystem path
        # (defense against path traversal; also consistent with delete_file).
        filename = secure_filename(filename)
        upload_folder = app.config['UPLOAD_FOLDER']
        filepath = os.path.join(upload_folder, filename)
        if not os.path.exists(filepath):
            return jsonify({'error': '文件不存在'}), 404
        if not allowed_file(filename):
            return jsonify({'error': '不支持的文件格式'}), 400
        # Locate the header row dynamically before parsing.
        header_row = find_header_row(filepath)
        df = pd.read_excel(filepath, header=header_row)
        sales_data = process_sales_data(df)
        return jsonify({
            'success': True,
            'filename': filename,
            'data': sales_data
        })
    except Exception as e:
        return jsonify({'error': f'文件加载错误: {str(e)}'}), 500
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept an Excel upload, store it with a timestamp prefix, and return the parsed data."""
    if 'file' not in request.files:
        return jsonify({'error': '没有选择文件'}), 400
    uploaded = request.files['file']
    if uploaded.filename == '':
        return jsonify({'error': '没有选择文件'}), 400
    if not (uploaded and allowed_file(uploaded.filename)):
        return jsonify({'error': '不支持的文件格式'}), 400
    # Prefix the sanitized name with an upload timestamp so files never collide.
    stored_name = datetime.now().strftime('%Y%m%d_%H%M%S_') + secure_filename(uploaded.filename)
    target_path = os.path.join(app.config['UPLOAD_FOLDER'], stored_name)
    uploaded.save(target_path)
    try:
        # Locate the header row dynamically, then parse the sheet.
        df = pd.read_excel(target_path, header=find_header_row(target_path))
        return jsonify({
            'success': True,
            'filename': stored_name,
            'data': process_sales_data(df)
        })
    except Exception as e:
        return jsonify({'error': f'文件处理错误: {str(e)}'}), 500
@app.route('/delete/<filename>', methods=['POST'])
def delete_file(filename):
    """Delete a single uploaded file identified by its stored filename."""
    try:
        filename = secure_filename(filename)
        upload_folder = app.config['UPLOAD_FOLDER']
        filepath = os.path.join(upload_folder, filename)
        if os.path.exists(filepath):
            os.remove(filepath)
            return jsonify({
                'success': True,
                # Fixed: the f-string previously contained a literal
                # "(unknown)" placeholder instead of the filename.
                'message': f'文件 {filename} 已成功删除'
            })
        else:
            return jsonify({'error': '文件不存在'}), 404
    except Exception as e:
        return jsonify({'error': f'删除文件失败: {str(e)}'}), 500
@app.route('/cleanup', methods=['POST'])
def cleanup_files():
    """Immediately delete every file in the upload folder."""
    try:
        upload_folder = app.config['UPLOAD_FOLDER']
        if not os.path.exists(upload_folder):
            return jsonify({'success': True, 'message': '无需清理'})
        files = os.listdir(upload_folder)
        deleted_count = 0
        errors = []
        for filename in files:
            file_path = os.path.join(upload_folder, filename)
            if os.path.isfile(file_path):  # No extension check: clean everything
                try:
                    os.remove(file_path)
                    deleted_count += 1
                except PermissionError:
                    # Fixed: error messages previously contained a literal
                    # "(unknown)" placeholder instead of the filename.
                    errors.append(f"{filename} 正在被占用,无法删除")
                except Exception as e:
                    errors.append(f"{filename} 删除失败: {str(e)}")
        message = f'成功清理 {deleted_count} 个文件'
        if errors:
            # Report only the first few errors to keep the message short.
            message += f'。主要错误: {"; ".join(errors[:3])}'
            if len(errors) > 3:
                message += f" 等共{len(errors)}个错误"
        return jsonify({
            'success': True,
            'message': message,
            'deleted_count': deleted_count
        })
    except Exception as e:
        return jsonify({'error': f'清理文件失败: {str(e)}'}), 500
def find_header_row(filepath):
    """Return the 0-based row index of the header row in an Excel file.

    Scans the first 20 rows for known column-name keywords (Chinese and
    English); the first row containing at least two keywords is treated as
    the header. Falls back to row 0 when no such row is found or when the
    file cannot be read/parsed.
    """
    try:
        # Read only the first 20 rows, without a header.
        df_temp = pd.read_excel(filepath, header=None, nrows=20)
        keywords = ['时间', '日期', '商品', '品名', '数量', '金额', '总价', 'Date', 'Product', 'Qty', 'Amount']
        for index, row in df_temp.iterrows():
            # Flatten the row into one searchable string.
            row_str = " ".join([str(val) for val in row.values])
            # Count how many keywords the row contains.
            match_count = sum(1 for keyword in keywords if keyword in row_str)
            # A row with at least 2 keywords is considered the header.
            if match_count >= 2:
                return index
        return 0  # Default: first row
    except Exception:
        # Narrowed from a bare except so SystemExit/KeyboardInterrupt
        # are not swallowed; any read failure falls back to row 0.
        return 0
def process_sales_data(df):
    """Transform a raw sales DataFrame into flat records plus daily summaries.

    Returns a dict with:
      - 'columns': the source column names that were mapped to each role
      - 'raw_data': flat list of detail and summary records
      - 'daily_summary': per-timestamp aggregates

    Raises Exception (wrapping the original error) on unrecoverable
    format problems.
    """
    try:
        # 1. Heuristically identify the relevant columns by name keywords.
        cols = df.columns.tolist()
        col_map = {}
        # Priority keywords: earlier entries in each list win.
        keywords = {
            'date': ['时间', '日期', 'Time', 'Date'],
            'product': ['商品', '品名', '详情', 'Product', 'Name', 'Description'],
            'quantity': ['数量', '件数', 'Quantity', 'Qty', 'Count'],
            'amount': ['金额', '总价', 'Amount', 'Price', 'Total'],
            'code': ['编码', '货号', '代码', 'Code', 'No']
        }
        for key, priority_words in keywords.items():
            for word in priority_words:
                found = False
                for col in cols:
                    if word in str(col):
                        col_map[key] = col
                        found = True
                        break # Found highest priority match
                if found: break
        # Fallback compatible with the legacy logic: map by column index.
        if 'date' not in col_map and len(cols) > 1: col_map['date'] = cols[1]
        if 'product' not in col_map and len(cols) > 2: col_map['product'] = cols[2]
        if 'quantity' not in col_map and len(cols) > 3: col_map['quantity'] = cols[3]
        if 'amount' not in col_map and len(cols) > 4: col_map['amount'] = cols[4]
        if 'code' not in col_map and len(cols) > 0: col_map['code'] = cols[0]
        if not all(k in col_map for k in ['date', 'product', 'quantity', 'amount']):
            # Last resort: a purely positional mapping.
            if len(cols) >= 5:
                col_map['code'] = cols[0]
                col_map['date'] = cols[1]
                col_map['product'] = cols[2]
                col_map['quantity'] = cols[3]
                col_map['amount'] = cols[4]
            else:
                raise Exception("无法识别必要的列(时间、商品、数量、金额),请检查Excel格式或列名")
        # 2. Rename columns to canonical names for downstream processing.
        rename_dict = {
            col_map['date']: 'date',
            col_map['product']: 'product',
            col_map['quantity']: 'quantity',
            col_map['amount']: 'amount'
        }
        if 'code' in col_map:
            rename_dict[col_map['code']] = 'code'
        df = df.rename(columns=rename_dict)
        # Make sure a 'code' column exists.
        if 'code' not in df.columns:
            df['code'] = ''
        # 3. Data cleaning and type conversion (non-numeric values become 0).
        for col in ['quantity', 'amount']:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
        # 4. Date parsing (unparseable values become NaT).
        df['parsed_date'] = pd.to_datetime(df['date'], errors='coerce')
        # 5. "ID block" based parsing (state machine over the rows).
        processed_data = []
        daily_summary = {}
        def get_day_entry(date_str):
            # Lazily create the aggregate entry for this timestamp.
            if date_str not in daily_summary:
                daily_summary[date_str] = {
                    'date': date_str,
                    'total_quantity': 0,
                    'total_amount': 0,
                    'products': [],
                    'summary_info': None
                }
            return daily_summary[date_str]
        # State carried across rows.
        current_context = {
            'date_str': None,
            'code': None,
            'header_quantity': 0, # Total that a header row may carry (reference only)
            'header_amount': 0
        }
        for index, row in df.iterrows():
            # Does the current row carry an ID (code)?
            has_code = pd.notna(row['code']) and str(row['code']).strip() != ''
            # Does the current row carry a product name?
            has_product = pd.notna(row['product']) and str(row['product']).strip() != ''
            # --- State-update logic ---
            if has_code:
                # This row starts a new "block" (header row).
                if pd.notna(row['parsed_date']):
                    current_context['date_str'] = row['parsed_date'].strftime('%Y-%m-%d %H:%M:%S')
                else:
                    # A code without a date may continue the previous block or
                    # be bad data; keeping the previous date is the safer choice.
                    pass
                current_context['code'] = str(row['code'])
                current_context['header_quantity'] = float(row['quantity'])
                current_context['header_amount'] = float(row['amount'])
                # Layout is distinguished via has_product:
                #   flat-table row:      code present + product present
                #   hierarchical header: code present + product absent
            # --- Record-emission logic ---
            # Without a valid date context the row cannot be filed; skip it.
            if not current_context['date_str']:
                continue
            date_str = current_context['date_str']
            code = current_context['code']
            if has_product:
                # -> Detail record (a flat-table row or a child of a header).
                product_name = str(row['product']).strip()
                quantity = float(row['quantity'])
                amount = float(row['amount'])
                # Unit price (guard against division by zero).
                price = amount / quantity if quantity > 0 else 0
                # Update the daily aggregates.
                entry = get_day_entry(date_str)
                entry['total_quantity'] += quantity
                entry['total_amount'] += amount
                product_info = {
                    'product': product_name,
                    'quantity': quantity,
                    'amount': amount,
                    'price': price,
                    'code': code
                }
                entry['products'].append(product_info)
                processed_data.append({
                    'date': date_str,
                    'product': product_name,
                    'quantity': quantity,
                    'amount': amount,
                    'price': price,
                    'is_summary': False,
                    'code': code
                })
            elif has_code and not has_product:
                # -> Pure header row (hierarchical layout). Its totals are
                # recorded for reference but NOT added to the daily totals —
                # unless no detail rows exist, handled in the pass below.
                # Only record when the row actually carries values.
                if row['quantity'] > 0 or row['amount'] > 0:
                    entry = get_day_entry(date_str)
                    entry['summary_info'] = {
                        'total_quantity': float(row['quantity']),
                        'total_amount': float(row['amount']),
                        'code': code
                    }
                    processed_data.append({
                        'date': date_str,
                        'product': '【时间段总计】',
                        'quantity': float(row['quantity']),
                        'amount': float(row['amount']),
                        'is_summary': True,
                        'code': code
                    })
        # Post-processing: days that have only a header (no detail rows)
        # take their totals from summary_info.
        for date_str, entry in daily_summary.items():
            if entry['total_quantity'] == 0 and entry['total_amount'] == 0 and entry['summary_info']:
                entry['total_quantity'] = entry['summary_info']['total_quantity']
                entry['total_amount'] = entry['summary_info']['total_amount']
        return {
            'columns': {
                'code': col_map.get('code', ''),
                'date': col_map['date'],
                'product': col_map['product'],
                'quantity': col_map['quantity'],
                'amount': col_map['amount']
            },
            'raw_data': processed_data,
            'daily_summary': list(daily_summary.values())
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise Exception(f"数据处理失败: {str(e)}")
if __name__ == '__main__':
    # Port is configurable via the PORT environment variable (default 5000).
    port = int(os.environ.get('PORT', 5000))
    # Recommended to disable debug in production.
    # NOTE(review): debug defaults to ON unless FLASK_DEBUG is explicitly set
    # to something other than 'true' — consider flipping the default to 'False'.
    debug_mode = os.environ.get('FLASK_DEBUG', 'True').lower() == 'true'
    # 0.0.0.0 binds all interfaces so the app is reachable from other hosts.
    app.run(debug=debug_mode, host='0.0.0.0', port=port)