"""app.core.handlers.data_cleaner 单元测试""" import pytest import pandas as pd from app.core.handlers.data_cleaner import DataCleaner @pytest.fixture def sample_df(): return pd.DataFrame({ 'name': [' Alice ', 'Bob', 'Charlie', 'Dave'], 'age': [25, 30, None, 40], 'score': [80.5, 90.0, 70.0, 85.0], 'city': ['Beijing', 'Shanghai', 'Beijing', 'Guangzhou'], }) class TestFillNa: def test_fill_na_with_value(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('fill_na', columns=['age'], value=0) result = cleaner.clean(sample_df) assert result['age'].isna().sum() == 0 assert result.loc[2, 'age'] == 0 def test_fill_na_all_columns(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('fill_na', value=-1) result = cleaner.clean(sample_df) assert result.isna().sum().sum() == 0 def test_fill_na_string_column(self): df = pd.DataFrame({'a': ['x', None, 'z']}) cleaner = DataCleaner() cleaner.add_rule('fill_na', columns=['a'], value='unknown') result = cleaner.clean(df) assert result.loc[1, 'a'] == 'unknown' def test_convenience_method(self, sample_df): cleaner = DataCleaner() cleaner.fill_na(columns='age', value=99) result = cleaner.clean(sample_df) assert result.loc[2, 'age'] == 99 class TestRemoveDuplicates: def test_remove_by_subset(self): df = pd.DataFrame({ 'name': ['A', 'B', 'A', 'C'], 'val': [1, 2, 3, 4], }) cleaner = DataCleaner() cleaner.add_rule('remove_duplicates', subset=['name'], keep='first') result = cleaner.clean(df) assert len(result) == 3 assert list(result['name']) == ['A', 'B', 'C'] def test_remove_all_columns(self): df = pd.DataFrame({ 'a': [1, 1, 2], 'b': [10, 10, 20], }) cleaner = DataCleaner() cleaner.add_rule('remove_duplicates') result = cleaner.clean(df) assert len(result) == 2 def test_no_duplicates(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('remove_duplicates', subset=['name']) result = cleaner.clean(sample_df) assert len(result) == 4 class TestRemoveRows: def test_remove_by_condition(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('remove_rows', condition='age > 25') result = cleaner.clean(sample_df) assert len(result) == 2 def test_remove_by_values(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('remove_rows', columns=['city'], values=['Beijing']) result = cleaner.clean(sample_df) assert len(result) == 2 assert 'Beijing' not in result['city'].values def test_remove_no_match(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('remove_rows', condition='age > 100') result = cleaner.clean(sample_df) assert len(result) == 0 # condition filter: no rows match age > 100 def test_convenience_method(self, sample_df): cleaner = DataCleaner() cleaner.remove_rows(condition='score < 75') result = cleaner.clean(sample_df) assert len(result) == 1 # condition filter: keeps only Charlie (score=70.0) class TestConvertType: def test_to_float(self): df = pd.DataFrame({'val': ['1.5', '2.7', 'abc']}) cleaner = DataCleaner() cleaner.add_rule('convert_type', columns=['val'], target_type='float') result = cleaner.clean(df) assert result['val'].dtype.kind == 'f' assert result.loc[0, 'val'] == 1.5 assert pd.isna(result.loc[2, 'val']) def test_to_int(self): df = pd.DataFrame({'val': ['1', '2', '3']}) cleaner = DataCleaner() cleaner.add_rule('convert_type', columns=['val'], target_type='int') result = cleaner.clean(df) assert result.loc[0, 'val'] == 1 def test_to_string(self): df = pd.DataFrame({'val': [1, 2, 3]}) cleaner = DataCleaner() cleaner.add_rule('convert_type', columns=['val'], target_type='string') result = cleaner.clean(df) assert result.loc[0, 'val'] == '1' def test_missing_column_skipped(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('convert_type', columns=['nonexistent'], target_type='float') result = cleaner.clean(sample_df) assert len(result) == 4 class TestStripWhitespace: def test_strip_specific_columns(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('strip_whitespace', columns=['name']) result = cleaner.clean(sample_df) assert result.loc[0, 'name'] == 'Alice' def test_strip_all_text(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('strip_whitespace') result = cleaner.clean(sample_df) assert result.loc[0, 'name'] == 'Alice' def test_strip_non_text_skipped(self): df = pd.DataFrame({'val': [1, 2, 3]}) cleaner = DataCleaner() cleaner.add_rule('strip_whitespace', columns=['val']) result = cleaner.clean(df) assert list(result['val']) == [1, 2, 3] class TestNormalizeText: def test_lowercase(self): df = pd.DataFrame({'name': ['ALICE', 'BOB']}) cleaner = DataCleaner() cleaner.add_rule('normalize_text', columns=['name'], lowercase=True) result = cleaner.clean(df) assert list(result['name']) == ['alice', 'bob'] def test_uppercase(self): df = pd.DataFrame({'name': ['alice', 'bob']}) cleaner = DataCleaner() cleaner.add_rule('normalize_text', columns=['name'], uppercase=True) result = cleaner.clean(df) assert list(result['name']) == ['ALICE', 'BOB'] def test_replace_map(self): df = pd.DataFrame({'city': ['BJ', 'SH']}) cleaner = DataCleaner() cleaner.add_rule('normalize_text', columns=['city'], replace_map={'BJ': 'Beijing', 'SH': 'Shanghai'}) result = cleaner.clean(df) assert list(result['city']) == ['Beijing', 'Shanghai'] class TestValidateData: def test_validate_logs_but_does_not_modify(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('validate_data', columns=['score'], min_value=0, max_value=100) result = cleaner.clean(sample_df) assert len(result) == 4 def test_validate_required(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('validate_data', columns=['age'], required=True) result = cleaner.clean(sample_df) assert len(result) == 4 class TestChaining: def test_multiple_rules(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('strip_whitespace', columns=['name']) cleaner.add_rule('fill_na', columns=['age'], value=0) cleaner.add_rule('convert_type', columns=['age'], target_type='int') result = cleaner.clean(sample_df) assert result.loc[0, 'name'] == 'Alice' assert result['age'].isna().sum() == 0 assert result.loc[2, 'age'] == 0 def test_convenience_chaining(self, sample_df): cleaner = DataCleaner() cleaner.strip_whitespace('name').fill_na('age', value=0) result = cleaner.clean(sample_df) assert result.loc[0, 'name'] == 'Alice' assert result.loc[2, 'age'] == 0 class TestEdgeCases: def test_empty_dataframe(self): df = pd.DataFrame({'a': pd.Series([], dtype=float)}) cleaner = DataCleaner() cleaner.add_rule('fill_na', value=0) result = cleaner.clean(df) assert len(result) == 0 def test_no_rules(self, sample_df): cleaner = DataCleaner() result = cleaner.clean(sample_df) assert len(result) == 4 def test_unknown_rule_type(self, sample_df): cleaner = DataCleaner() cleaner.add_rule('unknown_op', columns=['name']) result = cleaner.clean(sample_df) assert len(result) == 4 def test_rule_failure_continues(self, sample_df): """A failing rule should not block subsequent rules.""" cleaner = DataCleaner() cleaner.add_rule('convert_type', columns=['nonexistent'], target_type='float') cleaner.add_rule('fill_na', columns=['age'], value=0) result = cleaner.clean(sample_df) assert result.loc[2, 'age'] == 0