feat: 益选 OCR 订单处理系统初始提交
- 智能供应商识别(蓉城易购/烟草/杨碧月/通用) - 百度 OCR 表格识别集成 - 规则引擎(列映射/数据清洗/单位转换/规格推断) - 条码映射管理与云端同步(Gitea REST API) - 云端同步支持:条码映射、供应商配置、商品资料、采购模板 - 拖拽一键处理(图片→OCR→Excel→合并) - 191 个单元测试 - 移除无用的模板管理功能 - 清理 IDE 产物目录 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
"""
|
||||
OCR订单处理系统 - 工具模块
|
||||
------------------------
|
||||
提供系统通用工具和辅助函数。
|
||||
"""
|
||||
@@ -0,0 +1,184 @@
|
||||
"""云端同步模块 — 基于 Gitea REST API 的文件同步"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import requests
|
||||
|
||||
from .log_utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class GiteaSync:
    """Read and write files of one repository through the Gitea REST contents API.

    Failures of the HTTP layer and malformed API responses are logged and
    reported to the caller as ``None``.
    """

    def __init__(self, base_url: str, owner: str, repo: str, token: str, timeout: int = 15):
        """Store the connection settings.

        Args:
            base_url: Root URL of the Gitea server; trailing slashes are removed.
            owner: Owner (user or organisation) of the repository.
            repo: Repository name.
            token: API access token sent in the ``Authorization`` header.
            timeout: Per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.owner = owner
        self.repo = repo
        self.token = token
        self.timeout = timeout

    @property
    def _headers(self) -> dict:
        """Authentication headers sent with every request."""
        return {"Authorization": f"token {self.token}"}

    def _api_url(self, path: str) -> str:
        """Build the contents-API URL for a repository file path."""
        from urllib.parse import quote

        # Percent-encode the path so characters such as spaces, '#' or '?'
        # cannot be misread as URL syntax; '/' stays a path separator.
        encoded = quote(path.lstrip("/"), safe="/")
        return f"{self.base_url}/api/v1/repos/{self.owner}/{self.repo}/contents/{encoded}"

    def pull_file(self, remote_path: str) -> Optional[Tuple[bytes, str]]:
        """Download a file from the repository.

        Args:
            remote_path: Path of the file inside the repository.

        Returns:
            ``(content_bytes, sha)``, or ``None`` when the file does not exist,
            the request fails, or the response cannot be decoded.
        """
        try:
            resp = requests.get(
                self._api_url(remote_path),
                headers=self._headers,
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            logger.error(f"拉取文件网络错误: {e}")
            return None

        if resp.status_code == 404:
            logger.info(f"云端文件不存在: {remote_path}")
            return None
        if resp.status_code != 200:
            logger.warning(f"拉取文件失败: {resp.status_code} {resp.text[:200]}")
            return None

        try:
            data = resp.json()
            if not isinstance(data, dict):
                # e.g. a JSON list, which is what a directory path would yield
                raise ValueError("unexpected response shape")
            sha = data.get("sha") or ""
            # The base64 payload may be wrapped with newlines; "content" may
            # also be null, which is treated as an empty file.
            content_b64 = (data.get("content") or "").replace("\n", "")
            content_bytes = base64.b64decode(content_b64)
        except ValueError as e:
            # Covers invalid JSON as well as invalid base64 (binascii.Error).
            logger.error(f"解析云端文件响应失败: {remote_path} — {e}")
            return None

        logger.info(f"拉取文件成功: {remote_path} ({len(content_bytes)} bytes)")
        return content_bytes, sha

    def push_file(
        self,
        remote_path: str,
        content: bytes,
        message: str,
        sha: Optional[str] = None,
    ) -> Optional[str]:
        """Create or update a file in the repository.

        Args:
            remote_path: Path of the file inside the repository.
            content: Raw file content.
            message: Commit message.
            sha: Current sha of the file; required to update an existing file,
                omitted to create a new one.

        Returns:
            The new sha (an empty string if the server response carries none),
            or ``None`` on failure.
        """
        payload = {
            "message": message,
            "content": base64.b64encode(content).decode("ascii"),
        }
        if sha:
            payload["sha"] = sha

        # The contents API documents POST for creating a file and PUT (with the
        # current sha) for updating one, so pick the verb from the sha.
        send = requests.put if sha else requests.post
        try:
            resp = send(
                self._api_url(remote_path),
                headers={**self._headers, "Content-Type": "application/json"},
                json=payload,
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            logger.error(f"推送文件网络错误: {e}")
            return None

        if resp.status_code not in (200, 201):
            logger.warning(f"推送文件失败: {resp.status_code} {resp.text[:200]}")
            return None

        try:
            body = resp.json()
        except ValueError:
            body = None
        # Tolerate a missing or null "content" object in the response.
        content_info = body.get("content") if isinstance(body, dict) else None
        new_sha = (content_info.get("sha") if isinstance(content_info, dict) else None) or ""
        logger.info(f"推送文件成功: {remote_path} (sha={new_sha[:12]})")
        return new_sha

    def file_exists(self, remote_path: str) -> Optional[str]:
        """Look up the sha of a repository file.

        The sha is only available from the GET response, so the file is fetched
        directly instead of probing with HEAD first.

        Returns:
            The file sha when the file exists, otherwise ``None``.
        """
        result = self.pull_file(remote_path)
        return result[1] if result else None

    def pull_json(self, remote_path: str) -> Optional[Tuple[dict, str]]:
        """Download a file and parse it as JSON.

        Returns:
            ``(parsed_value, sha)``, or ``None`` when the download fails or the
            content is not valid JSON.
        """
        result = self.pull_file(remote_path)
        if result is None:
            return None
        content_bytes, sha = result
        try:
            return json.loads(content_bytes), sha
        except ValueError as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error(f"解析 JSON 失败: {e}")
            return None

    def push_json(self, remote_path: str, data: dict, message: str, sha: Optional[str] = None) -> Optional[str]:
        """Serialise ``data`` as indented UTF-8 JSON and push it.

        Returns:
            The new sha, or ``None`` on failure.
        """
        content = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
        return self.push_file(remote_path, content, message, sha)

    def push_binary(self, remote_path: str, local_path: str, message: str) -> Optional[str]:
        """Read a local file in binary mode and push it to the repository.

        The sha of any existing remote file is looked up first so that the push
        updates it instead of failing.

        Returns:
            The new sha, or ``None`` on failure.
        """
        try:
            with open(local_path, "rb") as f:
                content = f.read()
        except OSError as e:
            logger.error(f"读取本地文件失败: {local_path} — {e}")
            return None

        existing_sha = self.file_exists(remote_path)
        return self.push_file(remote_path, content, message, sha=existing_sha)

    @classmethod
    def from_config(cls, config) -> Optional["GiteaSync"]:
        """Create an instance from the ``Gitea`` section of a config object.

        Args:
            config: Object exposing ``get(section, option, fallback=...)``.

        Returns:
            A ``GiteaSync`` instance, or ``None`` when any of ``base_url``,
            ``owner``, ``repo`` or ``token`` is missing or blank.
        """
        settings = {
            key: (config.get("Gitea", key, fallback="") or "").strip()
            for key in ("base_url", "owner", "repo", "token")
        }
        if not all(settings.values()):
            logger.debug("Gitea 配置不完整,跳过云端同步")
            return None
        return cls(**settings)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,286 @@
|
||||
"""
|
||||
文件操作工具模块
|
||||
--------------
|
||||
提供文件处理、查找和管理功能。
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Union, Any
|
||||
|
||||
from .log_utils import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
def ensure_dir(directory: str) -> bool:
    """Make sure a directory exists, creating it (and its parents) if needed.

    Args:
        directory: Directory path. An empty string, which is what
            ``os.path.dirname`` returns for a bare file name, is accepted and
            means "nothing to create".

    Returns:
        True if the directory already existed or was created, False if the
        creation failed (the failure is logged).
    """
    # os.makedirs("") raises FileNotFoundError, which used to surface as a
    # misleading error log whenever a caller passed the dirname of "file.ext".
    if not directory:
        return True
    try:
        os.makedirs(directory, exist_ok=True)
        return True
    except Exception as e:
        logger.error(f"创建目录失败: {directory}, 错误: {e}")
        return False
|
||||
|
||||
def get_file_extension(file_path: str) -> str:
    """Return the lower-cased extension of a path.

    Args:
        file_path: Path or file name to inspect.

    Returns:
        The extension including its leading dot (for example ``.jpg``), or an
        empty string when the name has no extension.
    """
    _stem, suffix = os.path.splitext(file_path)
    return suffix.lower()
|
||||
|
||||
def is_valid_extension(file_path: str, allowed_extensions: List[str]) -> bool:
    """Check whether a file's extension is one of the allowed extensions.

    The comparison is case-insensitive on both sides, so ``photo.JPG`` matches
    ``['.jpg']`` and ``photo.jpg`` matches ``['.JPG']``.

    Args:
        file_path: Path or file name to check.
        allowed_extensions: Extensions including the leading dot
            (for example ``['.jpg', '.png']``).

    Returns:
        True if the extension is allowed, otherwise False.
    """
    ext = os.path.splitext(file_path)[1].lower()
    # The file's extension is lower-cased, so the allowed list must be too.
    return ext in {allowed.lower() for allowed in allowed_extensions}
|
||||
|
||||
def get_files_by_extensions(directory: str, extensions: List[str], exclude_patterns: List[str] = None) -> List[str]:
    """List the regular files directly inside a directory that have an allowed extension.

    Args:
        directory: Directory to scan (not recursive).
        extensions: Allowed extensions including the dot (for example ``['.jpg', '.png']``).
        exclude_patterns: Substrings that disqualify a file name; defaults to
            ``['~$', '.tmp']`` when None.

    Returns:
        Paths (directory joined with file name) of the matching files, in the
        order returned by ``os.listdir``.
    """
    skip_markers = ['~$', '.tmp'] if exclude_patterns is None else exclude_patterns

    matched = []
    for entry in os.listdir(directory):
        candidate = os.path.join(directory, entry)
        # Same check order as before: regular file, extension, exclusion.
        if (
            os.path.isfile(candidate)
            and is_valid_extension(candidate, extensions)
            and not any(marker in entry for marker in skip_markers)
        ):
            matched.append(candidate)
    return matched
|
||||
|
||||
def get_latest_file(directory: str, pattern: str = "", extensions: List[str] = None) -> Optional[str]:
    """Return the most recently modified file directly inside a directory.

    Args:
        directory: Directory to scan (not recursive).
        pattern: Substring the file name must contain; empty means no filter.
        extensions: Allowed extensions; None or empty means no filter.

    Returns:
        Path of the newest matching file, or None when the directory does not
        exist (or is not a directory) or no file matches.
    """
    # isdir rather than exists: a regular file would make os.listdir raise.
    if not os.path.isdir(directory):
        logger.warning(f"目录不存在: {directory}")
        return None

    candidates = []
    for name in os.listdir(directory):
        if pattern and pattern not in name:
            continue
        if extensions and not is_valid_extension(name, extensions):
            continue
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        try:
            candidates.append((os.path.getmtime(path), path))
        except OSError:
            # The file disappeared between listing and stat; ignore it.
            continue

    if not candidates:
        logger.warning(f"未在目录 {directory} 中找到符合条件的文件")
        return None

    # Only the newest entry is needed, so a full sort is unnecessary.
    return max(candidates, key=lambda item: item[0])[1]
|
||||
|
||||
def generate_timestamp_filename(original_path: str) -> str:
    """Build a sibling path whose file name is the current timestamp.

    Args:
        original_path: Path whose directory and extension are reused.

    Returns:
        ``<directory>/<YYYYmmddHHMMSS><original extension>`` based on the local
        time at the moment of the call.
    """
    folder, _name = os.path.split(original_path)
    suffix = os.path.splitext(original_path)[1]
    stamp = f"{datetime.now():%Y%m%d%H%M%S}"
    return os.path.join(folder, stamp + suffix)
|
||||
|
||||
def rename_file(source_path: str, target_path: str) -> bool:
    """Rename (move) a file, creating the target directory when necessary.

    Args:
        source_path: Existing file path.
        target_path: New file path.

    Returns:
        True on success, False if the rename failed (the failure is logged).
    """
    try:
        target_dir = os.path.dirname(target_path)
        # A bare file name has no directory part; there is nothing to create
        # and asking for "" would only produce a misleading error log.
        if target_dir:
            ensure_dir(target_dir)

        os.rename(source_path, target_path)
        logger.info(f"文件已重命名: {os.path.basename(source_path)} -> {os.path.basename(target_path)}")
        return True
    except Exception as e:
        logger.error(f"重命名文件失败: {e}")
        return False
|
||||
|
||||
def load_json(file_path: str, default: Any = None) -> Any:
    """Load and parse a UTF-8 JSON file.

    Args:
        file_path: Path of the JSON file.
        default: Value returned when the file does not exist or cannot be
            read or parsed.

    Returns:
        The parsed JSON content, or ``default``. Read/parse failures are
        logged; a missing file is not.
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                return json.load(handle)
        except Exception as e:
            logger.error(f"加载JSON文件失败: {file_path}, 错误: {e}")
    return default
|
||||
|
||||
def save_json(data: Any, file_path: str, ensure_ascii: bool = False, indent: int = 2) -> bool:
    """Serialise data to a UTF-8 JSON file, creating the directory if needed.

    Args:
        data: JSON-serialisable value to store.
        file_path: Destination file path.
        ensure_ascii: Passed to ``json.dump``; False keeps non-ASCII text readable.
        indent: Indentation width passed to ``json.dump``.

    Returns:
        True on success, False if writing failed (the failure is logged).
    """
    try:
        directory = os.path.dirname(file_path)
        # A bare file name has no directory part; skip the creation step
        # instead of triggering a misleading "failed to create directory" log.
        if directory:
            ensure_dir(directory)

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=ensure_ascii, indent=indent)
        logger.debug(f"JSON数据已保存到: {file_path}")
        return True
    except Exception as e:
        logger.error(f"保存JSON文件失败: {file_path}, 错误: {e}")
        return False
|
||||
|
||||
def smart_read_excel(file_path: Union[str, Path], **kwargs) -> Any:
    """Read an Excel file with pandas, choosing the engine from the extension.

    ``.xlsx`` defaults to the ``openpyxl`` engine and ``.xls`` to ``xlrd``; an
    ``engine`` supplied by the caller always wins.

    Args:
        file_path: Path of the Excel file.
        **kwargs: Extra keyword arguments forwarded to ``pd.read_excel``.

    Returns:
        Whatever ``pd.read_excel`` returns for the given arguments.

    Raises:
        Exception: Any error raised by ``pd.read_excel`` is logged and re-raised.
    """
    import pandas as pd

    path_str = str(file_path)
    engine_by_suffix = {'.xlsx': 'openpyxl', '.xls': 'xlrd'}
    suffix = os.path.splitext(path_str)[1].lower()
    if suffix in engine_by_suffix:
        kwargs.setdefault('engine', engine_by_suffix[suffix])

    try:
        return pd.read_excel(path_str, **kwargs)
    except Exception as e:
        logger.error(f"读取 Excel 文件失败: {path_str}, 错误: {e}")
        raise
|
||||
|
||||
def get_file_size(file_path: str) -> int:
    """Return the size of a file in bytes.

    Args:
        file_path: Path of the file.

    Returns:
        The size in bytes, or 0 when the size cannot be determined (the
        failure is logged).
    """
    try:
        size = os.path.getsize(file_path)
    except Exception as e:
        logger.error(f"获取文件大小失败: {file_path}, 错误: {e}")
        return 0
    return size
|
||||
|
||||
def is_file_size_valid(file_path: str, max_size_mb: float) -> bool:
    """Check that a file exists and is no larger than the given limit.

    Args:
        file_path: Path of the file.
        max_size_mb: Maximum allowed size in MB (1 MB = 1024 * 1024 bytes).

    Returns:
        True if the file size is within the limit. False if it is larger, or
        if the size cannot be read (missing or inaccessible file).
    """
    try:
        size_bytes = os.path.getsize(file_path)
    except OSError:
        # Previously an unreadable file was measured as 0 bytes and therefore
        # reported as valid; a file without a readable size cannot be valid.
        return False
    return size_bytes <= max_size_mb * 1024 * 1024
|
||||
|
||||
|
||||
def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable size with one decimal.

    Sizes below 1 MB (1024 * 1024 bytes) are shown in KB, everything else in MB.
    """
    one_mb = 1024 * 1024
    if size_bytes >= one_mb:
        return f"{size_bytes / (1024 * 1024):.1f} MB"
    return f"{size_bytes / 1024:.1f} KB"
|
||||
@@ -0,0 +1,180 @@
|
||||
"""
|
||||
日志工具模块
|
||||
----------
|
||||
提供统一的日志配置和管理功能。
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict
|
||||
|
||||
# Registry of the handlers created by setup_logger, keyed by
# "<logger name>_file" and "<logger name>_console".
_handlers: Dict[str, logging.Handler] = {}
|
||||
|
||||
def setup_logger(name: str,
                 log_file: Optional[str] = None,
                 level=logging.INFO,
                 console_output: bool = True,
                 file_output: bool = True,
                 log_format: str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') -> logging.Logger:
    """Configure and return a named logger.

    A logger that already has handlers is returned untouched, so calling this
    repeatedly never attaches duplicate handlers.

    Args:
        name: Logger name.
        log_file: Log file path; None means ``logs/<name>.log`` under the
            current working directory.
        level: Level applied to the logger and to the handlers created here.
        console_output: Attach a handler writing to ``sys.stdout``.
        file_output: Attach a rotating file handler (5 MB per file, 3 backups).
        log_format: Format string for both handlers.

    Returns:
        The configured logger. If the file handler cannot be set up, a notice
        is printed and the logger is returned without it.
    """
    logger = logging.getLogger(name)

    # Already configured: do not add handlers twice.
    if logger.handlers:
        return logger

    logger.setLevel(level)
    formatter = logging.Formatter(log_format)

    if file_output:
        # Everything that touches the file system stays inside the try, so an
        # unwritable location downgrades to "no file logging" instead of
        # raising out of whichever module asked for a logger.
        try:
            if log_file is None:
                log_file = os.path.join(os.path.abspath('logs'), f"{name}.log")
            log_dir = os.path.dirname(log_file)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)

            # Rotating log: bounded file size and number of backups.
            file_handler = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
            file_handler.setFormatter(formatter)
            file_handler.setLevel(level)
            logger.addHandler(file_handler)
            _handlers[f"{name}_file"] = file_handler

            # Marker file next to the log, intended to tell log clean-up
            # tooling that this log is in use.
            active_marker = os.path.join(os.path.dirname(log_file), f"{name}.active")
            with open(active_marker, 'w', encoding='utf-8') as f:
                f.write(f"Active since: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        except Exception as e:
            print(f"无法创建日志文件处理器: {e}")

    if console_output:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        console_handler.setLevel(level)
        logger.addHandler(console_handler)
        _handlers[f"{name}_console"] = console_handler

    return logger
|
||||
|
||||
def get_logger(name: str) -> logging.Logger:
    """Return the logger with the given name, configuring it on first use.

    Args:
        name: Logger name.

    Returns:
        The existing logger when it already has handlers, otherwise the result
        of ``setup_logger(name)`` with default settings.
    """
    existing = logging.getLogger(name)
    return existing if existing.handlers else setup_logger(name)
|
||||
|
||||
def set_log_level(level: str) -> None:
    """Set the level of every known logger and of the handlers created here.

    Args:
        level: Level name (DEBUG, INFO, WARNING, ERROR, CRITICAL), matched
            case-insensitively; unknown names fall back to INFO.
    """
    level_map = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL
    }
    log_level = level_map.get(level.lower(), logging.INFO)

    # Snapshot the names: getLogger may modify loggerDict while we iterate.
    for logger_name in list(logging.root.manager.loggerDict):
        logging.getLogger(logger_name).setLevel(log_level)

    logging.getLogger().setLevel(log_level)

    # setup_logger gives each handler its own level; unless the handlers
    # follow, lowering the level (e.g. to DEBUG) would not change the output.
    for handler in list(_handlers.values()):
        handler.setLevel(log_level)

    print(f"所有日志记录器级别已设置为: {logging.getLevelName(log_level)}")
|
||||
|
||||
def close_logger(name: str) -> None:
    """Close and detach every handler of the named logger.

    Args:
        name: Logger name.
    """
    target = logging.getLogger(name)
    # Iterate over a copy because removeHandler mutates the list.
    for attached in list(target.handlers):
        attached.close()
        target.removeHandler(attached)

    # Forget the handlers registered for this logger.
    for kind in ("file", "console"):
        _handlers.pop(f"{name}_{kind}", None)
|
||||
|
||||
def close_all_loggers() -> None:
    """Close and detach the handlers of every logger known to the logging manager.

    The root logger is not part of the manager's dictionary and is left alone.
    """
    for logger_name in list(logging.root.manager.loggerDict):
        current = logging.getLogger(logger_name)
        # Copy the handler list because removeHandler mutates it.
        for attached in list(getattr(current, 'handlers', ())):
            attached.close()
            current.removeHandler(attached)

    # Drop the module's handler registry.
    _handlers.clear()

    print("所有日志记录器已关闭")
|
||||
|
||||
def cleanup_active_marker(name: str) -> None:
    """Delete ``logs/<name>.active`` under the current working directory, if present.

    Args:
        name: Logger name the marker belongs to.

    Failures are printed instead of raised.
    """
    try:
        marker_path = os.path.join(os.path.abspath('logs'), f"{name}.active")
        if os.path.exists(marker_path):
            os.remove(marker_path)
    except Exception as e:
        print(f"无法清理日志活跃标记: {e}")
|
||||
@@ -0,0 +1,279 @@
|
||||
"""
|
||||
字符串处理工具模块
|
||||
---------------
|
||||
提供字符串处理、正则表达式匹配等功能。
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
|
||||
def clean_string(text: str) -> str:
    """Trim a string and collapse every run of whitespace into one space.

    Args:
        text: Source string.

    Returns:
        The normalised string, or an empty string when ``text`` is not a str.
    """
    if isinstance(text, str):
        # split() without arguments drops leading/trailing whitespace and
        # treats any run of whitespace as a single separator.
        return " ".join(text.split())
    return ""
|
||||
|
||||
def remove_non_digits(text: str) -> str:
    """Keep only the digit characters of a string.

    Args:
        text: Source string.

    Returns:
        The digits of ``text`` in their original order, or an empty string
        when ``text`` is not a str.
    """
    if isinstance(text, str):
        return "".join(re.findall(r'\d', text))
    return ""
|
||||
|
||||
def extract_number(text: str) -> Optional[float]:
    """Extract the first number found in a string.

    Args:
        text: Source string.

    Returns:
        The first integer or decimal number (optionally negative) as a float,
        or None when there is none or ``text`` is not a str.
    """
    if isinstance(text, str):
        found = re.search(r'-?\d+(\.\d+)?', text)
        if found is not None:
            return float(found.group())
    return None
|
||||
|
||||
def extract_unit(text: str, units: List[str] = None) -> Optional[str]:
    """Extract a unit from a string.

    Args:
        text: Source string.
        units: Candidate units. When given (and non-empty) the first candidate
            contained in ``text`` is returned; otherwise the unit is guessed.

    Returns:
        The unit, or None when nothing matches or ``text`` is not a str.
    """
    if not isinstance(text, str):
        return None

    if units:
        # Candidates are tried in list order; the first one present wins.
        return next((candidate for candidate in units if candidate in text), None)

    # No candidates: take the non-digit, non-space run that follows the first
    # number as the unit.
    found = re.search(r'\d+\s*([^\d\s]+)', text)
    return found.group(1) if found else None
|
||||
|
||||
def extract_number_and_unit(text: str) -> Tuple[Optional[float], Optional[str]]:
    """Extract the first number in a string together with the unit that follows it.

    Args:
        text: Source string.

    Returns:
        ``(number, unit)``. ``unit`` is None when no non-digit text directly
        follows the number; both are None when no number is found or ``text``
        is not a str.
    """
    if not isinstance(text, str):
        return None, None

    found = re.search(r'(-?\d+(?:\.\d+)?)\s*([^\d\s]+)?', text)
    if found is None:
        return None, None

    value_text, unit_text = found.groups()
    return float(value_text), unit_text or None
|
||||
|
||||
def parse_specification(spec_str: str) -> Optional[int]:
    """Parse a specification string and return the package quantity.

    Supported forms (``*``, ``x``, ``X`` and ``×`` are interchangeable):

    * ``450g*15`` / ``450ml*15``: quantity after the weight or volume
    * ``1*5*10``: last number of a three-level specification
    * ``1*15`` / ``1x15``: second number
    * ``24瓶/件``: leading count (ASCII or full-width slash)
    * ``4L`` / ``4L*6``: number after the separator, or 1 when absent

    Args:
        spec_str: Specification string.

    Returns:
        The package quantity, or None when the string cannot be parsed.
    """
    if not spec_str or not isinstance(spec_str, str):
        return None

    # No whitespace normalisation is needed: every pattern is applied with
    # re.search, and the only position where whitespace may occur uses \s*.
    # Rules are ordered from most to least specific; the first match wins.
    # Each rule is (pattern, index of the group holding the quantity).
    rules = (
        (r'\d+(?:g|ml|毫升|克)[*xX×](\d+)', 1),
        (r'(\d+)[*xX×](\d+)[*xX×](\d+)', 3),
        (r'(\d+)[*xX×](\d+)', 2),
        (r'(\d+)[瓶个支袋][/／](件|箱)', 1),
    )

    try:
        for pattern, group_index in rules:
            match = re.search(pattern, spec_str)
            if match:
                return int(match.group(group_index))

        # Litre form. The separator set now matches the other rules, so
        # "4Lx6" yields 6 just like "4L*6" (it used to fall back to 1).
        match = re.search(r'(\d+(?:\.\d+)?)\s*[Ll升][*xX×]?(\d+)?', spec_str)
        if match:
            return int(match.group(2)) if match.group(2) else 1
    except ValueError:
        # int() refused the captured digits (e.g. an absurdly long run).
        return None

    return None
|
||||
|
||||
def clean_barcode(barcode: Any) -> str:
    """Normalise a barcode value to a string of digits.

    Args:
        barcode: Barcode as a string, int or float (other values are
            converted with ``str``).

    Returns:
        The barcode with a trailing ``.0``-style suffix and every non-digit
        character removed.
    """
    if isinstance(barcode, (int, float)):
        # Render numbers without a fractional part or exponent.
        raw = format(barcode, ".0f")
    else:
        raw = str(barcode)

    without_zero_fraction = re.sub(r'\.0+$', '', raw)
    return re.sub(r'\D', '', without_zero_fraction)
|
||||
|
||||
def is_scientific_notation(value: str) -> bool:
    """Tell whether a value's string form is written in scientific notation.

    Args:
        value: Value to check; it is converted with ``str`` first.

    Returns:
        True for forms such as ``6.9e12`` or ``-1.5E-3``, otherwise False.
    """
    candidate = str(value)
    return re.match(r'^-?\d+(\.\d+)?[eE][+-]?\d+$', candidate) is not None
|
||||
|
||||
def parse_monetary_string(value: Any) -> Optional[float]:
    """Parse an amount or quantity into a float.

    Currency symbols, a trailing "元" and other non-numeric characters are
    dropped. Commas are interpreted as follows:

    * a single comma and no dot: decimal separator (``"1,5"`` -> 1.5)
    * a single comma plus a dot: thousands separator (``"1,234.56"`` -> 1234.56)
    * several commas: thousands separators (``"1,234,567"`` -> 1234567)

    Args:
        value: Amount as a string or number; any other type yields None.

    Returns:
        The parsed float, or None when the value is empty, a placeholder
        (``o``, ``none``, ``null``, ``-``, ``--``) or not parseable.
    """
    if isinstance(value, (int, float)):
        return float(value)
    if not isinstance(value, str):
        # Covers None as well as any other unsupported type.
        return None

    stripped = value.strip()
    if not stripped or stripped.lower() in ('o', 'none', 'null', '-', '--'):
        return None

    # Keep digits, dot, comma and minus sign only.
    numeric = re.sub(r'[^\d\.\-,]', '', stripped)
    if not numeric or numeric in ('-', '.', '-.', ','):
        return None

    commas = numeric.count(',')
    if commas == 1 and '.' not in numeric:
        numeric = numeric.replace(',', '.')
    elif commas:
        numeric = numeric.replace(',', '')

    try:
        return float(numeric)
    except (ValueError, TypeError):
        return None
|
||||
|
||||
|
||||
def format_barcode(barcode: Any) -> str:
    """Format a barcode value as a digit string, expanding scientific notation.

    Args:
        barcode: Barcode value of any type; None yields an empty string.

    Returns:
        The digit string. Values longer than 13 digits have trailing zeros
        removed until 13 digits remain or the last digit is no longer zero.
    """
    if barcode is None:
        return ""

    digits = str(barcode).strip()

    # Expand scientific notation such as "6.9e+12" into plain digits.
    if is_scientific_notation(digits):
        try:
            digits = f"{float(digits):.0f}"
        except (ValueError, TypeError):
            pass

    # Drop a zero-only fractional part ("123456.0" -> "123456").
    digits = re.sub(r'\.0+$', '', digits)

    # Strip everything that is not a digit.
    if not digits.isdigit():
        digits = re.sub(r'\D', '', digits)

    # Trim surplus trailing zeros from over-long values.
    while len(digits) > 13 and digits.endswith('0'):
        digits = digits[:-1]

    return digits
|
||||
Reference in New Issue
Block a user