# coding=utf-8
"""
Application context module.

Provides a configuration context class that wraps every operation depending
on configuration, removing global state and ad-hoc wrapper functions.
"""

from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

from trendradar.utils.time import (
    get_configured_time,
    format_date_folder,
    format_time_filename,
    get_current_time_display,
    convert_time_for_display,
)
from trendradar.core import (
    load_frequency_words,
    matches_word_groups,
    save_titles_to_file,
    read_all_today_titles,
    detect_latest_new_titles,
    is_first_crawl_today,
    count_word_frequency,
)
from trendradar.report import (
    clean_title,
    prepare_report_data,
    generate_html_report,
    render_html_content,
)
from trendradar.notification import (
    render_feishu_content,
    render_dingtalk_content,
    split_content_into_batches,
    NotificationDispatcher,
    PushRecordManager,
)
from trendradar.storage import get_storage_manager


class AppContext:
    """
    Application context.

    Wraps every configuration-dependent operation behind one interface so
    that callers do not depend on a global CONFIG object, which also makes
    the code easier to test.

    Example:
        config = load_config()
        ctx = AppContext(config)

        # Time helpers
        now = ctx.get_time()
        date_folder = ctx.format_date()

        # Storage
        storage = ctx.get_storage_manager()

        # Report generation
        html = ctx.generate_html(stats, total_titles, ...)
    """

    # Root directory used for txt/html output paths built by this class.
    # NOTE(review): this is independent of STORAGE.LOCAL.DATA_DIR (which is
    # only forwarded to the storage manager); confirm whether the two are
    # meant to stay decoupled.
    _OUTPUT_DIR = "output"

    def __init__(self, config: Dict[str, Any]):
        """
        Initialise the application context.

        Args:
            config: The complete configuration dictionary.
        """
        self.config = config
        # Created lazily by get_storage_manager(); reset by cleanup().
        self._storage_manager: Optional[Any] = None

    # === Configuration access ===

    @property
    def timezone(self) -> str:
        """Configured timezone name (defaults to "Asia/Shanghai")."""
        return self.config.get("TIMEZONE", "Asia/Shanghai")

    @property
    def rank_threshold(self) -> int:
        """Configured rank threshold (defaults to 50)."""
        return self.config.get("RANK_THRESHOLD", 50)

    @property
    def weight_config(self) -> Dict:
        """Configured weight settings (defaults to an empty dict)."""
        return self.config.get("WEIGHT_CONFIG", {})

    @property
    def platforms(self) -> List[Dict]:
        """Configured platform entries (defaults to an empty list)."""
        return self.config.get("PLATFORMS", [])

    @property
    def platform_ids(self) -> List[str]:
        """The "id" value of every configured platform.

        Raises:
            KeyError: If a platform entry has no "id" key.
        """
        return [p["id"] for p in self.platforms]

    # === Time helpers ===

    def get_time(self) -> datetime:
        """Return the current time for the configured timezone."""
        return get_configured_time(self.timezone)

    def format_date(self) -> str:
        """Return the date-folder name (YYYY-MM-DD) for the configured timezone."""
        return format_date_folder(timezone=self.timezone)

    def format_time(self) -> str:
        """Return the time-based file name (HH-MM) for the configured timezone."""
        return format_time_filename(self.timezone)

    def get_time_display(self) -> str:
        """Return the current time formatted for display (HH:MM)."""
        return get_current_time_display(self.timezone)

    @staticmethod
    def convert_time_display(time_str: str) -> str:
        """Convert an HH-MM string to HH:MM."""
        return convert_time_for_display(time_str)

    # === Storage ===

    def get_storage_manager(self):
        """Return the storage manager, creating it on first use.

        The instance is cached on this context, so repeated calls return the
        same object until cleanup() resets it.
        """
        if self._storage_manager is None:
            storage_config = self.config.get("STORAGE", {})
            remote_config = storage_config.get("REMOTE", {})
            local_config = storage_config.get("LOCAL", {})
            pull_config = storage_config.get("PULL", {})
            formats_config = storage_config.get("FORMATS", {})

            self._storage_manager = get_storage_manager(
                backend_type=storage_config.get("BACKEND", "auto"),
                data_dir=local_config.get("DATA_DIR", "output"),
                enable_txt=formats_config.get("TXT", True),
                enable_html=formats_config.get("HTML", True),
                remote_config={
                    "bucket_name": remote_config.get("BUCKET_NAME", ""),
                    "access_key_id": remote_config.get("ACCESS_KEY_ID", ""),
                    "secret_access_key": remote_config.get("SECRET_ACCESS_KEY", ""),
                    "endpoint_url": remote_config.get("ENDPOINT_URL", ""),
                    "region": remote_config.get("REGION", ""),
                },
                local_retention_days=local_config.get("RETENTION_DAYS", 0),
                remote_retention_days=remote_config.get("RETENTION_DAYS", 0),
                pull_enabled=pull_config.get("ENABLED", False),
                pull_days=pull_config.get("DAYS", 7),
                timezone=self.timezone,
            )
        return self._storage_manager

    def get_output_path(self, subfolder: str, filename: str) -> str:
        """Build ``output/<date>/<subfolder>/<filename>``, creating the directory.

        Args:
            subfolder: Sub-directory under today's date folder.
            filename: File name to append to the directory.

        Returns:
            The resulting path as a string.
        """
        output_dir = Path(self._OUTPUT_DIR) / self.format_date() / subfolder
        output_dir.mkdir(parents=True, exist_ok=True)
        return str(output_dir / filename)

    # === Data handling ===

    def save_titles(self, results: Dict, id_to_name: Dict, failed_ids: List) -> str:
        """Save titles to a txt file named after the current time.

        Returns:
            Whatever ``save_titles_to_file`` returns for the generated path.
        """
        output_path = self.get_output_path("txt", f"{self.format_time()}.txt")
        return save_titles_to_file(
            results, id_to_name, failed_ids, output_path, clean_title
        )

    def read_today_titles(
        self, platform_ids: Optional[List[str]] = None
    ) -> Tuple[Dict, Dict, Dict]:
        """Read all of today's titles through the storage manager."""
        return read_all_today_titles(self.get_storage_manager(), platform_ids)

    def detect_new_titles(
        self, platform_ids: Optional[List[str]] = None
    ) -> Dict:
        """Detect titles that are new in the latest batch."""
        return detect_latest_new_titles(self.get_storage_manager(), platform_ids)

    def is_first_crawl(self) -> bool:
        """Report whether this is the first crawl of the current day."""
        return is_first_crawl_today(self._OUTPUT_DIR, self.format_date())

    # === Frequency words ===

    def load_frequency_words(
        self, frequency_file: Optional[str] = None
    ) -> Tuple[List[Dict], List[str], List[str]]:
        """Load the frequency-word configuration."""
        return load_frequency_words(frequency_file)

    def matches_word_groups(
        self,
        title: str,
        word_groups: List[Dict],
        filter_words: List[str],
        global_filters: Optional[List[str]] = None,
    ) -> bool:
        """Check whether a title matches the word-group rules."""
        return matches_word_groups(title, word_groups, filter_words, global_filters)

    # === Statistics ===

    def count_frequency(
        self,
        results: Dict,
        word_groups: List[Dict],
        filter_words: List[str],
        id_to_name: Dict,
        title_info: Optional[Dict] = None,
        new_titles: Optional[Dict] = None,
        mode: str = "daily",
        global_filters: Optional[List[str]] = None,
    ) -> Tuple[List[Dict], int]:
        """Count word frequency.

        Forwards the arguments to ``count_word_frequency`` and fills in the
        configuration-derived options (rank threshold, weights, per-keyword
        limit, sort order) plus the first-crawl and time-conversion callbacks.
        """
        return count_word_frequency(
            results=results,
            word_groups=word_groups,
            filter_words=filter_words,
            id_to_name=id_to_name,
            title_info=title_info,
            rank_threshold=self.rank_threshold,
            new_titles=new_titles,
            mode=mode,
            global_filters=global_filters,
            weight_config=self.weight_config,
            max_news_per_keyword=self.config.get("MAX_NEWS_PER_KEYWORD", 0),
            sort_by_position_first=self.config.get("SORT_BY_POSITION_FIRST", False),
            is_first_crawl_func=self.is_first_crawl,
            convert_time_func=self.convert_time_display,
        )

    # === Report generation ===

    def prepare_report(
        self,
        stats: List[Dict],
        failed_ids: Optional[List] = None,
        new_titles: Optional[Dict] = None,
        id_to_name: Optional[Dict] = None,
        mode: str = "daily",
    ) -> Dict:
        """Prepare the report data structure."""
        return prepare_report_data(
            stats=stats,
            failed_ids=failed_ids,
            new_titles=new_titles,
            id_to_name=id_to_name,
            mode=mode,
            rank_threshold=self.rank_threshold,
            matches_word_groups_func=self.matches_word_groups,
            load_frequency_words_func=self.load_frequency_words,
        )

    def generate_html(
        self,
        stats: List[Dict],
        total_titles: int,
        failed_ids: Optional[List] = None,
        new_titles: Optional[Dict] = None,
        id_to_name: Optional[Dict] = None,
        mode: str = "daily",
        is_daily_summary: bool = False,
        update_info: Optional[Dict] = None,
    ) -> str:
        """Generate the HTML report.

        Forwards the arguments to ``generate_html_report`` together with the
        output location (output dir, date folder, time file name) and the
        rendering / matching / loading callbacks bound to this context.
        """
        return generate_html_report(
            stats=stats,
            total_titles=total_titles,
            failed_ids=failed_ids,
            new_titles=new_titles,
            id_to_name=id_to_name,
            mode=mode,
            is_daily_summary=is_daily_summary,
            update_info=update_info,
            rank_threshold=self.rank_threshold,
            output_dir=self._OUTPUT_DIR,
            date_folder=self.format_date(),
            time_filename=self.format_time(),
            # The bound method already accepts any positional/keyword
            # arguments render_html supports; no wrapper lambda is needed.
            render_html_func=self.render_html,
            matches_word_groups_func=self.matches_word_groups,
            load_frequency_words_func=self.load_frequency_words,
            enable_index_copy=True,
        )

    def render_html(
        self,
        report_data: Dict,
        total_titles: int,
        is_daily_summary: bool = False,
        mode: str = "daily",
        update_info: Optional[Dict] = None,
    ) -> str:
        """Render the HTML content for a report."""
        return render_html_content(
            report_data=report_data,
            total_titles=total_titles,
            is_daily_summary=is_daily_summary,
            mode=mode,
            update_info=update_info,
            reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
            get_time_func=self.get_time,
        )

    # === Notification rendering ===

    def render_feishu(
        self,
        report_data: Dict,
        update_info: Optional[Dict] = None,
        mode: str = "daily",
    ) -> str:
        """Render the Feishu message content."""
        return render_feishu_content(
            report_data=report_data,
            update_info=update_info,
            mode=mode,
            separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
            reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
            get_time_func=self.get_time,
        )

    def render_dingtalk(
        self,
        report_data: Dict,
        update_info: Optional[Dict] = None,
        mode: str = "daily",
    ) -> str:
        """Render the DingTalk message content."""
        return render_dingtalk_content(
            report_data=report_data,
            update_info=update_info,
            mode=mode,
            reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
            get_time_func=self.get_time,
        )

    def split_content(
        self,
        report_data: Dict,
        format_type: str,
        update_info: Optional[Dict] = None,
        max_bytes: Optional[int] = None,
        mode: str = "daily",
    ) -> List[str]:
        """Split message content into batches.

        Batch sizes are read from DINGTALK_BATCH_SIZE (default 20000),
        FEISHU_BATCH_SIZE (default 29000) and MESSAGE_BATCH_SIZE
        (default 4000).
        """
        return split_content_into_batches(
            report_data=report_data,
            format_type=format_type,
            update_info=update_info,
            max_bytes=max_bytes,
            mode=mode,
            batch_sizes={
                "dingtalk": self.config.get("DINGTALK_BATCH_SIZE", 20000),
                "feishu": self.config.get("FEISHU_BATCH_SIZE", 29000),
                "default": self.config.get("MESSAGE_BATCH_SIZE", 4000),
            },
            feishu_separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
            reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
            get_time_func=self.get_time,
        )

    # === Notification sending ===

    def create_notification_dispatcher(self) -> NotificationDispatcher:
        """Create a notification dispatcher bound to this context."""
        return NotificationDispatcher(
            config=self.config,
            get_time_func=self.get_time,
            split_content_func=self.split_content,
        )

    def create_push_manager(self) -> PushRecordManager:
        """Create a push-record manager backed by the storage manager."""
        return PushRecordManager(
            storage_backend=self.get_storage_manager(),
            get_time_func=self.get_time,
        )

    # === Resource cleanup ===

    def cleanup(self) -> None:
        """Release the storage manager, if one was created.

        Calls ``cleanup_old_data()`` and then ``cleanup()`` on the manager.
        ``cleanup()`` is still called, and the cached reference still dropped,
        when an earlier step raises; the exception then propagates to the
        caller. Calling this when no manager exists is a no-op.
        """
        manager = self._storage_manager
        # Same "is None" test as the lazy initialiser, so a manager object
        # that happens to be falsy is still cleaned up.
        if manager is None:
            return
        try:
            manager.cleanup_old_data()
        finally:
            try:
                manager.cleanup()
            finally:
                # Never keep a reference to a (partially) cleaned-up manager.
                self._storage_manager = None