增强版v2-初始化仓库,验证好了ocr部分,先备份一次

This commit is contained in:
2025-05-02 17:25:47 +08:00
commit 0035cd1893
88 changed files with 9031 additions and 0 deletions
+5
View File
@@ -0,0 +1,5 @@
"""
OCR订单处理系统 - 工具模块
------------------------
提供系统通用工具和辅助函数。
"""
Binary file not shown.
Binary file not shown.
Binary file not shown.
+251
View File
@@ -0,0 +1,251 @@
"""
文件操作工具模块
--------------
提供文件处理、查找和管理功能。
"""
import os
import sys
import shutil
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union, Any
from .log_utils import get_logger
logger = get_logger(__name__)
def ensure_dir(directory: str) -> bool:
"""
确保目录存在,如果不存在则创建
Args:
directory: 目录路径
Returns:
是否成功创建或目录已存在
"""
try:
os.makedirs(directory, exist_ok=True)
return True
except Exception as e:
logger.error(f"创建目录失败: {directory}, 错误: {e}")
return False
def get_file_extension(file_path: str) -> str:
"""
获取文件扩展名(小写)
Args:
file_path: 文件路径
Returns:
文件扩展名,包含点(例如 .jpg)
"""
return os.path.splitext(file_path)[1].lower()
def is_valid_extension(file_path: str, allowed_extensions: List[str]) -> bool:
"""
检查文件扩展名是否在允许的列表中
Args:
file_path: 文件路径
allowed_extensions: 允许的扩展名列表(例如 ['.jpg', '.png']
Returns:
文件扩展名是否有效
"""
ext = get_file_extension(file_path)
return ext in allowed_extensions
def get_files_by_extensions(directory: str, extensions: List[str], exclude_patterns: List[str] = None) -> List[str]:
"""
获取指定目录下所有符合扩展名的文件路径
Args:
directory: 目录路径
extensions: 扩展名列表(例如 ['.jpg', '.png']
exclude_patterns: 排除的文件名模式(例如 ['~$', '.tmp']
Returns:
文件路径列表
"""
if exclude_patterns is None:
exclude_patterns = ['~$', '.tmp']
files = []
for file in os.listdir(directory):
file_path = os.path.join(directory, file)
# 检查是否是文件
if not os.path.isfile(file_path):
continue
# 检查扩展名
if not is_valid_extension(file_path, extensions):
continue
# 检查排除模式
exclude = False
for pattern in exclude_patterns:
if pattern in file:
exclude = True
break
if not exclude:
files.append(file_path)
return files
def get_latest_file(directory: str, pattern: str = "", extensions: List[str] = None) -> Optional[str]:
"""
获取指定目录下最新的文件
Args:
directory: 目录路径
pattern: 文件名包含的字符串模式
extensions: 限制的文件扩展名列表
Returns:
最新文件的路径,如果没有找到则返回None
"""
if not os.path.exists(directory):
logger.warning(f"目录不存在: {directory}")
return None
files = []
for file in os.listdir(directory):
# 检查模式和扩展名
if (pattern and pattern not in file) or \
(extensions and not is_valid_extension(file, extensions)):
continue
file_path = os.path.join(directory, file)
if os.path.isfile(file_path):
files.append((file_path, os.path.getmtime(file_path)))
if not files:
logger.warning(f"未在目录 {directory} 中找到符合条件的文件")
return None
# 按修改时间排序,返回最新的
sorted_files = sorted(files, key=lambda x: x[1], reverse=True)
return sorted_files[0][0]
def generate_timestamp_filename(original_path: str) -> str:
"""
生成基于时间戳的文件名
Args:
original_path: 原始文件路径
Returns:
带时间戳的新文件路径
"""
dir_path = os.path.dirname(original_path)
ext = os.path.splitext(original_path)[1]
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
return os.path.join(dir_path, f"{timestamp}{ext}")
def rename_file(source_path: str, target_path: str) -> bool:
"""
重命名文件
Args:
source_path: 源文件路径
target_path: 目标文件路径
Returns:
是否成功重命名
"""
try:
# 确保目标目录存在
target_dir = os.path.dirname(target_path)
ensure_dir(target_dir)
# 重命名文件
os.rename(source_path, target_path)
logger.info(f"文件已重命名: {os.path.basename(source_path)} -> {os.path.basename(target_path)}")
return True
except Exception as e:
logger.error(f"重命名文件失败: {e}")
return False
def load_json(file_path: str, default: Any = None) -> Any:
"""
加载JSON文件
Args:
file_path: JSON文件路径
default: 如果文件不存在或加载失败时返回的默认值
Returns:
JSON内容,或者默认值
"""
if not os.path.exists(file_path):
return default
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"加载JSON文件失败: {file_path}, 错误: {e}")
return default
def save_json(data: Any, file_path: str, ensure_ascii: bool = False, indent: int = 2) -> bool:
"""
保存数据到JSON文件
Args:
data: 要保存的数据
file_path: JSON文件路径
ensure_ascii: 是否确保ASCII编码
indent: 缩进空格数
Returns:
是否成功保存
"""
try:
# 确保目录存在
directory = os.path.dirname(file_path)
ensure_dir(directory)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=ensure_ascii, indent=indent)
logger.debug(f"JSON数据已保存到: {file_path}")
return True
except Exception as e:
logger.error(f"保存JSON文件失败: {file_path}, 错误: {e}")
return False
def get_file_size(file_path: str) -> int:
"""
获取文件大小(字节)
Args:
file_path: 文件路径
Returns:
文件大小(字节)
"""
try:
return os.path.getsize(file_path)
except Exception as e:
logger.error(f"获取文件大小失败: {file_path}, 错误: {e}")
return 0
def is_file_size_valid(file_path: str, max_size_mb: float) -> bool:
"""
检查文件大小是否在允许范围内
Args:
file_path: 文件路径
max_size_mb: 最大允许大小(MB
Returns:
文件大小是否有效
"""
size_bytes = get_file_size(file_path)
max_size_bytes = max_size_mb * 1024 * 1024
return size_bytes <= max_size_bytes
+129
View File
@@ -0,0 +1,129 @@
"""
日志工具模块
----------
提供统一的日志配置和管理功能。
"""
import os
import sys
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict
# 日志处理器字典,用于跟踪已创建的处理器
_handlers: Dict[str, logging.Handler] = {}
def setup_logger(name: str,
log_file: Optional[str] = None,
level=logging.INFO,
console_output: bool = True,
file_output: bool = True,
log_format: str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') -> logging.Logger:
"""
配置并返回日志记录器
Args:
name: 日志记录器的名称
log_file: 日志文件路径,如果为None则使用默认路径
level: 日志级别
console_output: 是否输出到控制台
file_output: 是否输出到文件
log_format: 日志格式
Returns:
配置好的日志记录器
"""
# 获取或创建日志记录器
logger = logging.getLogger(name)
# 如果已经配置过处理器,不重复配置
if logger.handlers:
return logger
# 设置日志级别
logger.setLevel(level)
# 创建格式化器
formatter = logging.Formatter(log_format)
# 如果需要输出到文件
if file_output:
# 如果没有指定日志文件,使用默认路径
if log_file is None:
log_dir = os.path.abspath('logs')
# 确保日志目录存在
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"{name}.log")
# 创建文件处理器
try:
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler.setLevel(level)
logger.addHandler(file_handler)
_handlers[f"{name}_file"] = file_handler
# 记录活跃标记,避免被日志清理工具删除
active_marker = os.path.join(os.path.dirname(log_file), f"{name}.active")
with open(active_marker, 'w', encoding='utf-8') as f:
f.write(f"Active since: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
except Exception as e:
print(f"无法创建日志文件处理器: {e}")
# 如果需要输出到控制台
if console_output:
# 创建控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(level)
logger.addHandler(console_handler)
_handlers[f"{name}_console"] = console_handler
return logger
def get_logger(name: str) -> logging.Logger:
"""
获取已配置的日志记录器,如果不存在则创建一个新的
Args:
name: 日志记录器的名称
Returns:
日志记录器
"""
logger = logging.getLogger(name)
if not logger.handlers:
return setup_logger(name)
return logger
def close_logger(name: str) -> None:
"""
关闭日志记录器的所有处理器
Args:
name: 日志记录器的名称
"""
logger = logging.getLogger(name)
for handler in logger.handlers[:]:
handler.close()
logger.removeHandler(handler)
# 清除处理器缓存
_handlers.pop(f"{name}_file", None)
_handlers.pop(f"{name}_console", None)
def cleanup_active_marker(name: str) -> None:
"""
清理日志活跃标记
Args:
name: 日志记录器的名称
"""
try:
log_dir = os.path.abspath('logs')
active_marker = os.path.join(log_dir, f"{name}.active")
if os.path.exists(active_marker):
os.remove(active_marker)
except Exception as e:
print(f"无法清理日志活跃标记: {e}")
+207
View File
@@ -0,0 +1,207 @@
"""
字符串处理工具模块
---------------
提供字符串处理、正则表达式匹配等功能。
"""
import re
from typing import Dict, List, Optional, Tuple, Any, Match, Pattern
def clean_string(text: str) -> str:
"""
清理字符串,移除多余空白
Args:
text: 源字符串
Returns:
清理后的字符串
"""
if not isinstance(text, str):
return ""
# 移除首尾空白
text = text.strip()
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
return text
def remove_non_digits(text: str) -> str:
"""
移除字符串中的非数字字符
Args:
text: 源字符串
Returns:
只包含数字的字符串
"""
if not isinstance(text, str):
return ""
return re.sub(r'\D', '', text)
def extract_number(text: str) -> Optional[float]:
"""
从字符串中提取数字
Args:
text: 源字符串
Returns:
提取的数字,如果没有则返回None
"""
if not isinstance(text, str):
return None
# 匹配数字(可以包含小数点和负号)
match = re.search(r'-?\d+(\.\d+)?', text)
if match:
return float(match.group())
return None
def extract_unit(text: str, units: List[str] = None) -> Optional[str]:
"""
从字符串中提取单位
Args:
text: 源字符串
units: 有效单位列表,如果为None则自动识别
Returns:
提取的单位,如果没有则返回None
"""
if not isinstance(text, str):
return None
# 如果提供了单位列表,检查字符串中是否包含
if units:
for unit in units:
if unit in text:
return unit
return None
# 否则,尝试自动识别常见单位
# 正则表达式:匹配数字后面的非数字部分作为单位
match = re.search(r'\d+\s*([^\d\s]+)', text)
if match:
return match.group(1)
return None
def extract_number_and_unit(text: str) -> Tuple[Optional[float], Optional[str]]:
"""
从字符串中同时提取数字和单位
Args:
text: 源字符串
Returns:
(数字, 单位)元组,如果没有则对应返回None
"""
if not isinstance(text, str):
return None, None
# 匹配数字和单位的组合
match = re.search(r'(-?\d+(?:\.\d+)?)\s*([^\d\s]+)?', text)
if match:
number = float(match.group(1))
unit = match.group(2) if match.group(2) else None
return number, unit
return None, None
def parse_specification(spec_str: str) -> Optional[int]:
"""
解析规格字符串,提取包装数量
支持格式:1*15, 1x15, 1*5*10
Args:
spec_str: 规格字符串
Returns:
包装数量,如果无法解析则返回None
"""
if not spec_str or not isinstance(spec_str, str):
return None
try:
# 清理规格字符串
spec_str = clean_string(spec_str)
# 匹配1*5*10 格式的三级规格
match = re.search(r'(\d+)[\*xX×](\d+)[\*xX×](\d+)', spec_str)
if match:
# 取最后一个数字作为袋数量
return int(match.group(3))
# 匹配1*15, 1x15 格式
match = re.search(r'(\d+)[\*xX×](\d+)', spec_str)
if match:
# 取第二个数字作为包装数量
return int(match.group(2))
# 匹配24瓶/件等格式
match = re.search(r'(\d+)[瓶个支袋][/](件|箱)', spec_str)
if match:
return int(match.group(1))
# 匹配4L格式
match = re.search(r'(\d+(?:\.\d+)?)\s*[Ll升][*×]?(\d+)?', spec_str)
if match:
# 如果有第二个数字,返回它;否则返回1
return int(match.group(2)) if match.group(2) else 1
except Exception:
pass
return None
def clean_barcode(barcode: Any) -> str:
"""
清理条码格式
Args:
barcode: 条码(可以是字符串、整数或浮点数)
Returns:
清理后的条码字符串
"""
if isinstance(barcode, (int, float)):
barcode = f"{barcode:.0f}"
# 清理条码格式,移除可能的非数字字符(包括小数点)
barcode_clean = re.sub(r'\.0+$', '', str(barcode)) # 移除末尾0
barcode_clean = re.sub(r'\D', '', barcode_clean) # 只保留数字
return barcode_clean
def is_scientific_notation(value: str) -> bool:
"""
检查字符串是否是科学计数法表示
Args:
value: 字符串值
Returns:
是否是科学计数法
"""
return bool(re.match(r'^-?\d+(\.\d+)?[eE][+-]?\d+$', str(value)))
def format_barcode(barcode: Any) -> str:
"""
格式化条码,处理科学计数法
Args:
barcode: 条码值
Returns:
格式化后的条码字符串
"""
if isinstance(barcode, (int, float)) or is_scientific_notation(str(barcode)):
try:
# 转换为整数并格式化为字符串
return f"{int(float(barcode))}"
except (ValueError, TypeError):
pass
# 如果不是数字或转换失败,返回原始字符串
return str(barcode)