e4d62df7e3
- 智能供应商识别(蓉城易购/烟草/杨碧月/通用) - 百度 OCR 表格识别集成 - 规则引擎(列映射/数据清洗/单位转换/规格推断) - 条码映射管理与云端同步(Gitea REST API) - 云端同步支持:条码映射、供应商配置、商品资料、采购模板 - 拖拽一键处理(图片→OCR→Excel→合并) - 191 个单元测试 - 移除无用的模板管理功能 - 清理 IDE 产物目录 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
223 lines
8.3 KiB
Python
223 lines
8.3 KiB
Python
"""app.core.handlers.calculator 单元测试"""
|
|
|
|
import pytest
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from app.core.handlers.calculator import DataCalculator
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_df():
|
|
return pd.DataFrame({
|
|
'price': [10.0, 20.0, 30.0],
|
|
'quantity': [2, 5, 10],
|
|
'name': ['A', 'B', 'C'],
|
|
})
|
|
|
|
|
|
class TestMultiply:
|
|
def test_basic_multiply(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('multiply', source_column='price', target_column='total', factor=2)
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['total']) == [20.0, 40.0, 60.0]
|
|
|
|
def test_multiply_missing_source(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('multiply', source_column='nonexistent', target_column='total', factor=2)
|
|
result = calc.calculate(sample_df)
|
|
assert 'total' not in result.columns
|
|
|
|
def test_multiply_default_factor(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('multiply', source_column='price', target_column='copy', factor=1)
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['copy']) == [10.0, 20.0, 30.0]
|
|
|
|
def test_convenience_method(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.multiply('price', 'total', 3)
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['total']) == [30.0, 60.0, 90.0]
|
|
|
|
|
|
class TestDivide:
|
|
def test_basic_divide(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('divide', source_column='price', target_column='half', divisor=2)
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['half']) == [5.0, 10.0, 15.0]
|
|
|
|
def test_divide_by_zero(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('divide', source_column='price', target_column='half', divisor=0)
|
|
result = calc.calculate(sample_df)
|
|
assert 'half' not in result.columns
|
|
|
|
def test_divide_missing_source(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('divide', source_column='nonexistent', target_column='x', divisor=2)
|
|
result = calc.calculate(sample_df)
|
|
assert 'x' not in result.columns
|
|
|
|
|
|
class TestAdd:
|
|
def test_add_columns(self):
|
|
df = pd.DataFrame({'a': [1, 2, 3], 'b': [10, 20, 30]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('add', columns=['a', 'b'], target_column='sum')
|
|
result = calc.calculate(df)
|
|
assert list(result['sum']) == [11, 22, 33]
|
|
|
|
def test_add_constant(self):
|
|
df = pd.DataFrame({'a': [1, 2, 3]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('add', target_column='a', constant=100)
|
|
result = calc.calculate(df)
|
|
assert list(result['a']) == [101, 102, 103]
|
|
|
|
def test_add_columns_with_constant(self):
|
|
df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('add', columns=['a', 'b'], target_column='total', constant=10)
|
|
result = calc.calculate(df)
|
|
assert list(result['total']) == [14, 16]
|
|
|
|
def test_add_string_column(self):
|
|
df = pd.DataFrame({'a': [1, 2]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('add', columns='a', target_column='total')
|
|
result = calc.calculate(df)
|
|
assert list(result['total']) == [1, 2]
|
|
|
|
|
|
class TestSubtract:
|
|
def test_subtract_two_columns(self):
|
|
df = pd.DataFrame({'income': [100, 200], 'cost': [30, 80]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('subtract', minuend='income', subtrahend='cost', target_column='profit')
|
|
result = calc.calculate(df)
|
|
assert list(result['profit']) == [70, 120]
|
|
|
|
def test_subtract_constant(self):
|
|
df = pd.DataFrame({'price': [100, 200]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('subtract', minuend='price', target_column='discounted', constant=10)
|
|
result = calc.calculate(df)
|
|
assert list(result['discounted']) == [90, 190]
|
|
|
|
def test_subtract_missing_minuend(self):
|
|
df = pd.DataFrame({'a': [1, 2]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('subtract', minuend='nonexistent', target_column='x', constant=1)
|
|
result = calc.calculate(df)
|
|
assert 'x' not in result.columns
|
|
|
|
|
|
class TestFormula:
|
|
def test_basic_formula(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('formula', formula='price * quantity', target_column='total')
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['total']) == [20.0, 100.0, 300.0]
|
|
|
|
def test_invalid_formula(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('formula', formula='nonexistent + 1', target_column='x')
|
|
result = calc.calculate(sample_df)
|
|
# formula fails, original df returned
|
|
assert 'x' not in result.columns
|
|
|
|
def test_formula_missing_target(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('formula', formula='price * 2')
|
|
result = calc.calculate(sample_df)
|
|
# no target_column, nothing happens
|
|
assert list(result['price']) == [10.0, 20.0, 30.0]
|
|
|
|
|
|
class TestRound:
|
|
def test_round_specific_columns(self):
|
|
df = pd.DataFrame({'a': [1.234, 2.567], 'b': [3.1, 4.9]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('round', columns=['a'], decimals=1)
|
|
result = calc.calculate(df)
|
|
assert list(result['a']) == [1.2, 2.6]
|
|
assert list(result['b']) == [3.1, 4.9] # unchanged
|
|
|
|
def test_round_all_numeric(self):
|
|
df = pd.DataFrame({'a': [1.234, 2.567], 'b': [3.111, 4.999]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('round', decimals=0)
|
|
result = calc.calculate(df)
|
|
assert list(result['a']) == [1.0, 3.0]
|
|
assert list(result['b']) == [3.0, 5.0]
|
|
|
|
def test_round_string_column_skipped(self):
|
|
df = pd.DataFrame({'name': ['a', 'b'], 'val': [1.5, 2.5]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('round', columns=['name', 'val'], decimals=0)
|
|
result = calc.calculate(df)
|
|
assert list(result['val']) == [2.0, 2.0]
|
|
|
|
|
|
class TestSum:
|
|
def test_sum_columns_to_target(self):
|
|
df = pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('sum', columns=['a', 'b'], target_column='total')
|
|
result = calc.calculate(df)
|
|
assert list(result['total']) == [4, 6]
|
|
|
|
def test_sum_missing_columns(self):
|
|
df = pd.DataFrame({'a': [1, 2]})
|
|
calc = DataCalculator()
|
|
calc.add_rule('sum', columns=['a', 'missing'], target_column='total')
|
|
result = calc.calculate(df)
|
|
assert list(result['total']) == [1, 2]
|
|
|
|
|
|
class TestChaining:
|
|
def test_multiple_rules(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('multiply', source_column='price', target_column='total', factor=2)
|
|
calc.add_rule('add', columns=['total', 'quantity'], target_column='grand')
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['total']) == [20.0, 40.0, 60.0]
|
|
assert list(result['grand']) == [22.0, 45.0, 70.0]
|
|
|
|
def test_chaining_convenience(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.multiply('price', 'total', 2).round_columns('total', 0)
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['total']) == [20.0, 40.0, 60.0]
|
|
|
|
|
|
class TestEdgeCases:
|
|
def test_empty_dataframe(self):
|
|
df = pd.DataFrame({'a': pd.Series([], dtype=float)})
|
|
calc = DataCalculator()
|
|
calc.add_rule('multiply', source_column='a', target_column='b', factor=2)
|
|
result = calc.calculate(df)
|
|
assert len(result) == 0
|
|
|
|
def test_no_rules(self, sample_df):
|
|
calc = DataCalculator()
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['price']) == [10.0, 20.0, 30.0]
|
|
|
|
def test_unknown_rule_type(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('unknown_op', source_column='price', target_column='x')
|
|
result = calc.calculate(sample_df)
|
|
# unknown rule is skipped, df unchanged
|
|
assert list(result['price']) == [10.0, 20.0, 30.0]
|
|
|
|
def test_rule_failure_continues(self, sample_df):
|
|
calc = DataCalculator()
|
|
calc.add_rule('formula', formula='nonexistent + 1', target_column='x')
|
|
calc.add_rule('multiply', source_column='price', target_column='y', factor=2)
|
|
result = calc.calculate(sample_df)
|
|
assert list(result['y']) == [20.0, 40.0, 60.0]
|