Files
orc-order-v2/tests/test_data_cleaner.py
T
houhuan e4d62df7e3 feat: 益选 OCR 订单处理系统初始提交
- 智能供应商识别(蓉城易购/烟草/杨碧月/通用)
- 百度 OCR 表格识别集成
- 规则引擎(列映射/数据清洗/单位转换/规格推断)
- 条码映射管理与云端同步(Gitea REST API)
- 云端同步支持:条码映射、供应商配置、商品资料、采购模板
- 拖拽一键处理(图片→OCR→Excel→合并)
- 191 个单元测试
- 移除无用的模板管理功能
- 清理 IDE 产物目录

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-04 19:51:13 +08:00

237 lines
8.3 KiB
Python

"""app.core.handlers.data_cleaner 单元测试"""
import pytest
import pandas as pd
from app.core.handlers.data_cleaner import DataCleaner
@pytest.fixture
def sample_df():
    """Provide the four-row DataFrame shared by the cleaner tests.

    The frame has padded text in ``name`` (``' Alice '``) and a single
    missing ``age`` value at row 2, so whitespace and NA rules have
    something to act on.
    """
    columns = {
        'name': [' Alice ', 'Bob', 'Charlie', 'Dave'],
        'age': [25, 30, None, 40],
        'score': [80.5, 90.0, 70.0, 85.0],
        'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou'],
    }
    return pd.DataFrame(columns)
class TestFillNa:
    """Checks for the ``fill_na`` rule and its convenience method."""

    def test_fill_na_with_value(self, sample_df):
        """Filling ``age`` with 0 leaves no NA and sets row 2 to 0."""
        dc = DataCleaner()
        dc.add_rule('fill_na', columns=['age'], value=0)
        cleaned = dc.clean(sample_df)
        missing_ages = cleaned['age'].isna().sum()
        assert missing_ages == 0
        assert cleaned.loc[2, 'age'] == 0

    def test_fill_na_all_columns(self, sample_df):
        """Without ``columns``, no NA remains anywhere in the frame."""
        dc = DataCleaner()
        dc.add_rule('fill_na', value=-1)
        cleaned = dc.clean(sample_df)
        total_missing = cleaned.isna().sum().sum()
        assert total_missing == 0

    def test_fill_na_string_column(self):
        """A missing entry in a text column is replaced by a string value."""
        frame = pd.DataFrame({'a': ['x', None, 'z']})
        dc = DataCleaner()
        dc.add_rule('fill_na', columns=['a'], value='unknown')
        cleaned = dc.clean(frame)
        assert cleaned.loc[1, 'a'] == 'unknown'

    def test_convenience_method(self, sample_df):
        """``fill_na(...)`` called directly on the cleaner fills row 2."""
        dc = DataCleaner()
        dc.fill_na(columns='age', value=99)
        cleaned = dc.clean(sample_df)
        assert cleaned.loc[2, 'age'] == 99
class TestRemoveDuplicates:
    """Checks for the ``remove_duplicates`` rule."""

    def test_remove_by_subset(self):
        """Duplicates judged on ``name`` only; the first occurrence stays."""
        data = {
            'name': ['A', 'B', 'A', 'C'],
            'val': [1, 2, 3, 4],
        }
        frame = pd.DataFrame(data)
        dc = DataCleaner()
        dc.add_rule('remove_duplicates', subset=['name'], keep='first')
        cleaned = dc.clean(frame)
        assert len(cleaned) == 3
        remaining_names = list(cleaned['name'])
        assert remaining_names == ['A', 'B', 'C']

    def test_remove_all_columns(self):
        """With no subset given, the two identical rows collapse to one."""
        data = {
            'a': [1, 1, 2],
            'b': [10, 10, 20],
        }
        frame = pd.DataFrame(data)
        dc = DataCleaner()
        dc.add_rule('remove_duplicates')
        cleaned = dc.clean(frame)
        assert len(cleaned) == 2

    def test_no_duplicates(self, sample_df):
        """A frame whose names are all distinct keeps its four rows."""
        dc = DataCleaner()
        dc.add_rule('remove_duplicates', subset=['name'])
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 4
class TestRemoveRows:
    """Checks for the ``remove_rows`` rule (condition and value forms)."""

    def test_remove_by_condition(self, sample_df):
        """``age > 25`` leaves two rows.

        NOTE(review): with ages 25, 30, NA and 40, both "keep matching rows"
        and "drop matching rows" leave two rows, so this count alone does
        not pin down which semantics the rule has.
        """
        dc = DataCleaner()
        dc.add_rule('remove_rows', condition='age > 25')
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 2

    def test_remove_by_values(self, sample_df):
        """Rows whose ``city`` is listed in ``values`` are gone afterwards."""
        dc = DataCleaner()
        dc.add_rule('remove_rows', columns=['city'], values=['Beijing'])
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 2
        cities = cleaned['city'].values
        assert 'Beijing' not in cities

    def test_remove_no_match(self, sample_df):
        """A condition that no row satisfies yields an empty result."""
        dc = DataCleaner()
        dc.add_rule('remove_rows', condition='age > 100')
        cleaned = dc.clean(sample_df)
        # Condition acts as a filter: no row has age > 100, so nothing is left.
        assert len(cleaned) == 0

    def test_convenience_method(self, sample_df):
        """``remove_rows(...)`` called directly on the cleaner."""
        dc = DataCleaner()
        dc.remove_rows(condition='score < 75')
        cleaned = dc.clean(sample_df)
        # Condition acts as a filter: only Charlie (score=70.0) is below 75.
        assert len(cleaned) == 1
class TestConvertType:
    """Checks for the ``convert_type`` rule."""

    def test_to_float(self):
        """Numeric strings become floats; an unparseable one becomes NA."""
        frame = pd.DataFrame({'val': ['1.5', '2.7', 'abc']})
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['val'], target_type='float')
        cleaned = dc.clean(frame)
        dtype_kind = cleaned['val'].dtype.kind
        assert dtype_kind == 'f'
        assert cleaned.loc[0, 'val'] == 1.5
        assert pd.isna(cleaned.loc[2, 'val'])

    def test_to_int(self):
        """Digit strings compare equal to integers after conversion."""
        frame = pd.DataFrame({'val': ['1', '2', '3']})
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['val'], target_type='int')
        cleaned = dc.clean(frame)
        assert cleaned.loc[0, 'val'] == 1

    def test_to_string(self):
        """Integers are rendered as their string form."""
        frame = pd.DataFrame({'val': [1, 2, 3]})
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['val'], target_type='string')
        cleaned = dc.clean(frame)
        assert cleaned.loc[0, 'val'] == '1'

    def test_missing_column_skipped(self, sample_df):
        """Targeting a column that does not exist leaves all four rows."""
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['nonexistent'], target_type='float')
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 4
class TestStripWhitespace:
    """Checks for the ``strip_whitespace`` rule."""

    def test_strip_specific_columns(self, sample_df):
        """Padding around ``' Alice '`` is removed when ``name`` is targeted."""
        dc = DataCleaner()
        dc.add_rule('strip_whitespace', columns=['name'])
        cleaned = dc.clean(sample_df)
        assert cleaned.loc[0, 'name'] == 'Alice'

    def test_strip_all_text(self, sample_df):
        """With no columns given, ``name`` is still stripped."""
        dc = DataCleaner()
        dc.add_rule('strip_whitespace')
        cleaned = dc.clean(sample_df)
        assert cleaned.loc[0, 'name'] == 'Alice'

    def test_strip_non_text_skipped(self):
        """An integer column comes through with its values unchanged."""
        frame = pd.DataFrame({'val': [1, 2, 3]})
        dc = DataCleaner()
        dc.add_rule('strip_whitespace', columns=['val'])
        cleaned = dc.clean(frame)
        values = list(cleaned['val'])
        assert values == [1, 2, 3]
class TestNormalizeText:
    """Checks for the ``normalize_text`` rule."""

    def test_lowercase(self):
        """``lowercase=True`` folds the column to lower case."""
        frame = pd.DataFrame({'name': ['ALICE', 'BOB']})
        dc = DataCleaner()
        dc.add_rule('normalize_text', columns=['name'], lowercase=True)
        cleaned = dc.clean(frame)
        names = list(cleaned['name'])
        assert names == ['alice', 'bob']

    def test_uppercase(self):
        """``uppercase=True`` folds the column to upper case."""
        frame = pd.DataFrame({'name': ['alice', 'bob']})
        dc = DataCleaner()
        dc.add_rule('normalize_text', columns=['name'], uppercase=True)
        cleaned = dc.clean(frame)
        names = list(cleaned['name'])
        assert names == ['ALICE', 'BOB']

    def test_replace_map(self):
        """``replace_map`` substitutes each abbreviation with its full name."""
        frame = pd.DataFrame({'city': ['BJ', 'SH']})
        abbreviations = {'BJ': 'Beijing', 'SH': 'Shanghai'}
        dc = DataCleaner()
        dc.add_rule('normalize_text', columns=['city'], replace_map=abbreviations)
        cleaned = dc.clean(frame)
        cities = list(cleaned['city'])
        assert cities == ['Beijing', 'Shanghai']
class TestValidateData:
    """Checks that ``validate_data`` rules leave the row count unchanged."""

    def test_validate_logs_but_does_not_modify(self, sample_df):
        """A min/max range check on ``score`` keeps all four rows."""
        dc = DataCleaner()
        dc.add_rule('validate_data', columns=['score'], min_value=0, max_value=100)
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 4

    def test_validate_required(self, sample_df):
        """``required=True`` on ``age`` (which has one NA) keeps all rows."""
        dc = DataCleaner()
        dc.add_rule('validate_data', columns=['age'], required=True)
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 4
class TestChaining:
    """Checks that several rules registered on one cleaner all take effect."""

    def test_multiple_rules(self, sample_df):
        """Strip, fill and convert rules are all reflected in the result."""
        dc = DataCleaner()
        # Registration order matters here: fill the NA before casting to int.
        dc.add_rule('strip_whitespace', columns=['name'])
        dc.add_rule('fill_na', columns=['age'], value=0)
        dc.add_rule('convert_type', columns=['age'], target_type='int')
        cleaned = dc.clean(sample_df)
        assert cleaned.loc[0, 'name'] == 'Alice'
        missing_ages = cleaned['age'].isna().sum()
        assert missing_ages == 0
        assert cleaned.loc[2, 'age'] == 0

    def test_convenience_chaining(self, sample_df):
        """Convenience methods can be chained in a single expression."""
        dc = DataCleaner()
        # The return value of strip_whitespace must itself expose fill_na.
        (
            dc
            .strip_whitespace('name')
            .fill_na('age', value=0)
        )
        cleaned = dc.clean(sample_df)
        assert cleaned.loc[0, 'name'] == 'Alice'
        assert cleaned.loc[2, 'age'] == 0
class TestEdgeCases:
    """Boundary cases: empty input, no rules, unknown rules, skipped rules."""

    def test_empty_dataframe(self):
        """Cleaning a zero-row frame returns a zero-row result."""
        empty_column = pd.Series([], dtype=float)
        frame = pd.DataFrame({'a': empty_column})
        dc = DataCleaner()
        dc.add_rule('fill_na', value=0)
        cleaned = dc.clean(frame)
        assert len(cleaned) == 0

    def test_no_rules(self, sample_df):
        """A cleaner with no rules returns all four rows."""
        dc = DataCleaner()
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 4

    def test_unknown_rule_type(self, sample_df):
        """An unrecognised rule name does not change the row count."""
        dc = DataCleaner()
        dc.add_rule('unknown_op', columns=['name'])
        cleaned = dc.clean(sample_df)
        assert len(cleaned) == 4

    def test_rule_failure_continues(self, sample_df):
        """A rule that cannot apply must not block the rules after it."""
        dc = DataCleaner()
        # First rule targets a column that is absent from the fixture.
        dc.add_rule('convert_type', columns=['nonexistent'], target_type='float')
        dc.add_rule('fill_na', columns=['age'], value=0)
        cleaned = dc.clean(sample_df)
        assert cleaned.loc[2, 'age'] == 0