e4d62df7e3
- 智能供应商识别(蓉城易购/烟草/杨碧月/通用) - 百度 OCR 表格识别集成 - 规则引擎(列映射/数据清洗/单位转换/规格推断) - 条码映射管理与云端同步(Gitea REST API) - 云端同步支持:条码映射、供应商配置、商品资料、采购模板 - 拖拽一键处理(图片→OCR→Excel→合并) - 191 个单元测试 - 移除无用的模板管理功能 - 清理 IDE 产物目录 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
237 lines
8.3 KiB
Python
237 lines
8.3 KiB
Python
"""Unit tests for app.core.handlers.data_cleaner."""
|
|
|
|
import pytest
|
|
import pandas as pd
|
|
|
|
from app.core.handlers.data_cleaner import DataCleaner
|
|
|
|
|
|
@pytest.fixture
def sample_df():
    """Return a four-row frame shared by the tests in this module.

    The frame has a space-padded first name, one missing ``age`` (row 2),
    float scores, and a city value ('Beijing') that appears twice.
    """
    columns = {
        'name': [' Alice ', 'Bob', 'Charlie', 'Dave'],
        'age': [25, 30, None, 40],
        'score': [80.5, 90.0, 70.0, 85.0],
        'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou'],
    }
    return pd.DataFrame(columns)
|
|
|
|
|
|
class TestFillNa:
    """Checks for the 'fill_na' rule and the ``fill_na`` convenience method."""

    def test_fill_na_with_value(self, sample_df):
        """Filling 'age' with 0 leaves no missing ages and sets row 2 to 0."""
        dc = DataCleaner()
        dc.add_rule('fill_na', columns=['age'], value=0)
        cleaned = dc.clean(sample_df)

        missing_ages = cleaned['age'].isna().sum()
        assert missing_ages == 0
        assert cleaned.loc[2, 'age'] == 0

    def test_fill_na_all_columns(self, sample_df):
        """Without a column list, no missing value remains anywhere."""
        dc = DataCleaner()
        dc.add_rule('fill_na', value=-1)
        cleaned = dc.clean(sample_df)

        missing_total = cleaned.isna().sum().sum()
        assert missing_total == 0

    def test_fill_na_string_column(self):
        """A missing entry in a text column is replaced by the given string."""
        frame = pd.DataFrame({'a': ['x', None, 'z']})
        dc = DataCleaner()
        dc.add_rule('fill_na', columns=['a'], value='unknown')
        cleaned = dc.clean(frame)

        assert cleaned.loc[1, 'a'] == 'unknown'

    def test_convenience_method(self, sample_df):
        """``fill_na`` called directly with a single column name fills row 2."""
        dc = DataCleaner()
        dc.fill_na(columns='age', value=99)
        cleaned = dc.clean(sample_df)

        assert cleaned.loc[2, 'age'] == 99
|
|
|
|
|
|
class TestRemoveDuplicates:
    """Checks for the 'remove_duplicates' rule."""

    def test_remove_by_subset(self):
        """Duplicates judged on 'name' only; the first occurrence is kept."""
        frame = pd.DataFrame({
            'name': ['A', 'B', 'A', 'C'],
            'val': [1, 2, 3, 4],
        })
        dc = DataCleaner()
        dc.add_rule('remove_duplicates', subset=['name'], keep='first')
        cleaned = dc.clean(frame)

        assert len(cleaned) == 3
        assert cleaned['name'].tolist() == ['A', 'B', 'C']

    def test_remove_all_columns(self):
        """With no subset, the two identical rows collapse into one."""
        frame = pd.DataFrame({
            'a': [1, 1, 2],
            'b': [10, 10, 20],
        })
        dc = DataCleaner()
        dc.add_rule('remove_duplicates')
        cleaned = dc.clean(frame)

        assert len(cleaned) == 2

    def test_no_duplicates(self, sample_df):
        """All four sample names are distinct, so no row is dropped."""
        dc = DataCleaner()
        dc.add_rule('remove_duplicates', subset=['name'])
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 4
|
|
|
|
|
|
class TestRemoveRows:
    """Checks for the 'remove_rows' rule and the ``remove_rows`` convenience method.

    NOTE(review): the expected counts for ``condition`` below treat it as
    selecting the rows that remain in the result -- confirm against
    DataCleaner before relying on the rule name alone.
    """

    def test_remove_by_condition(self, sample_df):
        """Condition 'age > 25' on the sample frame yields two rows."""
        dc = DataCleaner()
        dc.add_rule('remove_rows', condition='age > 25')
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 2

    def test_remove_by_values(self, sample_df):
        """Rows whose 'city' is 'Beijing' are absent from the result."""
        dc = DataCleaner()
        dc.add_rule('remove_rows', columns=['city'], values=['Beijing'])
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 2
        remaining_cities = cleaned['city'].values
        assert 'Beijing' not in remaining_cities

    def test_remove_no_match(self, sample_df):
        """Condition 'age > 100' matches nothing and the result is empty."""
        dc = DataCleaner()
        dc.add_rule('remove_rows', condition='age > 100')
        cleaned = dc.clean(sample_df)

        # Condition acts as a filter here: no row has age > 100.
        assert len(cleaned) == 0

    def test_convenience_method(self, sample_df):
        """``remove_rows`` called directly with a condition yields one row."""
        dc = DataCleaner()
        dc.remove_rows(condition='score < 75')
        cleaned = dc.clean(sample_df)

        # Condition acts as a filter here: only Charlie (score=70.0) is below 75.
        assert len(cleaned) == 1
|
|
|
|
|
|
class TestConvertType:
    """Checks for the 'convert_type' rule."""

    def test_to_float(self):
        """Numeric strings become floats; the unparsable 'abc' becomes missing."""
        frame = pd.DataFrame({'val': ['1.5', '2.7', 'abc']})
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['val'], target_type='float')
        cleaned = dc.clean(frame)

        converted = cleaned['val']
        assert converted.dtype.kind == 'f'
        assert cleaned.loc[0, 'val'] == 1.5
        assert pd.isna(cleaned.loc[2, 'val'])

    def test_to_int(self):
        """Digit strings compare equal to their integer values after conversion."""
        frame = pd.DataFrame({'val': ['1', '2', '3']})
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['val'], target_type='int')
        cleaned = dc.clean(frame)

        assert cleaned.loc[0, 'val'] == 1

    def test_to_string(self):
        """Integers are rendered as their string form."""
        frame = pd.DataFrame({'val': [1, 2, 3]})
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['val'], target_type='string')
        cleaned = dc.clean(frame)

        assert cleaned.loc[0, 'val'] == '1'

    def test_missing_column_skipped(self, sample_df):
        """A rule naming a column that does not exist leaves all rows in place."""
        dc = DataCleaner()
        dc.add_rule('convert_type', columns=['nonexistent'], target_type='float')
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 4
|
|
|
|
|
|
class TestStripWhitespace:
    """Checks for the 'strip_whitespace' rule."""

    def test_strip_specific_columns(self, sample_df):
        """Padding around the first name is removed when 'name' is targeted."""
        dc = DataCleaner()
        dc.add_rule('strip_whitespace', columns=['name'])
        cleaned = dc.clean(sample_df)

        assert cleaned.loc[0, 'name'] == 'Alice'

    def test_strip_all_text(self, sample_df):
        """Padding around the first name is removed when no columns are given."""
        dc = DataCleaner()
        dc.add_rule('strip_whitespace')
        cleaned = dc.clean(sample_df)

        assert cleaned.loc[0, 'name'] == 'Alice'

    def test_strip_non_text_skipped(self):
        """Targeting an integer column leaves its values unchanged."""
        frame = pd.DataFrame({'val': [1, 2, 3]})
        dc = DataCleaner()
        dc.add_rule('strip_whitespace', columns=['val'])
        cleaned = dc.clean(frame)

        assert cleaned['val'].tolist() == [1, 2, 3]
|
|
|
|
|
|
class TestNormalizeText:
    """Checks for the 'normalize_text' rule."""

    def test_lowercase(self):
        """``lowercase=True`` folds upper-case names to lower case."""
        frame = pd.DataFrame({'name': ['ALICE', 'BOB']})
        dc = DataCleaner()
        dc.add_rule('normalize_text', columns=['name'], lowercase=True)
        cleaned = dc.clean(frame)

        assert cleaned['name'].tolist() == ['alice', 'bob']

    def test_uppercase(self):
        """``uppercase=True`` folds lower-case names to upper case."""
        frame = pd.DataFrame({'name': ['alice', 'bob']})
        dc = DataCleaner()
        dc.add_rule('normalize_text', columns=['name'], uppercase=True)
        cleaned = dc.clean(frame)

        assert cleaned['name'].tolist() == ['ALICE', 'BOB']

    def test_replace_map(self):
        """Each value found in ``replace_map`` is swapped for its mapped text."""
        frame = pd.DataFrame({'city': ['BJ', 'SH']})
        dc = DataCleaner()
        dc.add_rule(
            'normalize_text',
            columns=['city'],
            replace_map={'BJ': 'Beijing', 'SH': 'Shanghai'},
        )
        cleaned = dc.clean(frame)

        assert cleaned['city'].tolist() == ['Beijing', 'Shanghai']
|
|
|
|
|
|
class TestValidateData:
    """Checks that the 'validate_data' rule keeps every row of the input."""

    def test_validate_logs_but_does_not_modify(self, sample_df):
        """A min/max range check on 'score' leaves the row count at four."""
        dc = DataCleaner()
        dc.add_rule('validate_data', columns=['score'], min_value=0, max_value=100)
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 4

    def test_validate_required(self, sample_df):
        """A required check on 'age' (which has a missing value) drops no rows."""
        dc = DataCleaner()
        dc.add_rule('validate_data', columns=['age'], required=True)
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 4
|
|
|
|
|
|
class TestChaining:
    """Checks that several rules combine within a single ``clean`` call."""

    def test_multiple_rules(self, sample_df):
        """Strip, fill and convert rules added one by one all take effect."""
        dc = DataCleaner()
        dc.add_rule('strip_whitespace', columns=['name'])
        dc.add_rule('fill_na', columns=['age'], value=0)
        dc.add_rule('convert_type', columns=['age'], target_type='int')
        cleaned = dc.clean(sample_df)

        assert cleaned.loc[0, 'name'] == 'Alice'
        missing_ages = cleaned['age'].isna().sum()
        assert missing_ages == 0
        assert cleaned.loc[2, 'age'] == 0

    def test_convenience_chaining(self, sample_df):
        """Convenience methods can be chained fluently before cleaning."""
        dc = DataCleaner()
        # Kept as one chained expression: the chain itself is under test.
        dc.strip_whitespace('name').fill_na('age', value=0)
        cleaned = dc.clean(sample_df)

        assert cleaned.loc[0, 'name'] == 'Alice'
        assert cleaned.loc[2, 'age'] == 0
|
|
|
|
|
|
class TestEdgeCases:
    """Checks for boundary inputs: empty frames, no rules, unknown or unusable rules."""

    def test_empty_dataframe(self):
        """Cleaning a zero-row frame returns a zero-row result."""
        empty_column = pd.Series([], dtype=float)
        frame = pd.DataFrame({'a': empty_column})
        dc = DataCleaner()
        dc.add_rule('fill_na', value=0)
        cleaned = dc.clean(frame)

        assert len(cleaned) == 0

    def test_no_rules(self, sample_df):
        """A cleaner without rules returns all four rows."""
        dc = DataCleaner()
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 4

    def test_unknown_rule_type(self, sample_df):
        """An unrecognised rule type does not drop any rows."""
        dc = DataCleaner()
        dc.add_rule('unknown_op', columns=['name'])
        cleaned = dc.clean(sample_df)

        assert len(cleaned) == 4

    def test_rule_failure_continues(self, sample_df):
        """A failing rule should not block subsequent rules."""
        dc = DataCleaner()
        # First rule targets a column that does not exist in the frame.
        dc.add_rule('convert_type', columns=['nonexistent'], target_type='float')
        # Second rule must still be applied afterwards.
        dc.add_rule('fill_na', columns=['age'], value=0)
        cleaned = dc.clean(sample_df)

        assert cleaned.loc[2, 'age'] == 0
|