From cd1adc56475d0168473688c6afdb6535191bce8f Mon Sep 17 00:00:00 2001 From: houhuan Date: Wed, 25 Mar 2026 19:49:47 +0800 Subject: [PATCH] feat: simplify UI and cleanup code (removed support types, mapping wizard, supplier manager, and validation matching) --- config.ini | 2 +- data/user_settings.json | 3 +- 启动器.py | 1779 --------------------------------------- 3 files changed, 2 insertions(+), 1782 deletions(-) diff --git a/config.ini b/config.ini index bd33df3..428dc0a 100644 --- a/config.ini +++ b/config.ini @@ -27,4 +27,4 @@ max_file_size_mb = 4 purchase_order = 银豹-采购单模板.xls [App] -version = 2025.12.12.1309 +version = 2026.03.25.1945 \ No newline at end of file diff --git a/data/user_settings.json b/data/user_settings.json index 3ad9051..413fe84 100644 --- a/data/user_settings.json +++ b/data/user_settings.json @@ -2,7 +2,6 @@ "window_size": "900x600", "theme_mode": "light", "recent_files": [ - "data/result\\采购单_蓉城易购-订单1765513867817.xls", - "E:\\2025Code\\python\\orc-order-v2\\data\\output\\蓉城易购-订单1765513867817.xlsx" + "data/output\\微信图片_20251115212128_148_108.xlsx" ] } \ No newline at end of file diff --git a/启动器.py b/启动器.py index bc35ea9..1398838 100644 --- a/启动器.py +++ b/启动器.py @@ -2369,227 +2369,6 @@ def main(): - def open_validation_panel(log_widget): - try: - import json - import pandas as pd - import xlrd - dlg = tk.Toplevel() - dlg.title("验证匹配") - dlg.geometry("900x680") - center_window(dlg) - try: - dlg.lift() - dlg.attributes('-topmost', True) - dlg.after(200, lambda: dlg.attributes('-topmost', False)) - dlg.focus_force() - except Exception: - pass - outer = ttk.Frame(dlg) - outer.pack(fill=tk.BOTH, expand=True) - canvas = tk.Canvas(outer) - vsb = ttk.Scrollbar(outer, orient='vertical', command=canvas.yview) - canvas.configure(yscrollcommand=vsb.set) - canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - vsb.pack(side=tk.RIGHT, fill=tk.Y) - frame = ttk.Frame(canvas) - canvas.create_window((0,0), window=frame, anchor='nw') - frame.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) - cfg_path = os.path.join("config","suppliers_config.json") - suppliers = [] - data_cfg = {"suppliers": []} - try: - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data_cfg = json.load(f) - suppliers = [s.get('name','') for s in data_cfg.get('suppliers',[])] - except Exception: - pass - row1 = ttk.Frame(frame) - row1.pack(fill=tk.X, pady=6) - ttk.Label(row1, text="供应商").pack(side=tk.LEFT) - sup_var = tk.StringVar() - ttk.Combobox(row1, textvariable=sup_var, state='readonly', values=suppliers).pack(side=tk.LEFT, padx=6) - row2 = ttk.Frame(frame) - row2.pack(fill=tk.X, pady=6) - ttk.Label(row2, text="原始文件").pack(side=tk.LEFT) - orig_var = tk.StringVar() - ttk.Entry(row2, textvariable=orig_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - ttk.Button(row2, text="浏览", command=lambda: orig_var.set(filedialog.askopenfilename(title="选择原始Excel", filetypes=[("Excel","*.xlsx *.xls")]) or orig_var.get())).pack(side=tk.LEFT) - row3 = ttk.Frame(frame) - row3.pack(fill=tk.X, pady=6) - ttk.Label(row3, text="期望结果").pack(side=tk.LEFT) - expect_var = tk.StringVar() - ttk.Entry(row3, textvariable=expect_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - ttk.Button(row3, text="浏览", command=lambda: expect_var.set(filedialog.askopenfilename(title="选择期望结果", filetypes=[("Excel","*.xls *.xlsx")]) or expect_var.get())).pack(side=tk.LEFT) - act_row = ttk.Frame(frame) - act_row.pack(fill=tk.X, pady=8) - result_text = scrolledtext.ScrolledText(frame, height=6) - result_text.pack(fill=tk.BOTH, expand=True) - diff_box = ttk.Frame(frame) - diff_box.pack(fill=tk.BOTH, expand=True, pady=6) - diff_tree = ttk.Treeview(diff_box, show='headings', height=12) - diff_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - diff_scroll_x = ttk.Scrollbar(frame, orient='horizontal', command=diff_tree.xview) - diff_tree.configure(xscrollcommand=diff_scroll_x.set) - diff_scroll_x.pack(fill=tk.X) - suggestions_cache = [] - def _read_df(path): - ap = os.path.abspath(path) - if ap.lower().endswith('.xlsx'): - return pd.read_excel(ap, engine='openpyxl') - else: - return pd.read_excel(ap, engine='xlrd') - def _run_validation(): - try: - import numpy as np - from app.services.order_service import OrderService - supplier = sup_var.get() - orig = orig_var.get().strip() - expect = expect_var.get().strip() - if not supplier or not orig or not expect: - messagebox.showwarning("提示","请选择供应商、原始文件与期望结果") - return - service = OrderService() - out_path = service.process_excel(orig, progress_cb=lambda p: None) - df_actual = _read_df(out_path) - df_expect = _read_df(expect) - keys = None - if 'barcode' in df_actual.columns and 'barcode' in df_expect.columns: - keys = 'barcode' - elif '条码' in df_actual.columns and '条码' in df_expect.columns: - keys = '条码' - elif 'name' in df_actual.columns and 'name' in df_expect.columns: - keys = 'name' - records = [] - mismatches = 0 - if keys: - a = df_actual.set_index(keys) - e = df_expect.set_index(keys) - idx_union = list(set(a.index.tolist()) | set(e.index.tolist())) - for k in idx_union: - ra = a.loc[k] if k in a.index else None - rexp = e.loc[k] if k in e.index else None - for f in ['barcode','name','specification','quantity','unit','unit_price','total_price']: - va = ra.get(f) if (ra is not None and f in a.columns) else None - ve = rexp.get(f) if (rexp is not None and f in e.columns) else None - if va is None and ve is None: - continue - def _norm(v): - try: - return float(v) - except Exception: - return str(v) if v is not None else '' - if _norm(va) != _norm(ve): - records.append([str(k), f, str(ve), str(va)]) - mismatches += 1 - else: - for i in range(min(len(df_actual), len(df_expect))): - for f in ['barcode','name','specification','quantity','unit','unit_price','total_price']: - va = df_actual.iloc[i].get(f) if f in df_actual.columns else None - ve = df_expect.iloc[i].get(f) if f in df_expect.columns else None - if va is None and ve is None: - continue - def _norm(v): - try: - return float(v) - except Exception: - return str(v) if v is not None else '' - if _norm(va) != _norm(ve): - records.append([str(i), f, str(ve), str(va)]) - mismatches += 1 - diff_tree['columns'] = ['key','field','expected','actual'] - for c in ['key','field','expected','actual']: - diff_tree.heading(c, text=c) - diff_tree.column(c, width=160, stretch=True) - for iid in diff_tree.get_children(): - diff_tree.delete(iid) - for row in records: - diff_tree.insert('', tk.END, values=row) - result_text.configure(state=tk.NORMAL) - result_text.delete(1.0, tk.END) - result_text.insert(tk.END, f"差异条目: {mismatches}\n") - suggestions = [] - try: - unit_vals = df_actual.get('unit') if 'unit' in df_actual.columns else pd.Series([]) - bad_units = unit_vals.astype(str).isin(['箱','件','提','盒']).sum() - if bad_units > 0: - suggestions.append({"type":"normalize_unit","target":"unit","map":{"箱":"件","提":"件","盒":"件"}}) - except Exception: - pass - try: - qty = pd.to_numeric(df_actual.get('quantity'), errors='coerce') if 'quantity' in df_actual.columns else pd.Series([]) - missing_qty = int(qty.isna().sum()) if not qty.empty else 0 - if missing_qty > 0: - cols = df_actual.columns.tolist() - src = None - for cand in ['订单数量','订购量','订货数量']: - if cand in cols: - src = cand - break - if src: - suggestions.append({"type":"split_quantity_unit","source":src}) - except Exception: - pass - try: - spec = df_actual.get('specification') if 'specification' in df_actual.columns else pd.Series([]) - missing_spec = int(spec.isna().sum()) if not spec.empty else 0 - if missing_spec > 0: - suggestions.append({"type":"extract_spec_from_name","source":"name"}) - except Exception: - pass - try: - up = pd.to_numeric(df_actual.get('unit_price'), errors='coerce').fillna(0) if 'unit_price' in df_actual.columns else pd.Series([]) - tp = pd.to_numeric(df_actual.get('total_price'), errors='coerce').fillna(0) if 'total_price' in df_actual.columns else pd.Series([]) - qty = pd.to_numeric(df_actual.get('quantity'), errors='coerce').fillna(np.nan) if 'quantity' in df_actual.columns else pd.Series([]) - if not qty.empty and not up.empty and not tp.empty: - need_compute = ((qty.isna()) & (up > 0) & (tp > 0)).sum() - if need_compute > 0: - suggestions.append({"type":"compute_quantity_from_total"}) - except Exception: - pass - suggestions_cache.clear() - suggestions_cache.extend(suggestions) - if suggestions: - result_text.insert(tk.END, f"建议: {json.dumps(suggestions, ensure_ascii=False)}\n") - result_text.configure(state=tk.DISABLED) - except Exception as e: - messagebox.showerror("验证失败", str(e)) - def _apply_suggestions(): - try: - supplier = sup_var.get() - if not supplier: - return - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data = json.load(f) - for s in data.get('suppliers',[]): - if s.get('name') == supplier: - rules = s.get('rules', []) - for sug in suggestions_cache: - exists = any(r.get('type') == sug.get('type') for r in rules) - if not exists: - rules.append(sug) - s['rules'] = rules - d = s.get('dictionary') or {} - d.setdefault('unit_synonyms', {"箱":"件","提":"件","盒":"件","瓶":"瓶"}) - d.setdefault('pack_multipliers', {"件":24,"箱":24,"提":12,"盒":10}) - s['dictionary'] = d - break - with open(cfg_path,'w',encoding='utf-8') as f: - json.dump(data,f,ensure_ascii=False,indent=2) - result_text.configure(state=tk.NORMAL) - result_text.insert(tk.END, "已应用建议并保存配置\n") - result_text.configure(state=tk.DISABLED) - else: - messagebox.showwarning("提示","配置文件不存在") - except Exception as e: - messagebox.showerror("应用失败", str(e)) - ttk.Button(act_row, text="运行验证", command=_run_validation).pack(side=tk.LEFT) - ttk.Button(act_row, text="应用建议", command=_apply_suggestions).pack(side=tk.LEFT, padx=6) - ttk.Button(act_row, text="关闭", command=dlg.destroy).pack(side=tk.RIGHT) - except Exception as e: - messagebox.showerror("验证匹配错误", str(e)) # 系统设置区 settings_section = tk.LabelFrame( @@ -2607,11 +2386,7 @@ def main(): settings_buttons_frame.pack(fill=tk.X, padx=8, pady=6) create_modern_button(settings_buttons_frame, "系统设置", lambda: show_config_dialog(root, ConfigManager()), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) create_modern_button(settings_buttons_frame, "条码映射", lambda: edit_barcode_mappings(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) - create_modern_button(settings_buttons_frame, "支持类型", lambda: show_supported_processors(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) - create_modern_button(settings_buttons_frame, "列映射向导", lambda: open_column_mapping_wizard_alt(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) - create_modern_button(settings_buttons_frame, "供应商管理", lambda: open_supplier_manager(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) create_modern_button(settings_buttons_frame, "模板管理", lambda: open_template_manager(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) - create_modern_button(settings_buttons_frame, "验证匹配", lambda: open_validation_panel(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) @@ -2872,1232 +2647,6 @@ def open_result_directory_from_settings(): except Exception as e: messagebox.showerror("错误", f"无法打开结果目录: {str(e)}") -def show_supported_processors(log_widget): - global PROCESSOR_SERVICE - try: - if PROCESSOR_SERVICE is None: - PROCESSOR_SERVICE = ProcessorService(ConfigManager()) - info = PROCESSOR_SERVICE.get_supported_types() - dlg = tk.Toplevel() - dlg.title("支持的处理器类型") - dlg.geometry("420x360") - center_window(dlg) - frame = tk.Frame(dlg) - frame.pack(fill=tk.BOTH, expand=True, padx=12, pady=12) - text = scrolledtext.ScrolledText(frame, wrap=tk.WORD) - text.pack(fill=tk.BOTH, expand=True) - text.configure(state=tk.NORMAL) - text.insert(tk.END, "支持的处理器类型\n\n") - for item in info: - exts = ', '.join(item['extensions']) if item.get('extensions') else '' - text.insert(tk.END, f"• {item['name']}: {item['description']}\n") - text.insert(tk.END, f" 扩展名: {exts}\n\n") - text.configure(state=tk.DISABLED) - tk.Button(dlg, text="关闭", command=dlg.destroy).pack(pady=8) - add_to_log(log_widget, "已显示支持的处理器类型\n", "info") - except Exception as e: - add_to_log(log_widget, f"显示处理器类型出错: {str(e)}\n", "error") - -def safe_open_validation_panel(log_widget): - try: - if 'open_validation_panel' in globals() and callable(globals()['open_validation_panel']): - return globals()['open_validation_panel'](log_widget) - else: - messagebox.showwarning("提示", "验证面板功能尚未加载,请重启程序后重试") - except Exception as e: - messagebox.showerror("错误", str(e)) - -def open_supplier_manager(log_widget): - try: - import json - dlg = tk.Toplevel() - dlg.title("供应商管理") - dlg.geometry("900x700") - center_window(dlg) - try: - dlg.lift() - dlg.attributes('-topmost', True) - dlg.after(200, lambda: dlg.attributes('-topmost', False)) - dlg.focus_force() - except Exception: - pass - cfg_path = os.path.join("config","suppliers_config.json") - data = {"suppliers": []} - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data = json.load(f) - outer = ttk.Frame(dlg) - outer.pack(fill=tk.BOTH, expand=True) - canvas = tk.Canvas(outer) - vsb = ttk.Scrollbar(outer, orient='vertical', command=canvas.yview) - canvas.configure(yscrollcommand=vsb.set) - canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - vsb.pack(side=tk.RIGHT, fill=tk.Y) - container = ttk.Frame(canvas) - canvas.create_window((0,0), window=container, anchor='nw') - def _on_container_config(evt): - try: - canvas.configure(scrollregion=canvas.bbox("all")) - except Exception: - pass - container.bind("", _on_container_config) - def _on_mousewheel(event): - try: - delta = -1 * int(event.delta / 120) - canvas.yview_scroll(delta, "units") - except Exception: - pass - dlg.bind("", _on_mousewheel) - # inner padding - inner = ttk.Frame(container) - inner.pack(fill=tk.BOTH, expand=True, padx=12, pady=12) - container = inner - container.columnconfigure(0, weight=1) - container.columnconfigure(1, weight=3) - left = ttk.Frame(container) - left.grid(row=0, column=0, sticky='nsew', padx=(0,8)) - right = ttk.Frame(container) - right.grid(row=0, column=1, sticky='nsew') - sup_list = tk.Listbox(left, height=20) - sup_list.pack(fill=tk.BOTH, expand=True) - btns = ttk.Frame(left) - btns.pack(fill=tk.X, pady=6) - def refresh_list(selected_idx=0): - sup_list.delete(0, tk.END) - for s in data.get('suppliers',[]): - sup_list.insert(tk.END, s.get('name','')) - try: - sup_list.select_clear(0, tk.END) - sup_list.select_set(selected_idx) - except Exception: - pass - def new_supplier(): - data.setdefault('suppliers', []).append({ - 'name': '新供应商', - 'description': '', - 'filename_patterns': [], - 'content_indicators': [], - 'column_mapping': {}, - 'header_row': 0, - 'rules': [], - 'dictionary': { - 'ignore_words': [], - 'unit_synonyms': {}, - 'pack_multipliers': {}, - 'name_patterns': [], - 'default_unit': '瓶', - 'default_package_quantity': 1 - }, - 'output_templates': ['templates/银豹-采购单模板.xls'], - 'current_template_index': 0 - }) - refresh_list(len(data['suppliers'])-1) - load_selected() - def copy_supplier(): - sel = sup_list.curselection() - if not sel: - return - idx = sel[0] - import copy - src = data['suppliers'][idx] - cp = copy.deepcopy(src) - cp['name'] = src.get('name','') + '_副本' - data['suppliers'].append(cp) - refresh_list(len(data['suppliers'])-1) - def delete_supplier(): - sel = sup_list.curselection() - if not sel: - return - idx = sel[0] - if 0 <= idx < len(data['suppliers']): - del data['suppliers'][idx] - refresh_list(max(0, idx-1)) - ttk.Button(btns, text="新建", command=new_supplier).pack(side=tk.LEFT) - ttk.Button(btns, text="复制", command=copy_supplier).pack(side=tk.LEFT, padx=6) - ttk.Button(btns, text="删除", command=delete_supplier).pack(side=tk.LEFT) - nb = ttk.Notebook(right) - nb.pack(fill=tk.BOTH, expand=True) - basic_tab = ttk.Frame(nb) - map_tab = ttk.Frame(nb) - rule_tab = ttk.Frame(nb) - tpl_tab = ttk.Frame(nb) - nb.add(basic_tab, text="基本信息") - nb.add(map_tab, text="列映射与表头") - nb.add(rule_tab, text="规则与词典") - nb.add(tpl_tab, text="模板管理") - name_var = tk.StringVar() - desc_var = tk.StringVar() - header_row_var = tk.StringVar() - ttk.Label(basic_tab, text="名称").pack(anchor='w') - ttk.Entry(basic_tab, textvariable=name_var).pack(fill=tk.X, pady=4) - ttk.Label(basic_tab, text="描述").pack(anchor='w') - ttk.Entry(basic_tab, textvariable=desc_var).pack(fill=tk.X, pady=4) - ttk.Label(basic_tab, text="文件名模式(每行一条)").pack(anchor='w') - fp_text = scrolledtext.ScrolledText(basic_tab, height=4) - fp_text.pack(fill=tk.BOTH, pady=4) - ttk.Label(basic_tab, text="内容指示词(每行一条)").pack(anchor='w') - ci_text = scrolledtext.ScrolledText(basic_tab, height=4) - ci_text.pack(fill=tk.BOTH, pady=4) - ttk.Label(basic_tab, text="表头行号(从1开始)").pack(anchor='w') - ttk.Entry(basic_tab, textvariable=header_row_var).pack(fill=tk.X, pady=4) - ttk.Button(map_tab, text="打开列映射向导", command=lambda: open_column_mapping_wizard_alt(log_widget, name_var.get())).pack(anchor='w', pady=6) - ttk.Label(rule_tab, text="忽略词(每行一条)").pack(anchor='w') - ignore_words_text = scrolledtext.ScrolledText(rule_tab, height=4) - ignore_words_text.pack(fill=tk.BOTH, pady=4) - ttk.Label(rule_tab, text="单位同义词(JSON)").pack(anchor='w') - unit_synonyms_var = tk.StringVar(value='{}') - ttk.Entry(rule_tab, textvariable=unit_synonyms_var).pack(fill=tk.X, pady=4) - ttk.Label(rule_tab, text="包装倍数(JSON)").pack(anchor='w') - pack_multipliers_var = tk.StringVar(value='{}') - ttk.Entry(rule_tab, textvariable=pack_multipliers_var).pack(fill=tk.X, pady=4) - ttk.Label(rule_tab, text="名称正则(每行一条)").pack(anchor='w') - name_patterns_text = scrolledtext.ScrolledText(rule_tab, height=4) - name_patterns_text.pack(fill=tk.BOTH, pady=4) - ttk.Label(rule_tab, text="默认单位").pack(anchor='w') - default_unit_var = tk.StringVar(value='瓶') - ttk.Entry(rule_tab, textvariable=default_unit_var).pack(fill=tk.X, pady=4) - ttk.Label(rule_tab, text="默认包装数量").pack(anchor='w') - default_pq_var = tk.StringVar(value='1') - ttk.Entry(rule_tab, textvariable=default_pq_var).pack(fill=tk.X, pady=4) - ttk.Label(rule_tab, text="规则预设").pack(anchor='w') - rule_preset_var = tk.StringVar() - rule_presets = { - "基础拆分与推断": [ - {"type": "split_quantity_unit", "source": "quantity"}, - {"type": "extract_spec_from_name", "source": "name"}, - {"type": "normalize_unit", "target": "unit", "map": {"箱": "件", "提": "件", "盒": "件"}}, - {"type": "compute_quantity_from_total"}, - {"type": "mark_gift"}, - {"type": "fill_missing", "fills": {"unit": "瓶"}} - ] - } - ttk.Combobox(rule_tab, textvariable=rule_preset_var, state='readonly', values=list(rule_presets.keys())).pack(fill=tk.X, pady=4) - rule_file_row = ttk.Frame(rule_tab) - rule_file_row.pack(fill=tk.X, pady=6) - ttk.Label(rule_file_row, text="选择Excel").pack(side=tk.LEFT) - rule_excel_var = tk.StringVar() - ttk.Entry(rule_file_row, textvariable=rule_excel_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - def _browse_rule_excel(): - p = filedialog.askopenfilename(title="选择Excel文件", filetypes=[("Excel","*.xlsx *.xls")]) - if p: - rule_excel_var.set(p) - ttk.Button(rule_file_row, text="浏览", command=_browse_rule_excel).pack(side=tk.LEFT) - # 加载列供来源/目标下拉选择 - rule_available_columns = [] - def _load_rule_columns(): - try: - import pandas as pd - p = rule_excel_var.get().strip() - if not p: - messagebox.showwarning("提示","请先选择Excel文件") - return - idx = None - try: - v = header_row_var.get().strip() - if v: - n = int(v) - if n > 0: - idx = n - 1 - except Exception: - pass - df = pd.read_excel(p, engine='openpyxl' if p.lower().endswith('.xlsx') else 'xlrd', header=idx if idx is not None else 0, nrows=5) - cols = [str(c) for c in list(df.columns)] - rule_available_columns.clear() - rule_available_columns.extend(cols) - messagebox.showinfo("成功", f"已加载列: {', '.join(cols)}") - _refresh_rule_params() - except Exception as e: - messagebox.showerror("加载失败", str(e)) - ttk.Button(rule_file_row, text="加载列", command=_load_rule_columns).pack(side=tk.LEFT, padx=6) - rule_preview_box = ttk.Frame(rule_tab) - rule_preview_box.pack(fill=tk.BOTH, expand=True, pady=6) - rule_left_tree = ttk.Treeview(rule_preview_box, show='headings', height=8) - rule_right_tree = ttk.Treeview(rule_preview_box, show='headings', height=8) - rule_left_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - rule_right_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - rule_scroll_x_row = ttk.Frame(rule_tab) - rule_scroll_x_row.pack(fill=tk.X) - rule_left_scroll_x = ttk.Scrollbar(rule_scroll_x_row, orient='horizontal', command=rule_left_tree.xview) - rule_right_scroll_x = ttk.Scrollbar(rule_scroll_x_row, orient='horizontal', command=rule_right_tree.xview) - rule_left_tree.configure(xscrollcommand=rule_left_scroll_x.set) - rule_right_tree.configure(xscrollcommand=rule_right_scroll_x.set) - rule_left_scroll_x.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3)) - rule_right_scroll_x.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(3,0)) - rule_step_row = ttk.Frame(rule_tab) - rule_step_row.pack(fill=tk.X, pady=6) - ttk.Label(rule_step_row, text="预览步骤").pack(side=tk.LEFT) - rule_step_var = tk.StringVar() - rule_step_cb = ttk.Combobox(rule_step_row, textvariable=rule_step_var, state='readonly') - rule_step_cb.pack(side=tk.LEFT, padx=6) - diff_mode_var = tk.BooleanVar(value=False) - ttk.Checkbutton(rule_step_row, text="仅显示变化列", variable=diff_mode_var).pack(side=tk.LEFT, padx=6) - err_label = ttk.Label(rule_step_row, text="错误计数: -") - err_label.pack(side=tk.LEFT, padx=12) - rule_step_box = ttk.Frame(rule_tab) - rule_step_box.pack(fill=tk.BOTH, expand=True, pady=6) - rule_step_tree = ttk.Treeview(rule_step_box, show='headings', height=8) - rule_step_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - rule_step_scroll_x = ttk.Scrollbar(rule_tab, orient='horizontal', command=rule_step_tree.xview) - rule_step_tree.configure(xscrollcommand=rule_step_scroll_x.set) - rule_step_scroll_x.pack(fill=tk.X) - def _build_rule_preview(): - try: - import pandas as pd - from app.core.handlers.rule_engine import apply_rules - p = rule_excel_var.get().strip() - if not p: - return - idx = None - try: - v = header_row_var.get().strip() - if v: - n = int(v) - if n > 0: - idx = n - 1 - except Exception: - pass - df = pd.read_excel(p, engine='openpyxl' if p.lower().endswith('.xlsx') else 'xlrd', header=idx if idx is not None else 0, nrows=50) - sel = sup_list.curselection() - cm = {} - rules_cur = [] - if sel: - s = data['suppliers'][sel[0]] - cm = s.get('column_mapping', {}) - rules_cur = s.get('rules', []) - rename_map = cm - df0 = df.rename(columns=rename_map) - dictionary = { - 'ignore_words': [w.strip() for w in ignore_words_text.get(1.0, tk.END).splitlines() if w.strip()], - 'unit_synonyms': json.loads(unit_synonyms_var.get() or '{}'), - 'pack_multipliers': json.loads(pack_multipliers_var.get() or '{}'), - 'name_patterns': [w.strip() for w in name_patterns_text.get(1.0, tk.END).splitlines() if w.strip()], - 'default_unit': default_unit_var.get().strip() or '瓶', - 'default_package_quantity': int(default_pq_var.get() or '1') - } - rules_use = rule_presets.get(rule_preset_var.get(), rules_cur or rule_presets["基础拆分与推断"]) - df1 = apply_rules(df0, rules_use, dictionary) - lcols = list(df0.columns) - rcols = list(df1.columns) - rule_left_tree['columns'] = lcols - rule_right_tree['columns'] = rcols - for c in lcols: - rule_left_tree.heading(c, text=c) - rule_left_tree.column(c, width=120, stretch=True) - for c in rcols: - rule_right_tree.heading(c, text=c) - rule_right_tree.column(c, width=120, stretch=True) - for iid in rule_left_tree.get_children(): - rule_left_tree.delete(iid) - for iid in rule_right_tree.get_children(): - rule_right_tree.delete(iid) - # Diff模式:仅显示变化列 - if diff_mode_var.get(): - changed_cols = [] - for c in set(lcols).intersection(set(rcols)): - try: - if not df0[c].equals(df1[c]): - changed_cols.append(c) - except Exception: - pass - lshow = [c for c in lcols if c in changed_cols] - rshow = [c for c in rcols if c in changed_cols] - rule_left_tree['columns'] = lshow - rule_right_tree['columns'] = rshow - for c in lshow: - rule_left_tree.heading(c, text=c) - rule_left_tree.column(c, width=120, stretch=True) - for c in rshow: - rule_right_tree.heading(c, text=c) - rule_right_tree.column(c, width=120, stretch=True) - for i in range(len(df0)): - rule_left_tree.insert('', tk.END, values=[str(df0.iloc[i].get(c, '')) for c in lshow]) - for i in range(len(df1)): - rule_right_tree.insert('', tk.END, values=[str(df1.iloc[i].get(c, '')) for c in rshow]) - else: - for i in range(len(df0)): - rule_left_tree.insert('', tk.END, values=[str(df0.iloc[i].get(c, '')) for c in lcols]) - for i in range(len(df1)): - rule_right_tree.insert('', tk.END, values=[str(df1.iloc[i].get(c, '')) for c in rcols]) - steps = [("原始", df0)] - cur = df0.copy() - for r in rules_use: - cur = apply_rules(cur, [r], dictionary) - steps.append((r.get('type','step'), cur.copy())) - rule_step_cb['values'] = [t for t,_ in steps] - if steps: - rule_step_var.set(steps[-1][0]) - _show_rule_step_df(steps[-1][1]) - def _on_rule_step_change(evt=None): - sel = rule_step_var.get() - for t, d in steps: - if t == sel: - _show_rule_step_df(d) - break - rule_step_cb.bind('<>', _on_rule_step_change) - rule_step_cb.event_generate('<>') - rule_step_row._steps_cache = steps - # 错误计数:统计缺失单位/规格、无法拆分数量 - try: - empty_unit = int(df1.get('unit').isna().sum()) if 'unit' in df1.columns else len(df1) - empty_qty = int(pd.to_numeric(df1.get('quantity'), errors='coerce').isna().sum()) if 'quantity' in df1.columns else len(df1) - empty_spec = int(df1.get('specification').isna().sum()) if 'specification' in df1.columns else len(df1) - err_label.configure(text=f"错误计数: 单位缺失 {empty_unit},数量缺失 {empty_qty},规格缺失 {empty_spec}") - except Exception: - err_label.configure(text="错误计数: 统计失败") - except Exception as e: - messagebox.showerror("预览失败", str(e)) - def _show_rule_step_df(dfshow): - cols = list(dfshow.columns) - rule_step_tree['columns'] = cols - for c in cols: - rule_step_tree.heading(c, text=c) - rule_step_tree.column(c, width=120, stretch=True) - for iid in rule_step_tree.get_children(): - rule_step_tree.delete(iid) - for i in range(len(dfshow)): - rule_step_tree.insert('', tk.END, values=[str(dfshow.iloc[i].get(c, '')) for c in cols]) - def _export_rule_step_csv(): - try: - if not hasattr(rule_step_row, '_steps_cache'): - return - sel = rule_step_var.get() - for t, d in rule_step_row._steps_cache: - if t == sel: - save_path = filedialog.asksaveasfilename(title="导出预览为CSV", defaultextension=".csv", filetypes=[("CSV","*.csv")]) - if save_path: - d.to_csv(save_path, index=False, encoding='utf-8-sig') - messagebox.showinfo("成功","已导出CSV") - break - except Exception as e: - messagebox.showerror("导出失败", str(e)) - ttk.Button(rule_step_row, text="生成步骤预览", command=_build_rule_preview).pack(side=tk.LEFT, padx=6) - ttk.Button(rule_step_row, text="导出预览CSV", command=_export_rule_step_csv).pack(side=tk.LEFT, padx=6) - - # 规则列表编辑器 - rules_frame = ttk.LabelFrame(rule_tab, text="规则列表") - rules_frame.pack(fill=tk.BOTH, expand=True, pady=6) - rules_list = tk.Listbox(rules_frame, height=8) - rules_list.pack(fill=tk.BOTH, expand=True) - rules_btns = ttk.Frame(rules_frame) - rules_btns.pack(fill=tk.X, pady=4) - RULE_TYPES = { - 'split_quantity_unit': {'label': '拆分数量单位', 'params': [('source','quantity')]}, - 'extract_spec_from_name': {'label': '名称提取规格', 'params': [('source','name')]}, - 'normalize_unit': {'label': '单位归一', 'params': [('target','unit'), ('map','{"箱":"件","提":"件","盒":"件"}')]}, - 'compute_quantity_from_total': {'label': '金额回推数量', 'params': []}, - 'fill_missing': {'label': '缺省填充', 'params': [('fills','{}')]}, - 'mark_gift': {'label': '标记赠品', 'params': []} - } - add_row = ttk.Frame(rules_btns) - add_row.pack(fill=tk.X) - new_rule_var = tk.StringVar() - ttk.Combobox(add_row, textvariable=new_rule_var, state='readonly', values=list(RULE_TYPES.keys())).pack(side=tk.LEFT) - def _add_rule(): - r = new_rule_var.get() - if not r: - return - rules_list.insert(tk.END, r) - new_rule_var.set('') - _refresh_rule_params() - ttk.Button(add_row, text="添加", command=_add_rule).pack(side=tk.LEFT, padx=6) - def _remove_rule(): - sel = rules_list.curselection() - if sel: - rules_list.delete(sel[0]) - _refresh_rule_params() - def _move_up(): - sel = rules_list.curselection() - if sel: - idx = sel[0] - if idx > 0: - val = rules_list.get(idx) - rules_list.delete(idx) - rules_list.insert(idx-1, val) - rules_list.select_set(idx-1) - _refresh_rule_params() - def _move_down(): - sel = rules_list.curselection() - if sel: - idx = sel[0] - if idx < rules_list.size()-1: - val = rules_list.get(idx) - rules_list.delete(idx) - rules_list.insert(idx+1, val) - rules_list.select_set(idx+1) - _refresh_rule_params() - ttk.Button(rules_btns, text="删除", command=_remove_rule).pack(side=tk.LEFT, padx=6) - ttk.Button(rules_btns, text="上移", command=_move_up).pack(side=tk.LEFT, padx=6) - ttk.Button(rules_btns, text="下移", command=_move_down).pack(side=tk.LEFT, padx=6) - rule_params_frame = ttk.LabelFrame(rule_tab, text="规则参数") - rule_params_frame.pack(fill=tk.BOTH, expand=True, pady=6) - rule_param_vars = {} - def _refresh_rule_params(): - for w in rule_params_frame.winfo_children(): - w.destroy() - sel = rules_list.curselection() - if not sel: - return - rkey = rules_list.get(sel[0]) - spec = RULE_TYPES.get(rkey, {'params': []}) - row = ttk.Frame(rule_params_frame) - row.pack(fill=tk.X, pady=4) - rule_param_vars.clear() - for name, default in spec['params']: - r = ttk.Frame(rule_params_frame) - r.pack(fill=tk.X, pady=4) - ttk.Label(r, text=name).pack(side=tk.LEFT) - var = tk.StringVar(value=default) - # 对 source/target 提供下拉列选择 - if name in ('source','target'): - cb = ttk.Combobox(r, textvariable=var, state='readonly', values=rule_available_columns) - cb.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - else: - ttk.Entry(r, textvariable=var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - rule_param_vars[name] = var - def _build_rules_from_ui(): - rules = [] - for i in range(rules_list.size()): - rkey = rules_list.get(i) - spec = RULE_TYPES.get(rkey, {'params': []}) - rd = {'type': rkey} - # if selected, use edited params; else defaults - if rules_list.curselection() and rules_list.get(rules_list.curselection()[0]) == rkey: - for name, default in spec['params']: - val = rule_param_vars.get(name).get() if name in rule_param_vars else default - try: - rd[name] = json.loads(val) if (val.strip().startswith('{') or val.strip().startswith('[')) else val - except Exception: - rd[name] = val - else: - for name, default in spec['params']: - try: - rd[name] = json.loads(default) if (isinstance(default, str) and (default.strip().startswith('{') or default.strip().startswith('['))) else default - except Exception: - rd[name] = default - rules.append(rd) - return rules - def _apply_quick_template_no_unit(): - try: - rules_list.delete(0, tk.END) - for r in ['split_quantity_unit','extract_spec_from_name','normalize_unit','fill_missing','mark_gift']: - rules_list.insert(tk.END, r) - unit_synonyms_var.set(json.dumps({"箱":"件","提":"件","盒":"件","瓶":"瓶"}, ensure_ascii=False)) - pack_multipliers_var.set(json.dumps({"件":24,"箱":24,"提":12,"盒":10}, ensure_ascii=False)) - default_unit_var.set('瓶') - default_pq_var.set('1') - messagebox.showinfo("成功","已应用无数量/单位快速模板,请在规则参数中选择来源列") - _refresh_rule_params() - except Exception as e: - messagebox.showerror("应用失败", str(e)) - ttk.Button(rules_btns, text="应用无数量/单位模板", command=_apply_quick_template_no_unit).pack(side=tk.LEFT, padx=6) - rules_list.bind('<>', lambda e: _refresh_rule_params()) - def _load_rules_to_ui(rules): - rules_list.delete(0, tk.END) - for r in rules or []: - rtype = r.get('type') - if rtype in RULE_TYPES: - rules_list.insert(tk.END, rtype) - _refresh_rule_params() - tpl_list = tk.Listbox(tpl_tab, height=8) - tpl_list.pack(fill=tk.BOTH, expand=True) - tpl_btns = ttk.Frame(tpl_tab) - tpl_btns.pack(fill=tk.X, pady=6) - current_tpl_idx_var = tk.StringVar(value='0') - ttk.Label(tpl_tab, text="当前模板索引").pack(anchor='w') - ttk.Entry(tpl_tab, textvariable=current_tpl_idx_var).pack(fill=tk.X, pady=4) - def load_selected(): - sel = sup_list.curselection() - if not sel: - return - idx = sel[0] - s = data['suppliers'][idx] - name_var.set(s.get('name','')) - desc_var.set(s.get('description','')) - header_row_var.set(str((s.get('header_row') or 0)+1)) - fp = s.get('filename_patterns',[]) - ci = s.get('content_indicators',[]) - fp_text.delete(1.0, tk.END) - fp_text.insert(tk.END, '\n'.join(fp)) - ci_text.delete(1.0, tk.END) - ci_text.insert(tk.END, '\n'.join(ci)) - d = s.get('dictionary') or {} - ignore_words_text.delete(1.0, tk.END) - ignore_words_text.insert(tk.END, '\n'.join(d.get('ignore_words', []))) - unit_synonyms_var.set(json.dumps(d.get('unit_synonyms', {}), ensure_ascii=False)) - pack_multipliers_var.set(json.dumps(d.get('pack_multipliers', {}), ensure_ascii=False)) - name_patterns_text.delete(1.0, tk.END) - name_patterns_text.insert(tk.END, '\n'.join(d.get('name_patterns', []))) - default_unit_var.set(d.get('default_unit','瓶')) - default_pq_var.set(str(d.get('default_package_quantity',1))) - tpl_list.delete(0, tk.END) - for t in s.get('output_templates', []): - tpl_list.insert(tk.END, t) - current_tpl_idx_var.set(str(s.get('current_template_index', 0))) - # 载入规则列表到UI - _load_rules_to_ui(s.get('rules', [])) - def add_template(): - p = filedialog.askopenfilename(title="选择模板文件", filetypes=[("Excel模板","*.xls *.xlsx")]) - if p: - tpl_list.insert(tk.END, os.path.relpath(p, os.getcwd()) if os.path.isabs(p) else p) - def remove_template(): - sel = tpl_list.curselection() - if sel: - tpl_list.delete(sel[0]) - ttk.Button(tpl_btns, text="添加模板", command=add_template).pack(side=tk.LEFT) - ttk.Button(tpl_btns, text="删除模板", command=remove_template).pack(side=tk.LEFT, padx=6) - def save_supplier(): - try: - sel = sup_list.curselection() - if not sel: - return - idx = sel[0] - s = data['suppliers'][idx] - s['name'] = name_var.get().strip() or s.get('name','') - s['description'] = desc_var.get().strip() - fp = [w.strip() for w in fp_text.get(1.0, tk.END).splitlines() if w.strip()] - ci = [w.strip() for w in ci_text.get(1.0, tk.END).splitlines() if w.strip()] - s['filename_patterns'] = fp - s['content_indicators'] = ci - hr = header_row_var.get().strip() - try: - s['header_row'] = max(0, int(hr)-1) if hr else 0 - except Exception: - s['header_row'] = 0 - s['dictionary'] = { - 'ignore_words': [w.strip() for w in ignore_words_text.get(1.0, tk.END).splitlines() if w.strip()], - 'unit_synonyms': json.loads(unit_synonyms_var.get() or '{}'), - 'pack_multipliers': json.loads(pack_multipliers_var.get() or '{}'), - 'name_patterns': [w.strip() for w in name_patterns_text.get(1.0, tk.END).splitlines() if w.strip()], - 'default_unit': default_unit_var.get().strip() or '瓶', - 'default_package_quantity': int(default_pq_var.get() or '1') - } - # 优先使用自定义规则列表;如未编辑则使用预设或原有 - rules_ui = _build_rules_from_ui() - if rules_ui: - s['rules'] = rules_ui - else: - s['rules'] = rule_presets.get(rule_preset_var.get(), s.get('rules', [])) - s['output_templates'] = [tpl_list.get(i) for i in range(tpl_list.size())] - try: - s['current_template_index'] = int(current_tpl_idx_var.get() or '0') - except Exception: - s['current_template_index'] = 0 - with open(cfg_path,'w',encoding='utf-8') as f: - json.dump(data,f,ensure_ascii=False,indent=2) - try: - global PROCESSOR_SERVICE - if PROCESSOR_SERVICE is None: - PROCESSOR_SERVICE = ProcessorService(ConfigManager()) - PROCESSOR_SERVICE.reload_processors() - except Exception: - pass - add_to_log(log_widget, "供应商配置已保存并重载\n", "success") - refresh_list(idx) - except Exception as e: - messagebox.showerror("保存失败", str(e)) - ctl_row = ttk.Frame(dlg) - ctl_row.pack(fill=tk.X, pady=8) - ttk.Button(ctl_row, text="保存", command=save_supplier).pack(side=tk.RIGHT) - def on_select(evt): - load_selected() - sup_list.bind('<>', on_select) - refresh_list(0) - load_selected() - except Exception as e: - messagebox.showerror("供应商管理错误", str(e)) - -def open_column_mapping_wizard_alt(log_widget, supplier_name: str = None): - try: - import json - import pandas as pd - dlg = tk.Toplevel() - dlg.title("列映射向导") - dlg.geometry("780x660") - center_window(dlg) - try: - dlg.lift() - dlg.attributes('-topmost', True) - dlg.after(200, lambda: dlg.attributes('-topmost', False)) - dlg.focus_force() - except Exception: - pass - container_scroll = ttk.Frame(dlg) - container_scroll.pack(fill=tk.BOTH, expand=True) - canvas = tk.Canvas(container_scroll) - vsb = ttk.Scrollbar(container_scroll, orient='vertical', command=canvas.yview) - canvas.configure(yscrollcommand=vsb.set) - canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - vsb.pack(side=tk.RIGHT, fill=tk.Y) - frame = ttk.Frame(canvas) - canvas.create_window((0,0), window=frame, anchor='nw') - def _on_frame_config(evt): - try: - canvas.configure(scrollregion=canvas.bbox("all")) - except Exception: - pass - frame.bind("", _on_frame_config) - def _on_mousewheel(event): - try: - delta = -1 * int(event.delta / 120) - canvas.yview_scroll(delta, "units") - except Exception: - pass - dlg.bind("", _on_mousewheel) - # inner padding - inner_pad = ttk.Frame(frame) - inner_pad.pack(fill=tk.BOTH, expand=True, padx=12, pady=12) - frame = inner_pad - ttk.Label(frame, text="选择Excel文件").pack(anchor='w') - excel_path_var = tk.StringVar() - path_row = ttk.Frame(frame) - path_row.pack(fill=tk.X, pady=6) - ttk.Entry(path_row, textvariable=excel_path_var).pack(side=tk.LEFT, fill=tk.X, expand=True) - def _browse_excel(): - p = filedialog.askopenfilename(title="选择Excel文件", filetypes=[("Excel","*.xlsx *.xls")]) - if p: - excel_path_var.set(p) - try: - dlg.lift() - dlg.attributes('-topmost', True) - dlg.after(200, lambda: dlg.attributes('-topmost', False)) - dlg.focus_force() - except Exception: - pass - ttk.Button(path_row, text="浏览", command=_browse_excel).pack(side=tk.LEFT, padx=6) - ttk.Label(frame, text="选择供应商").pack(anchor='w', pady=(8,0)) - supplier_var = tk.StringVar() - supplier_combo = ttk.Combobox(frame, textvariable=supplier_var, state='readonly') - supplier_combo.pack(fill=tk.X) - cfg_path = os.path.join("config","suppliers_config.json") - suppliers = [] - try: - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data = json.load(f) - suppliers = [s.get('name','') for s in data.get('suppliers',[])] - except Exception: - pass - supplier_combo['values'] = suppliers - if supplier_name and supplier_name in suppliers: - supplier_var.set(supplier_name) - elif suppliers: - supplier_var.set(suppliers[0]) - hdr_row = ttk.Frame(frame) - hdr_row.pack(fill=tk.X, pady=6) - ttk.Label(hdr_row, text="表头行号(从1开始)").pack(side=tk.LEFT) - header_row_var = tk.StringVar() - ttk.Entry(hdr_row, textvariable=header_row_var, width=8).pack(side=tk.LEFT, padx=6) - preview_frame = ttk.Frame(frame) - preview_frame.pack(fill=tk.BOTH, expand=True, pady=6) - preview_tree = ttk.Treeview(preview_frame, show='headings', height=8) - preview_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - preview_scroll_y = ttk.Scrollbar(preview_frame, orient='vertical', command=preview_tree.yview) - preview_tree.configure(yscrollcommand=preview_scroll_y.set) - preview_scroll_y.pack(side=tk.RIGHT, fill=tk.Y) - cols_box = ttk.Frame(frame) - cols_box.pack(fill=tk.BOTH, expand=True, pady=8) - ttk.Label(cols_box, text="源列 → 标准列映射").pack(anchor='w') - map_container = ttk.Frame(cols_box) - map_container.pack(fill=tk.BOTH, expand=True, pady=6) - map_canvas = tk.Canvas(map_container) - map_canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - map_scroll_y = ttk.Scrollbar(map_container, orient='vertical', command=map_canvas.yview) - map_scroll_y.pack(side=tk.RIGHT, fill=tk.Y) - map_canvas.configure(yscrollcommand=map_scroll_y.set) - map_frame = ttk.Frame(map_canvas) - map_canvas.create_window((0,0), window=map_frame, anchor='nw') - def _on_map_frame_config(evt): - try: - map_canvas.configure(scrollregion=map_canvas.bbox("all")) - except Exception: - pass - map_frame.bind("", _on_map_frame_config) - # 标准字段与中文标签 - std_fields = [ - "barcode","name","specification","quantity","unit", - "unit_price","total_price","category","brand","supplier" - ] - friendly_labels = { - "barcode": "条码", - "name": "商品名称", - "specification": "规格", - "quantity": "数量", - "unit": "单位", - "unit_price": "采购单价", - "total_price": "金额/小计", - "category": "类别", - "brand": "品牌", - "supplier": "供应商" - } - try: - from app.core.handlers.column_mapper import ColumnMapper - cm = ColumnMapper() - # 若项目有更完整集合,替换并生成中文标签 - std_fields = list(cm.STANDARD_COLUMNS.keys()) - for k, v in cm.STANDARD_COLUMNS.items(): - if isinstance(v, dict) and v.get('label_zh'): - friendly_labels[k] = v['label_zh'] - except Exception: - pass - source_col_options = [] - source_cols = [] - def load_columns(): - nonlocal source_cols - p = excel_path_var.get() - if not p: - return - try: - idx = None - try: - val = header_row_var.get().strip() - if val: - n = int(val) - if n > 0: - idx = n - 1 - except Exception: - pass - if p.lower().endswith('.xlsx'): - df = pd.read_excel(p, engine='openpyxl', header=idx if idx is not None else 0) - else: - df = pd.read_excel(p, engine='xlrd', header=idx if idx is not None else 0) - source_cols = [str(c) for c in list(df.columns)] - for cmb in source_col_options: - cmb['values'] = source_cols - if source_cols: - cmb.set(source_cols[0]) - try: - # 1) 先按已保存的映射预填 - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data_cfg = json.load(f) - for s in data_cfg.get('suppliers',[]): - if s.get('name') == supplier_var.get(): - cm_saved = s.get('column_mapping', {}) - for src, target in cm_saved.items(): - if target in entries and src in source_cols: - entries[target].set(src) - break - # 2) 再用关键词建议补齐未映射项 - lower = [str(c).lower() for c in source_cols] - def find_any(keys): - for i, lc in enumerate(lower): - for k in keys: - if k in lc: - return source_cols[i] - return None - mapping_suggestions = { - 'barcode': find_any(['条码','barcode','条形码']), - 'name': find_any(['商品名称','name','品名','商品']), - 'specification': find_any(['规格','型号']), - 'quantity': find_any(['数量','订购量','订货数量','采购数量']), - 'unit': find_any(['单位']), - 'unit_price': find_any(['单价','采购单价','进货价','价格']), - 'total_price': find_any(['金额','小计','合计']), - 'category': find_any(['类别','分类','品类']), - 'brand': find_any(['品牌']), - 'supplier': find_any(['供应商','供货商']) - } - for f, src in mapping_suggestions.items(): - if f in entries and (not entries[f].get()) and src: - entries[f].set(src) - except Exception: - pass - except Exception as e: - messagebox.showerror("加载失败", str(e)) - def preview_rows(): - try: - p = excel_path_var.get() - if not p: - return - if p.lower().endswith('.xlsx'): - df0 = pd.read_excel(p, engine='openpyxl', header=None, nrows=30) - else: - df0 = pd.read_excel(p, engine='xlrd', header=None, nrows=30) - cols = [str(i+1) for i in range(df0.shape[1])] - preview_tree['columns'] = cols - for c in cols: - preview_tree.heading(c, text=c) - preview_tree.column(c, width=120, stretch=True) - for iid in preview_tree.get_children(): - preview_tree.delete(iid) - for r in range(df0.shape[0]): - vals = [str(df0.iloc[r, c]) if c < df0.shape[1] else '' for c in range(df0.shape[1])] - preview_tree.insert('', tk.END, values=vals) - except Exception as e: - messagebox.showerror("加载失败", str(e)) - def on_preview_select(evt): - try: - sel = preview_tree.selection() - if sel: - idx = preview_tree.index(sel[0]) - header_row_var.set(str(idx+1)) - except Exception: - pass - preview_tree.bind('<>', on_preview_select) - ttk.Button(path_row, text="预览前30行", command=preview_rows).pack(side=tk.LEFT, padx=6) - ttk.Button(path_row, text="加载列", command=load_columns).pack(side=tk.LEFT, padx=6) - entries = {} - for i, field in enumerate(std_fields): - row = ttk.Frame(map_frame) - row.pack(fill=tk.X, pady=4) - label_text = field + (f"({friendly_labels.get(field,'')})" if friendly_labels.get(field) else "") - ttk.Label(row, text=label_text, width=22).pack(side=tk.LEFT) - var = tk.StringVar() - cmb = ttk.Combobox(row, textvariable=var, state='readonly') - cmb.pack(side=tk.LEFT, fill=tk.X, expand=True) - source_col_options.append(cmb) - entries[field] = var - act_row = ttk.Frame(frame) - act_row.pack(fill=tk.X, pady=8) - preset_row = ttk.Frame(frame) - preset_row.pack(fill=tk.X, pady=6) - ttk.Label(preset_row, text="规则预设").pack(side=tk.LEFT) - rule_preset_var = tk.StringVar() - rule_presets = { - "基础拆分与推断": [ - {"type": "split_quantity_unit", "source": "quantity"}, - {"type": "extract_spec_from_name", "source": "name"}, - {"type": "normalize_unit", "target": "unit", "map": {"箱": "件", "提": "件", "盒": "件"}}, - {"type": "compute_quantity_from_total"}, - {"type": "mark_gift"}, - {"type": "fill_missing", "fills": {"unit": "瓶"}} - ] - } - preset_cb = ttk.Combobox(preset_row, textvariable=rule_preset_var, state='readonly', values=list(rule_presets.keys())) - preset_cb.pack(side=tk.LEFT, padx=6) - dict_frame = ttk.LabelFrame(frame, text="供应商规则") - dict_frame.pack(fill=tk.BOTH, expand=True, pady=6) - ttk.Label(dict_frame, text="忽略词").pack(anchor='w') - ignore_words_text = scrolledtext.ScrolledText(dict_frame, height=3) - ignore_words_text.pack(fill=tk.X, pady=4) - ttk.Label(dict_frame, text="单位同义词(JSON)").pack(anchor='w') - unit_synonyms_var = tk.StringVar(value='{}') - ttk.Entry(dict_frame, textvariable=unit_synonyms_var).pack(fill=tk.X, pady=4) - ttk.Label(dict_frame, text="包装倍数(JSON)").pack(anchor='w') - pack_multipliers_var = tk.StringVar(value='{}') - ttk.Entry(dict_frame, textvariable=pack_multipliers_var).pack(fill=tk.X, pady=4) - ttk.Label(dict_frame, text="名称正则(每行一条)").pack(anchor='w') - name_patterns_text = scrolledtext.ScrolledText(dict_frame, height=3) - name_patterns_text.pack(fill=tk.X, pady=4) - ttk.Label(dict_frame, text="默认单位").pack(anchor='w') - default_unit_var = tk.StringVar(value='瓶') - ttk.Entry(dict_frame, textvariable=default_unit_var).pack(fill=tk.X, pady=4) - ttk.Label(dict_frame, text="默认包装数量").pack(anchor='w') - default_pq_var = tk.StringVar(value='1') - ttk.Entry(dict_frame, textvariable=default_pq_var).pack(fill=tk.X, pady=4) - def load_dictionary_for_supplier(): - try: - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data = json.load(f) - for s in data.get('suppliers',[]): - if s.get('name') == supplier_var.get(): - d = s.get('dictionary') or {} - ignore_words = '\n'.join(d.get('ignore_words', [])) - ignore_words_text.delete(1.0, tk.END) - ignore_words_text.insert(tk.END, ignore_words) - unit_synonyms_var.set(json.dumps(d.get('unit_synonyms', {}), ensure_ascii=False)) - pack_multipliers_var.set(json.dumps(d.get('pack_multipliers', {}), ensure_ascii=False)) - name_patterns_text.delete(1.0, tk.END) - name_patterns_text.insert(tk.END, '\n'.join(d.get('name_patterns', []))) - default_unit_var.set(d.get('default_unit','瓶')) - default_pq_var.set(str(d.get('default_package_quantity',1))) - break - except Exception: - pass - load_dictionary_for_supplier() - def export_dictionary(): - try: - save_path = filedialog.asksaveasfilename(title="导出规则", defaultextension=".json", filetypes=[("JSON","*.json")]) - if not save_path: - return - payload = { - 'ignore_words': [w.strip() for w in ignore_words_text.get(1.0, tk.END).splitlines() if w.strip()], - 'unit_synonyms': json.loads(unit_synonyms_var.get() or '{}'), - 'pack_multipliers': json.loads(pack_multipliers_var.get() or '{}'), - 'name_patterns': [w.strip() for w in name_patterns_text.get(1.0, tk.END).splitlines() if w.strip()], - 'default_unit': default_unit_var.get().strip() or '瓶', - 'default_package_quantity': int(default_pq_var.get() or '1') - } - with open(save_path,'w',encoding='utf-8') as f: - json.dump(payload,f,ensure_ascii=False,indent=2) - messagebox.showinfo("成功","规则已导出") - except Exception as e: - messagebox.showerror("导出失败", str(e)) - def import_dictionary(): - try: - open_path = filedialog.askopenfilename(title="导入规则", filetypes=[("JSON","*.json")]) - if not open_path: - return - with open(open_path,'r',encoding='utf-8') as f: - payload = json.load(f) - ignore_words_text.delete(1.0, tk.END) - ignore_words_text.insert(tk.END, '\n'.join(payload.get('ignore_words', []))) - unit_synonyms_var.set(json.dumps(payload.get('unit_synonyms', {}), ensure_ascii=False)) - pack_multipliers_var.set(json.dumps(payload.get('pack_multipliers', {}), ensure_ascii=False)) - name_patterns_text.delete(1.0, tk.END) - name_patterns_text.insert(tk.END, '\n'.join(payload.get('name_patterns', []))) - default_unit_var.set(payload.get('default_unit','瓶')) - default_pq_var.set(str(payload.get('default_package_quantity',1))) - messagebox.showinfo("成功","规则已导入,请保存以应用") - except Exception as e: - messagebox.showerror("导入失败", str(e)) - dict_io_row = ttk.Frame(dict_frame) - dict_io_row.pack(fill=tk.X, pady=4) - ttk.Button(dict_io_row, text="导入规则", command=import_dictionary).pack(side=tk.LEFT) - ttk.Button(dict_io_row, text="导出规则", command=export_dictionary).pack(side=tk.LEFT, padx=6) - preview_box = ttk.Frame(frame) - preview_box.pack(fill=tk.BOTH, expand=True, pady=6) - left_tree = ttk.Treeview(preview_box, show='headings', height=8) - right_tree = ttk.Treeview(preview_box, show='headings', height=8) - left_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - right_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - scroll_x_row = ttk.Frame(frame) - scroll_x_row.pack(fill=tk.X) - left_scroll_x = ttk.Scrollbar(scroll_x_row, orient='horizontal', command=left_tree.xview) - right_scroll_x = ttk.Scrollbar(scroll_x_row, orient='horizontal', command=right_tree.xview) - left_tree.configure(xscrollcommand=left_scroll_x.set) - right_tree.configure(xscrollcommand=right_scroll_x.set) - left_scroll_x.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0,3)) - right_scroll_x.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(3,0)) - def apply_rules_preview(): - try: - p = excel_path_var.get() - if not p: - return - idx = None - try: - val = header_row_var.get().strip() - if val: - n = int(val) - if n > 0: - idx = n - 1 - except Exception: - pass - df = pd.read_excel(p, engine='openpyxl' if p.lower().endswith('.xlsx') else 'xlrd', header=idx if idx is not None else 0, nrows=15) - rename_map = {} - for f in std_fields: - v = entries[f].get() - if v: - rename_map[v] = f - df0 = df.rename(columns=rename_map) - from app.core.handlers.rule_engine import apply_rules - rules = rule_presets.get(rule_preset_var.get(), rule_presets["基础拆分与推断"]) - dictionary = { - 'ignore_words': [w.strip() for w in ignore_words_text.get(1.0, tk.END).splitlines() if w.strip()], - 'unit_synonyms': json.loads(unit_synonyms_var.get() or '{}'), - 'pack_multipliers': json.loads(pack_multipliers_var.get() or '{}'), - 'name_patterns': [w.strip() for w in name_patterns_text.get(1.0, tk.END).splitlines() if w.strip()], - 'default_unit': default_unit_var.get().strip() or '瓶', - 'default_package_quantity': int(default_pq_var.get() or '1') - } - df1 = apply_rules(df0, rules, dictionary) - lcols = list(df0.columns) - rcols = list(df1.columns) - left_tree['columns'] = lcols - right_tree['columns'] = rcols - for c in lcols: - left_tree.heading(c, text=c) - left_tree.column(c, width=120, stretch=True) - for c in rcols: - right_tree.heading(c, text=c) - right_tree.column(c, width=120, stretch=True) - for iid in left_tree.get_children(): - left_tree.delete(iid) - for iid in right_tree.get_children(): - right_tree.delete(iid) - for i in range(len(df0)): - left_tree.insert('', tk.END, values=[str(df0.iloc[i].get(c, '')) for c in lcols]) - for i in range(len(df1)): - right_tree.insert('', tk.END, values=[str(df1.iloc[i].get(c, '')) for c in rcols]) - except Exception as e: - messagebox.showerror("预览失败", str(e)) - ttk.Button(preset_row, text="应用规则预览", command=apply_rules_preview).pack(side=tk.LEFT, padx=6) - step_row = ttk.Frame(frame) - step_row.pack(fill=tk.X, pady=6) - ttk.Label(step_row, text="预览步骤").pack(side=tk.LEFT) - step_select_var = tk.StringVar() - step_select_cb = ttk.Combobox(step_row, textvariable=step_select_var, state='readonly') - step_select_cb.pack(side=tk.LEFT, padx=6) - step_box = ttk.Frame(frame) - step_box.pack(fill=tk.BOTH, expand=True, pady=6) - step_tree = ttk.Treeview(step_box, show='headings', height=8) - step_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - step_scroll_x = ttk.Scrollbar(frame, orient='horizontal', command=step_tree.xview) - step_tree.configure(xscrollcommand=step_scroll_x.set) - step_scroll_x.pack(fill=tk.X) - def build_step_preview(): - try: - p = excel_path_var.get() - if not p: - return - idx = None - try: - val = header_row_var.get().strip() - if val: - n = int(val) - if n > 0: - idx = n - 1 - except Exception: - pass - df = pd.read_excel(p, engine='openpyxl' if p.lower().endswith('.xlsx') else 'xlrd', header=idx if idx is not None else 0, nrows=50) - rename_map = {} - for f in std_fields: - v = entries[f].get() - if v: - rename_map[v] = f - df0 = df.rename(columns=rename_map) - from app.core.handlers.rule_engine import apply_rules - rules = rule_presets.get(rule_preset_var.get(), rule_presets["基础拆分与推断"]) - dictionary = { - 'ignore_words': [w.strip() for w in ignore_words_text.get(1.0, tk.END).splitlines() if w.strip()], - 'unit_synonyms': json.loads(unit_synonyms_var.get() or '{}'), - 'pack_multipliers': json.loads(pack_multipliers_var.get() or '{}'), - 'name_patterns': [w.strip() for w in name_patterns_text.get(1.0, tk.END).splitlines() if w.strip()], - 'default_unit': default_unit_var.get().strip() or '瓶', - 'default_package_quantity': int(default_pq_var.get() or '1') - } - steps = [("原始", df0)] - cur = df0.copy() - for r in rules: - cur = apply_rules(cur, [r], dictionary) - steps.append((r.get('type','step'), cur.copy())) - step_select_cb['values'] = [t for t,_ in steps] - if steps: - step_select_var.set(steps[-1][0]) - show_step_df(steps[-1][1]) - def on_step_change(evt=None): - sel = step_select_var.get() - for t, d in steps: - if t == sel: - show_step_df(d) - break - step_select_cb.bind('<>', on_step_change) - step_select_cb.event_generate('<>') - step_row._steps_cache = steps - except Exception as e: - messagebox.showerror("预览失败", str(e)) - def show_step_df(dfshow): - cols = list(dfshow.columns) - step_tree['columns'] = cols - for c in cols: - step_tree.heading(c, text=c) - step_tree.column(c, width=120, stretch=True) - for iid in step_tree.get_children(): - step_tree.delete(iid) - for i in range(len(dfshow)): - step_tree.insert('', tk.END, values=[str(dfshow.iloc[i].get(c, '')) for c in cols]) - def export_step_csv(): - try: - if not hasattr(step_row, '_steps_cache'): - return - sel = step_select_var.get() - for t, d in step_row._steps_cache: - if t == sel: - save_path = filedialog.asksaveasfilename(title="导出预览为CSV", defaultextension=".csv", filetypes=[("CSV","*.csv")]) - if save_path: - d.to_csv(save_path, index=False, encoding='utf-8-sig') - messagebox.showinfo("成功","已导出CSV") - break - except Exception as e: - messagebox.showerror("导出失败", str(e)) - ttk.Button(step_row, text="生成步骤预览", command=build_step_preview).pack(side=tk.LEFT, padx=6) - ttk.Button(step_row, text="导出预览CSV", command=export_step_csv).pack(side=tk.LEFT, padx=6) - def save_mapping(): - try: - if not supplier_var.get(): - messagebox.showwarning("提示","请选择供应商") - return - mapping = {} - for f in std_fields: - v = entries[f].get() - if v: - mapping[v] = f - if not mapping: - messagebox.showwarning("提示","请先加载列并选择映射") - return - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data = json.load(f) - else: - data = {"suppliers":[]} - for s in data.get('suppliers',[]): - if s.get('name') == supplier_var.get(): - s['column_mapping'] = mapping - hr = header_row_var.get().strip() - try: - if hr: - n = int(hr) - if n > 0: - s['header_row'] = n - 1 - except Exception: - pass - if rule_preset_var.get(): - s['rules'] = rule_presets.get(rule_preset_var.get(), []) - s['dictionary'] = { - 'ignore_words': [w.strip() for w in ignore_words_text.get(1.0, tk.END).splitlines() if w.strip()], - 'unit_synonyms': json.loads(unit_synonyms_var.get() or '{}'), - 'pack_multipliers': json.loads(pack_multipliers_var.get() or '{}'), - 'name_patterns': [w.strip() for w in name_patterns_text.get(1.0, tk.END).splitlines() if w.strip()], - 'default_unit': default_unit_var.get().strip() or '瓶', - 'default_package_quantity': int(default_pq_var.get() or '1') - } - with open(cfg_path,'w',encoding='utf-8') as f: - json.dump(data,f,ensure_ascii=False,indent=2) - add_to_log(log_widget, "列映射已保存\n", "success") - messagebox.showinfo("成功","列映射已保存") - dlg.destroy() - except Exception as e: - messagebox.showerror("保存失败", str(e)) - def export_mapping(): - try: - save_path = filedialog.asksaveasfilename(title="导出映射", defaultextension=".json", filetypes=[("JSON","*.json")]) - if not save_path: - return - mapping = {} - for f in std_fields: - v = entries[f].get() - if v: - mapping[v] = f - payload = {'supplier': supplier_var.get(), 'header_row': None, 'column_mapping': mapping} - hr = header_row_var.get().strip() - try: - if hr: - n = int(hr) - if n > 0: - payload['header_row'] = n - 1 - except Exception: - pass - with open(save_path,'w',encoding='utf-8') as f: - json.dump(payload,f,ensure_ascii=False,indent=2) - messagebox.showinfo("成功","映射已导出") - except Exception as e: - messagebox.showerror("导出失败", str(e)) - def import_mapping(): - try: - open_path = filedialog.askopenfilename(title="导入映射", filetypes=[("JSON","*.json")]) - if not open_path: - return - with open(open_path,'r',encoding='utf-8') as f: - payload = json.load(f) - cm = payload.get('column_mapping',{}) - for src, target in cm.items(): - if target in entries: - entries[target].set(src) - hr = payload.get('header_row') - if isinstance(hr,int) and hr >= 0: - header_row_var.set(str(hr+1)) - messagebox.showinfo("成功","映射已导入,请保存以应用") - except Exception as e: - messagebox.showerror("导入失败", str(e)) - ttk.Button(act_row, text="导入映射", command=import_mapping).pack(side=tk.LEFT) - ttk.Button(act_row, text="导出映射", command=export_mapping).pack(side=tk.LEFT, padx=8) - ttk.Button(act_row, text="保存映射", command=save_mapping).pack(side=tk.LEFT, padx=8) - ttk.Button(act_row, text="保存并返回", command=save_mapping).pack(side=tk.LEFT, padx=8) - ttk.Button(act_row, text="取消", command=dlg.destroy).pack(side=tk.RIGHT) - except Exception as e: - messagebox.showerror("向导错误", str(e)) - def open_template_manager(log_widget): try: import json @@ -4425,100 +2974,6 @@ def edit_barcode_mappings(log_widget): add_to_log(log_widget, f"编辑条码映射时出错: {str(e)}\n", "error") messagebox.showerror("错误", f"编辑条码映射时出错: {str(e)}") -def open_column_mapping_wizard(log_widget): - try: - import json - from tkinter import ttk - from app.services.order_service import OrderService - from app.core.handlers.column_mapper import ColumnMapper - suppliers = [] - config_path = os.path.abspath(os.path.join('config','suppliers_config.json')) - if os.path.exists(config_path): - with open(config_path,'r',encoding='utf-8') as f: - data = json.load(f) - suppliers = data.get('suppliers', []) - file_path = select_excel_file(log_widget) - if not file_path: - try: - svc = OrderService() - file_path = svc.get_latest_excel() - except Exception: - file_path = None - if not file_path: - messagebox.showwarning("未选择文件","请选择一个用于分析的Excel文件") - return - import pandas as pd - df = pd.read_excel(file_path, nrows=5) - detected_cols = list(df.columns) - dlg = tk.Toplevel() - dlg.title("列映射向导") - dlg.geometry("520x560") - center_window(dlg) - frame = tk.Frame(dlg, bg=THEMES[THEME_MODE]["card_bg"]) - frame.pack(fill=tk.BOTH, expand=True, padx=12, pady=12) - tk.Label(frame, text="供应商", bg=THEMES[THEME_MODE]["card_bg"], fg=THEMES[THEME_MODE]["fg"]).pack(anchor='w') - supplier_names = [s.get('name','') for s in suppliers] - supplier_var = tk.StringVar(value=supplier_names[0] if supplier_names else '') - supplier_cb = ttk.Combobox(frame, values=supplier_names, textvariable=supplier_var, state='readonly') - supplier_cb.pack(fill=tk.X, pady=6) - std_cols = list(ColumnMapper.STANDARD_COLUMNS.keys()) - mapper = ColumnMapper() - suggestions = mapper.suggest_column_mapping(df) - mapping_vars = {} - for sc in std_cols: - row = tk.Frame(frame, bg=THEMES[THEME_MODE]["card_bg"]) - row.pack(fill=tk.X, pady=4) - tk.Label(row, text=sc, width=14, anchor='w', bg=THEMES[THEME_MODE]["card_bg"], fg=THEMES[THEME_MODE]["fg"]).pack(side=tk.LEFT) - var = tk.StringVar() - values = ['未映射'] + detected_cols - cb = ttk.Combobox(row, values=values, textvariable=var, state='readonly') - pre = None - for col, stds in suggestions.items(): - if sc in stds and col in detected_cols: - pre = col - break - var.set(pre if pre else '未映射') - cb.pack(side=tk.LEFT, fill=tk.X, expand=True) - mapping_vars[sc] = var - btn_row = tk.Frame(frame, bg=THEMES[THEME_MODE]["card_bg"]) - btn_row.pack(fill=tk.X, pady=12) - def save_mapping(): - try: - supplier_name = supplier_var.get() - if not supplier_name: - messagebox.showwarning("未选择供应商","请选择一个供应商") - return - updated = False - for s in suppliers: - if s.get('name') == supplier_name: - cm = {} - for sc, var in mapping_vars.items(): - src = var.get() - if src and src != '未映射': - cm[src] = sc - s['column_mapping'] = cm - updated = True - break - if not updated: - messagebox.showerror("错误","未找到供应商配置") - return - with open(config_path,'w',encoding='utf-8') as f: - json.dump({'suppliers': suppliers}, f, ensure_ascii=False, indent=2) - try: - global PROCESSOR_SERVICE - if PROCESSOR_SERVICE is None: - PROCESSOR_SERVICE = ProcessorService(ConfigManager()) - PROCESSOR_SERVICE.reload_processors() - except Exception: - pass - add_to_log(log_widget, "列映射已保存并重新加载供应商配置\n", "success") - dlg.destroy() - except Exception as e: - messagebox.showerror("保存失败", str(e)) - create_modern_button(btn_row, "保存", save_mapping, "primary", px_width=92, px_height=32).pack(side=tk.RIGHT) - except Exception as e: - messagebox.showerror("向导错误", str(e)) - def bind_keyboard_shortcuts(root, log_widget, status_bar): """绑定键盘快捷键""" # Ctrl+O - 处理单个图片 @@ -4620,237 +3075,3 @@ def _extract_path_from_recent_item(s: str) -> str: return p.strip().strip('"') except Exception: return s.strip().strip('"') -def open_validation_panel(log_widget): - try: - import json - import pandas as pd - import xlrd - dlg = tk.Toplevel() - dlg.title("验证匹配") - dlg.geometry("900x680") - center_window(dlg) - try: - dlg.lift() - dlg.attributes('-topmost', True) - dlg.after(200, lambda: dlg.attributes('-topmost', False)) - dlg.focus_force() - except Exception: - pass - outer = ttk.Frame(dlg) - outer.pack(fill=tk.BOTH, expand=True) - canvas = tk.Canvas(outer) - vsb = ttk.Scrollbar(outer, orient='vertical', command=canvas.yview) - canvas.configure(yscrollcommand=vsb.set) - canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - vsb.pack(side=tk.RIGHT, fill=tk.Y) - frame = ttk.Frame(canvas) - canvas.create_window((0,0), window=frame, anchor='nw') - frame.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) - cfg_path = os.path.join("config","suppliers_config.json") - suppliers = [] - data_cfg = {"suppliers": []} - try: - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data_cfg = json.load(f) - suppliers = [s.get('name','') for s in data_cfg.get('suppliers',[])] - except Exception: - pass - row1 = ttk.Frame(frame) - row1.pack(fill=tk.X, pady=6) - ttk.Label(row1, text="供应商").pack(side=tk.LEFT) - sup_var = tk.StringVar() - ttk.Combobox(row1, textvariable=sup_var, state='readonly', values=suppliers).pack(side=tk.LEFT, padx=6) - row2 = ttk.Frame(frame) - row2.pack(fill=tk.X, pady=6) - ttk.Label(row2, text="原始文件").pack(side=tk.LEFT) - orig_var = tk.StringVar() - ttk.Entry(row2, textvariable=orig_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - ttk.Button(row2, text="浏览", command=lambda: orig_var.set(filedialog.askopenfilename(title="选择原始Excel", filetypes=[("Excel","*.xlsx *.xls")]) or orig_var.get())).pack(side=tk.LEFT) - row3 = ttk.Frame(frame) - row3.pack(fill=tk.X, pady=6) - ttk.Label(row3, text="期望结果").pack(side=tk.LEFT) - expect_var = tk.StringVar() - ttk.Entry(row3, textvariable=expect_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6) - ttk.Button(row3, text="浏览", command=lambda: expect_var.set(filedialog.askopenfilename(title="选择期望结果", filetypes=[("Excel","*.xls *.xlsx")]) or expect_var.get())).pack(side=tk.LEFT) - act_row = ttk.Frame(frame) - act_row.pack(fill=tk.X, pady=8) - result_text = scrolledtext.ScrolledText(frame, height=6) - result_text.pack(fill=tk.BOTH, expand=True) - diff_box = ttk.Frame(frame) - diff_box.pack(fill=tk.BOTH, expand=True, pady=6) - diff_tree = ttk.Treeview(diff_box, show='headings', height=12) - diff_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - diff_scroll_x = ttk.Scrollbar(frame, orient='horizontal', command=diff_tree.xview) - diff_tree.configure(xscrollcommand=diff_scroll_x.set) - diff_scroll_x.pack(fill=tk.X) - suggestions_cache = [] - def _run_validation(): - try: - import numpy as np - from app.services.order_service import OrderService - supplier = sup_var.get() - orig = orig_var.get().strip() - expect = expect_var.get().strip() - if not supplier or not orig or not expect: - messagebox.showwarning("提示","请选择供应商、原始文件与期望结果") - return - service = OrderService() - out_path = service.process_excel(orig, progress_cb=lambda p: None) - result_text.configure(state=tk.NORMAL) - result_text.delete(1.0, tk.END) - result_text.insert(tk.END, f"生成结果: {out_path}\n") - def _read_df(path): - ap = os.path.abspath(path) - if ap.lower().endswith('.xlsx'): - return pd.read_excel(ap, engine='openpyxl') - else: - return pd.read_excel(ap, engine='xlrd') - df_actual = _read_df(out_path) - df_expect = _read_df(expect) - keys = None - if 'barcode' in df_actual.columns and 'barcode' in df_expect.columns: - keys = 'barcode' - elif '条码' in df_actual.columns and '条码' in df_expect.columns: - keys = '条码' - elif 'name' in df_actual.columns and 'name' in df_expect.columns: - keys = 'name' - else: - keys = None - fields = ['barcode','name','specification','quantity','unit','unit_price','total_price'] - for c in ['条码','商品名称','规格','采购量','单位','采购单价','金额','小计']: - if c in df_expect.columns and c not in fields: - fields.append(c) - records = [] - mismatches = 0 - if keys: - a = df_actual.set_index(keys) - e = df_expect.set_index(keys) - idx_union = list(set(a.index.tolist()) | set(e.index.tolist())) - for k in idx_union: - ra = a.loc[k] if k in a.index else None - rexp = e.loc[k] if k in e.index else None - for f in ['barcode','name','specification','quantity','unit','unit_price','total_price']: - va = None - ve = None - if ra is not None and f in a.columns: - va = ra.get(f) - if rexp is not None and f in e.columns: - ve = rexp.get(f) - if va is None and ve is None: - continue - def _norm(v): - try: - return float(v) - except Exception: - return str(v) if v is not None else '' - if _norm(va) != _norm(ve): - records.append([str(k), f, str(ve), str(va)]) - mismatches += 1 - else: - for i in range(min(len(df_actual), len(df_expect))): - for f in ['barcode','name','specification','quantity','unit','unit_price','total_price']: - va = df_actual.iloc[i].get(f) if f in df_actual.columns else None - ve = df_expect.iloc[i].get(f) if f in df_expect.columns else None - if va is None and ve is None: - continue - def _norm(v): - try: - return float(v) - except Exception: - return str(v) if v is not None else '' - if _norm(va) != _norm(ve): - records.append([str(i), f, str(ve), str(va)]) - mismatches += 1 - diff_tree['columns'] = ['key','field','expected','actual'] - for c in ['key','field','expected','actual']: - diff_tree.heading(c, text=c) - diff_tree.column(c, width=160, stretch=True) - for iid in diff_tree.get_children(): - diff_tree.delete(iid) - for row in records: - diff_tree.insert('', tk.END, values=row) - result_text.insert(tk.END, f"差异条目: {mismatches}\n") - result_text.configure(state=tk.DISABLED) - suggestions = [] - try: - unit_vals = df_actual.get('unit') if 'unit' in df_actual.columns else pd.Series([]) - bad_units = unit_vals.astype(str).isin(['箱','件','提','盒']).sum() - if bad_units > 0: - suggestions.append({"type":"normalize_unit","target":"unit","map":{"箱":"件","提":"件","盒":"件"}}) - except Exception: - pass - try: - qty = pd.to_numeric(df_actual.get('quantity'), errors='coerce') if 'quantity' in df_actual.columns else pd.Series([]) - missing_qty = int(qty.isna().sum()) if not qty.empty else 0 - if missing_qty > 0: - cols = df_actual.columns.tolist() - src = None - for cand in ['订单数量','订购量','订货数量']: - if cand in cols: - src = cand - break - if src: - suggestions.append({"type":"split_quantity_unit","source":src}) - except Exception: - pass - try: - spec = df_actual.get('specification') if 'specification' in df_actual.columns else pd.Series([]) - missing_spec = int(spec.isna().sum()) if not spec.empty else 0 - if missing_spec > 0: - suggestions.append({"type":"extract_spec_from_name","source":"name"}) - except Exception: - pass - try: - up = pd.to_numeric(df_actual.get('unit_price'), errors='coerce').fillna(0) if 'unit_price' in df_actual.columns else pd.Series([]) - tp = pd.to_numeric(df_actual.get('total_price'), errors='coerce').fillna(0) if 'total_price' in df_actual.columns else pd.Series([]) - qty = pd.to_numeric(df_actual.get('quantity'), errors='coerce').fillna(np.nan) if 'quantity' in df_actual.columns else pd.Series([]) - if not qty.empty and not up.empty and not tp.empty: - need_compute = ((qty.isna()) & (up > 0) & (tp > 0)).sum() - if need_compute > 0: - suggestions.append({"type":"compute_quantity_from_total"}) - except Exception: - pass - suggestions_cache.clear() - suggestions_cache.extend(suggestions) - if suggestions: - result_text.configure(state=tk.NORMAL) - result_text.insert(tk.END, f"建议: {json.dumps(suggestions, ensure_ascii=False)}\n") - result_text.configure(state=tk.DISABLED) - except Exception as e: - messagebox.showerror("验证失败", str(e)) - def _apply_suggestions(): - try: - supplier = sup_var.get() - if not supplier: - return - if os.path.exists(cfg_path): - with open(cfg_path,'r',encoding='utf-8') as f: - data = json.load(f) - for s in data.get('suppliers',[]): - if s.get('name') == supplier: - rules = s.get('rules', []) - for sug in suggestions_cache: - exists = any(r.get('type') == sug.get('type') for r in rules) - if not exists: - rules.append(sug) - s['rules'] = rules - d = s.get('dictionary') or {} - d.setdefault('unit_synonyms', {"箱":"件","提":"件","盒":"件","瓶":"瓶"}) - d.setdefault('pack_multipliers', {"件":24,"箱":24,"提":12,"盒":10}) - s['dictionary'] = d - break - with open(cfg_path,'w',encoding='utf-8') as f: - json.dump(data,f,ensure_ascii=False,indent=2) - result_text.configure(state=tk.NORMAL) - result_text.insert(tk.END, "已应用建议并保存配置\n") - result_text.configure(state=tk.DISABLED) - else: - messagebox.showwarning("提示","配置文件不存在") - except Exception as e: - messagebox.showerror("应用失败", str(e)) - ttk.Button(act_row, text="运行验证", command=_run_validation).pack(side=tk.LEFT) - ttk.Button(act_row, text="应用建议", command=_apply_suggestions).pack(side=tk.LEFT, padx=6) - ttk.Button(act_row, text="关闭", command=dlg.destroy).pack(side=tk.RIGHT) - except Exception as e: - messagebox.showerror("验证匹配错误", str(e))