diff --git a/main.py b/main.py
index 8336ae3..f0da73e 100644
--- a/main.py
+++ b/main.py
@@ -12,7 +12,6 @@
 import os
 import requests
 import pytz
 
-# 配置常量
 CONFIG = {
     "FEISHU_SEPARATOR": "━━━━━━━━━━━━━━━━━━━",  # 飞书消息分割线,注意,其它类型的分割线可能会被飞书过滤而不显示
     "REQUEST_INTERVAL": 1000,  # 请求间隔(毫秒)
@@ -30,17 +29,14 @@ class TimeHelper:
 
     @staticmethod
     def get_beijing_time() -> datetime:
-        """获取北京时间"""
         return datetime.now(pytz.timezone("Asia/Shanghai"))
 
     @staticmethod
     def format_date_folder() -> str:
-        """返回日期文件夹格式"""
         return TimeHelper.get_beijing_time().strftime("%Y年%m月%d日")
 
     @staticmethod
     def format_time_filename() -> str:
-        """返回时间文件名格式"""
         return TimeHelper.get_beijing_time().strftime("%H时%M分")
 
@@ -49,12 +45,10 @@ class FileHelper:
 
     @staticmethod
     def ensure_directory_exists(directory: str) -> None:
-        """确保目录存在"""
         Path(directory).mkdir(parents=True, exist_ok=True)
 
     @staticmethod
     def get_output_path(subfolder: str, filename: str) -> str:
-        """获取输出文件路径"""
         date_folder = TimeHelper.format_date_folder()
         output_dir = Path("output") / date_folder / subfolder
         FileHelper.ensure_directory_exists(str(output_dir))
@@ -75,7 +69,6 @@ class DataFetcher:
         max_retry_wait: int = 5,
     ) -> Tuple[Optional[str], str, str]:
         """获取指定ID数据,支持重试"""
-        # 解析ID和别名
         if isinstance(id_info, tuple):
             id_value, alias = id_info
         else:
@@ -84,7 +77,6 @@ class DataFetcher:
 
         url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
 
-        # 设置代理
         proxies = None
         if self.proxy_url:
             proxies = {"http": self.proxy_url, "https": self.proxy_url}
@@ -100,34 +92,32 @@ class DataFetcher:
         retries = 0
         while retries <= max_retries:
             try:
-                print(f"正在请求 {id_value} 数据... (尝试 {retries + 1}/{max_retries + 1})")
-                response = requests.get(url, proxies=proxies, headers=headers, timeout=10)
+                response = requests.get(
+                    url, proxies=proxies, headers=headers, timeout=10
+                )
                 response.raise_for_status()
 
                 data_text = response.text
                 data_json = json.loads(data_text)
 
-                # 检查响应状态,接受success和cache
                 status = data_json.get("status", "未知")
                 if status not in ["success", "cache"]:
                     raise ValueError(f"响应状态异常: {status}")
 
                 status_info = "最新数据" if status == "success" else "缓存数据"
-                print(f"成功获取 {id_value} 数据({status_info})")
+                print(f"获取 {id_value} 成功({status_info})")
 
                 return data_text, id_value, alias
 
             except Exception as e:
                 retries += 1
                 if retries <= max_retries:
-                    # 计算重试等待时间:基础时间+递增时间
                     base_wait = random.uniform(min_retry_wait, max_retry_wait)
                     additional_wait = (retries - 1) * random.uniform(1, 2)
                     wait_time = base_wait + additional_wait
-
-                    print(f"请求 {id_value} 失败: {e}. 将在 {wait_time:.2f} 秒后重试...")
+                    print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
                     time.sleep(wait_time)
                 else:
-                    print(f"请求 {id_value} 失败: {e}. 已达到最大重试次数。")
+                    print(f"请求 {id_value} 失败: {e}")
                     return None, id_value, alias
 
         return None, id_value, alias
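For reference, the wait this retry branch sleeps for is a uniformly random base plus a jitter term that grows linearly with the attempt number. A standalone sketch of that schedule (the helper name and the 3.0/5.0 defaults are illustrative assumptions, not part of the patch):

import random

def backoff_schedule(max_retries: int = 2, min_wait: float = 3.0, max_wait: float = 5.0):
    # Same formula as the patch: uniform base + (attempt - 1) * uniform(1, 2) jitter.
    waits = []
    for attempt in range(1, max_retries + 1):
        base_wait = random.uniform(min_wait, max_wait)
        additional_wait = (attempt - 1) * random.uniform(1, 2)
        waits.append(base_wait + additional_wait)
    return waits

print([round(w, 2) for w in backoff_schedule()])  # e.g. [4.13, 5.87]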
已达到最大重试次数。") + print(f"请求 {id_value} 失败: {e}") return None, id_value, alias return None, id_value, alias @@ -142,7 +132,6 @@ class DataFetcher: failed_ids = [] for i, id_info in enumerate(ids_list): - # 解析ID和别名 if isinstance(id_info, tuple): id_value, alias = id_info else: @@ -150,8 +139,6 @@ class DataFetcher: alias = id_value id_to_alias[id_value] = alias - - # 获取数据 response, _, _ = self.fetch_data(id_info) if response: @@ -162,19 +149,17 @@ class DataFetcher: title = item["title"] url = item.get("url", "") mobile_url = item.get("mobileUrl", "") - + if title in results[id_value]: - # 标题已存在,更新排名 results[id_value][title]["ranks"].append(index) else: - # 新标题 results[id_value][title] = { "ranks": [index], "url": url, - "mobileUrl": mobile_url + "mobileUrl": mobile_url, } except json.JSONDecodeError: - print(f"解析 {id_value} 响应失败,非有效JSON") + print(f"解析 {id_value} 响应失败") failed_ids.append(id_value) except Exception as e: print(f"处理 {id_value} 数据出错: {e}") @@ -182,52 +167,171 @@ class DataFetcher: else: failed_ids.append(id_value) - # 添加请求间隔 if i < len(ids_list) - 1: actual_interval = request_interval + random.randint(-10, 20) - actual_interval = max(50, actual_interval) # 最少50毫秒 - print(f"等待 {actual_interval} 毫秒后发送下一个请求...") + actual_interval = max(50, actual_interval) time.sleep(actual_interval / 1000) - print(f"\n请求总结:") - print(f"- 成功获取数据: {list(results.keys())}") - print(f"- 请求失败: {failed_ids}") - + print(f"成功: {list(results.keys())}, 失败: {failed_ids}") return results, id_to_alias, failed_ids class DataProcessor: """数据处理器""" + @staticmethod + def detect_latest_new_titles(id_to_alias: Dict) -> Dict: + """检测当日最新批次的新增标题""" + date_folder = TimeHelper.format_date_folder() + txt_dir = Path("output") / date_folder / "txt" + + if not txt_dir.exists(): + return {} + + files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"]) + if len(files) < 2: + if len(files) == 1: + return DataProcessor._parse_file_titles(files[0]) + return {} + + latest_file = files[-1] + latest_titles = DataProcessor._parse_file_titles(latest_file) + + # 汇总历史标题 + historical_titles = {} + for file_path in files[:-1]: + historical_data = DataProcessor._parse_file_titles(file_path) + for source_name, titles_data in historical_data.items(): + if source_name not in historical_titles: + historical_titles[source_name] = set() + for title in titles_data.keys(): + historical_titles[source_name].add(title) + + # 找出新增标题 + new_titles = {} + for source_name, latest_source_titles in latest_titles.items(): + historical_set = historical_titles.get(source_name, set()) + source_new_titles = {} + + for title, title_data in latest_source_titles.items(): + if title not in historical_set: + source_new_titles[title] = title_data + + if source_new_titles: + source_id = None + for id_val, alias in id_to_alias.items(): + if alias == source_name: + source_id = id_val + break + if source_id: + new_titles[source_id] = source_new_titles + + return new_titles + + @staticmethod + def _parse_file_titles(file_path: Path) -> Dict: + """解析单个txt文件的标题数据""" + titles_by_source = {} + + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + sections = content.split("\n\n") + + for section in sections: + if not section.strip() or "==== 以下ID请求失败 ====" in section: + continue + + lines = section.strip().split("\n") + if len(lines) < 2: + continue + + source_name = lines[0].strip() + titles_by_source[source_name] = {} + + for line in lines[1:]: + if line.strip(): + try: + title_part = line.strip() + rank = None + + # 提取排名 + if ( + ". 
" in title_part + and title_part.split(". ")[0].isdigit() + ): + rank_str, title_part = title_part.split(". ", 1) + rank = int(rank_str) + + # 提取MOBILE URL + mobile_url = "" + if " [MOBILE:" in title_part: + title_part, mobile_part = title_part.rsplit( + " [MOBILE:", 1 + ) + if mobile_part.endswith("]"): + mobile_url = mobile_part[:-1] + + # 提取URL + url = "" + if " [URL:" in title_part: + title_part, url_part = title_part.rsplit(" [URL:", 1) + if url_part.endswith("]"): + url = url_part[:-1] + + title = title_part.strip() + ranks = [rank] if rank is not None else [1] + + titles_by_source[source_name][title] = { + "ranks": ranks, + "url": url, + "mobileUrl": mobile_url, + } + + except Exception as e: + print(f"解析标题行出错: {line}, 错误: {e}") + + return titles_by_source + @staticmethod def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str: """保存标题到文件""" - file_path = FileHelper.get_output_path("txt", f"{TimeHelper.format_time_filename()}.txt") + file_path = FileHelper.get_output_path( + "txt", f"{TimeHelper.format_time_filename()}.txt" + ) with open(file_path, "w", encoding="utf-8") as f: - # 写入成功数据 for id_value, title_data in results.items(): display_name = id_to_alias.get(id_value, id_value) f.write(f"{display_name}\n") - for i, (title, info) in enumerate(title_data.items(), 1): + + # 按排名排序标题 + sorted_titles = [] + for title, info in title_data.items(): if isinstance(info, dict): ranks = info.get("ranks", []) url = info.get("url", "") mobile_url = info.get("mobileUrl", "") - rank_str = ",".join(map(str, ranks)) - line = f"{i}. {title} (排名:{rank_str})" - if url: - line += f" [URL:{url}]" - if mobile_url: - line += f" [MOBILE:{mobile_url}]" - f.write(line + "\n") else: - # 兼容旧格式 - rank_str = ",".join(map(str, info)) - f.write(f"{i}. {title} (排名:{rank_str})\n") + ranks = info if isinstance(info, list) else [] + url = "" + mobile_url = "" + + rank = ranks[0] if ranks else 1 + sorted_titles.append((rank, title, url, mobile_url)) + + sorted_titles.sort(key=lambda x: x[0]) + + for rank, title, url, mobile_url in sorted_titles: + line = f"{rank}. 
{title}" + + if url: + line += f" [URL:{url}]" + if mobile_url: + line += f" [MOBILE:{mobile_url}]" + f.write(line + "\n") + f.write("\n") - # 写入失败信息 if failed_ids: f.write("==== 以下ID请求失败 ====\n") for id_value in failed_ids: @@ -237,7 +341,9 @@ class DataProcessor: return file_path @staticmethod - def load_frequency_words(frequency_file: str = "frequency_words.txt") -> Tuple[List[Dict], List[str]]: + def load_frequency_words( + frequency_file: str = "frequency_words.txt", + ) -> Tuple[List[Dict], List[str]]: """加载频率词配置""" frequency_path = Path(frequency_file) if not frequency_path.exists(): @@ -247,8 +353,9 @@ class DataProcessor: with open(frequency_path, "r", encoding="utf-8") as f: content = f.read() - # 按双空行分割词组 - word_groups = [group.strip() for group in content.split("\n\n") if group.strip()] + word_groups = [ + group.strip() for group in content.split("\n\n") if group.strip() + ] processed_groups = [] filter_words = [] @@ -256,10 +363,9 @@ class DataProcessor: for group in word_groups: words = [word.strip() for word in group.split("\n") if word.strip()] - # 分类词汇 - group_required_words = [] # +开头必须词 - group_normal_words = [] # 普通频率词 - group_filter_words = [] # !开头过滤词 + group_required_words = [] + group_normal_words = [] + group_filter_words = [] for word in words: if word.startswith("!"): @@ -270,19 +376,19 @@ class DataProcessor: else: group_normal_words.append(word) - # 只处理包含有效词的组 if group_required_words or group_normal_words: - # 生成组标识 if group_normal_words: group_key = " ".join(group_normal_words) else: group_key = " ".join(group_required_words) - processed_groups.append({ - 'required': group_required_words, - 'normal': group_normal_words, - 'group_key': group_key - }) + processed_groups.append( + { + "required": group_required_words, + "normal": group_normal_words, + "group_key": group_key, + } + ) return processed_groups, filter_words @@ -293,19 +399,17 @@ class DataProcessor: txt_dir = Path("output") / date_folder / "txt" if not txt_dir.exists(): - print(f"今日文件夹 {txt_dir} 不存在") return {}, {}, {} all_results = {} id_to_alias = {} title_info = {} - # 按时间排序处理文件 files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"]) for file_path in files: time_info = file_path.stem - + with open(file_path, "r", encoding="utf-8") as f: content = f.read() @@ -319,64 +423,63 @@ class DataProcessor: continue source_name = lines[0].strip() - - # 解析标题数据 title_data = {} + for line in lines[1:]: if line.strip(): try: - match_num = None + rank = None title_part = line.strip() - # 提取序号 - if ". " in title_part and title_part.split(". ")[0].isdigit(): + # 提取行首的排名数字 + if ( + ". " in title_part + and title_part.split(". ")[0].isdigit() + ): parts = title_part.split(". 
", 1) - match_num = int(parts[0]) + rank = int(parts[0]) title_part = parts[1] - # 提取mobileUrl + # 提取 MOBILE URL mobile_url = "" if " [MOBILE:" in title_part: - title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1) + title_part, mobile_part = title_part.rsplit( + " [MOBILE:", 1 + ) if mobile_part.endswith("]"): mobile_url = mobile_part[:-1] - # 提取url + # 提取 URL url = "" if " [URL:" in title_part: - title_part, url_part = title_part.rsplit(" [URL:", 1) + title_part, url_part = title_part.rsplit( + " [URL:", 1 + ) if url_part.endswith("]"): url = url_part[:-1] - # 提取排名 - ranks = [] - if " (排名:" in title_part: - title, rank_str = title_part.rsplit(" (排名:", 1) - rank_str = rank_str.rstrip(")") - ranks = [int(r) for r in rank_str.split(",") if r.strip() and r.isdigit()] - else: - title = title_part - - if not ranks and match_num is not None: - ranks = [match_num] - if not ranks: - ranks = [99] + title = title_part.strip() + ranks = [rank] if rank is not None else [1] title_data[title] = { "ranks": ranks, "url": url, - "mobileUrl": mobile_url + "mobileUrl": mobile_url, } except Exception as e: print(f"解析标题行出错: {line}, 错误: {e}") DataProcessor._process_source_data( - source_name, title_data, time_info, - all_results, title_info, id_to_alias + source_name, + title_data, + time_info, + all_results, + title_info, + id_to_alias, ) - # 转换为ID结果 + # 转换为ID格式 id_results = {} id_title_info = {} for name, titles in all_results.items(): @@ -390,27 +493,24 @@ class DataProcessor: @staticmethod def _process_source_data( - source_name: str, title_data: Dict, time_info: str, - all_results: Dict, title_info: Dict, id_to_alias: Dict, + source_name: str, + title_data: Dict, + time_info: str, + all_results: Dict, + title_info: Dict, + id_to_alias: Dict, ) -> None: """处理来源数据,合并重复标题""" if source_name not in all_results: - # 首次遇到此来源 all_results[source_name] = title_data if source_name not in title_info: title_info[source_name] = {} - # 记录标题信息 for title, data in title_data.items(): - if isinstance(data, dict): - ranks = data.get("ranks", []) - url = data.get("url", "") - mobile_url = data.get("mobileUrl", "") - else: - ranks = data if isinstance(data, list) else [] - url = "" - mobile_url = "" + ranks = data.get("ranks", []) + url = data.get("url", "") + mobile_url = data.get("mobileUrl", "") title_info[source_name][title] = { "first_time": time_info, @@ -421,27 +521,19 @@ class DataProcessor: "mobileUrl": mobile_url, } - # 生成反向ID映射 reversed_id = source_name.lower().replace(" ", "-") id_to_alias[reversed_id] = source_name else: - # 更新已有来源 for title, data in title_data.items(): - if isinstance(data, dict): - ranks = data.get("ranks", []) - url = data.get("url", "") - mobile_url = data.get("mobileUrl", "") - else: - ranks = data if isinstance(data, list) else [] - url = "" - mobile_url = "" + ranks = data.get("ranks", []) + url = data.get("url", "") + mobile_url = data.get("mobileUrl", "") if title not in all_results[source_name]: - # 新标题 all_results[source_name][title] = { "ranks": ranks, "url": url, - "mobileUrl": mobile_url + "mobileUrl": mobile_url, } title_info[source_name][title] = { "first_time": time_info, @@ -452,12 +544,11 @@ class DataProcessor: "mobileUrl": mobile_url, } else: - # 更新已有标题 existing_data = all_results[source_name][title] existing_ranks = existing_data.get("ranks", []) existing_url = existing_data.get("url", "") existing_mobile_url = existing_data.get("mobileUrl", "") - + merged_ranks = existing_ranks.copy() for rank in ranks: if rank not in merged_ranks: @@ -466,13 +557,12 @@ class 
@@ -466,13 +557,12 @@ class DataProcessor:
                     all_results[source_name][title] = {
                         "ranks": merged_ranks,
                         "url": existing_url or url,
-                        "mobileUrl": existing_mobile_url or mobile_url
+                        "mobileUrl": existing_mobile_url or mobile_url,
                     }
 
                     title_info[source_name][title]["last_time"] = time_info
                     title_info[source_name][title]["ranks"] = merged_ranks
                     title_info[source_name][title]["count"] += 1
 
-                    # 保留第一个有效URL
                     if not title_info[source_name][title].get("url"):
                         title_info[source_name][title]["url"] = url
                     if not title_info[source_name][title].get("mobileUrl"):
@@ -482,6 +572,42 @@ class DataProcessor:
 
 class StatisticsCalculator:
     """统计计算器"""
 
+    @staticmethod
+    def _matches_word_groups(
+        title: str, word_groups: List[Dict], filter_words: List[str]
+    ) -> bool:
+        """检查标题是否匹配词组规则"""
+        title_lower = title.lower()
+
+        # 过滤词检查
+        if any(filter_word.lower() in title_lower for filter_word in filter_words):
+            return False
+
+        # 词组匹配检查
+        for group in word_groups:
+            required_words = group["required"]
+            normal_words = group["normal"]
+
+            # 必须词检查
+            if required_words:
+                all_required_present = all(
+                    req_word.lower() in title_lower for req_word in required_words
+                )
+                if not all_required_present:
+                    continue
+
+            # 普通词检查
+            if normal_words:
+                any_normal_present = any(
+                    normal_word.lower() in title_lower for normal_word in normal_words
+                )
+                if not any_normal_present:
+                    continue
+
+            return True
+
+        return False
+
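The matcher's precedence: any filter word rejects outright; within a group, every required word must appear, and if the group also has ordinary words, at least one of them must too. A compact re-implementation with toy data (group contents are made up):

word_groups = [{"required": ["华为"], "normal": ["手机", "平板"], "group_key": "手机 平板"}]
filter_words = ["广告"]

def matches(title: str) -> bool:
    t = title.lower()
    if any(w.lower() in t for w in filter_words):
        return False
    for g in word_groups:
        if g["required"] and not all(w.lower() in t for w in g["required"]):
            continue
        if g["normal"] and not any(w.lower() in t for w in g["normal"]):
            continue
        return True
    return False

print(matches("华为发布新手机"))  # True  (required word + one normal word)
print(matches("华为发布新电脑"))  # False (required word hit, but no normal word)
print(matches("小米手机广告"))    # False (filter word wins)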
     @staticmethod
     def count_word_frequency(
         results: Dict,
@@ -490,21 +616,22 @@ class StatisticsCalculator:
         id_to_alias: Dict,
         title_info: Optional[Dict] = None,
         rank_threshold: int = CONFIG["RANK_THRESHOLD"],
+        new_titles: Optional[Dict] = None,
     ) -> Tuple[List[Dict], int]:
-        """统计词频,支持必须词、频率词、过滤词"""
+        """统计词频,支持必须词、频率词、过滤词,并标记新增标题"""
         word_stats = {}
         total_titles = 0
-        processed_titles = {}  # 跟踪已处理标题
+        processed_titles = {}
 
         if title_info is None:
             title_info = {}
+        if new_titles is None:
+            new_titles = {}
 
-        # 初始化统计对象
         for group in word_groups:
-            group_key = group['group_key']
+            group_key = group["group_key"]
             word_stats[group_key] = {"count": 0, "titles": {}}
 
-        # 遍历标题进行统计
         for source_id, titles_data in results.items():
             total_titles += len(titles_data)
 
@@ -515,57 +642,44 @@ class StatisticsCalculator:
                 if title in processed_titles.get(source_id, {}):
                     continue
 
-                title_lower = title.lower()
-
-                # 优先级1:过滤词检查
-                contains_filter_word = any(
-                    filter_word.lower() in title_lower for filter_word in filter_words
-                )
-                if contains_filter_word:
+                # 使用统一的匹配逻辑
+                if not StatisticsCalculator._matches_word_groups(
+                    title, word_groups, filter_words
+                ):
                     continue
 
-                # 兼容数据格式
-                if isinstance(title_data, dict):
-                    source_ranks = title_data.get("ranks", [])
-                    source_url = title_data.get("url", "")
-                    source_mobile_url = title_data.get("mobileUrl", "")
-                else:
-                    source_ranks = title_data if isinstance(title_data, list) else []
-                    source_url = ""
-                    source_mobile_url = ""
+                source_ranks = title_data.get("ranks", [])
+                source_url = title_data.get("url", "")
+                source_mobile_url = title_data.get("mobileUrl", "")
 
-                # 检查每个词组
+                # 找到匹配的词组
+                title_lower = title.lower()
                 for group in word_groups:
-                    group_key = group['group_key']
-                    required_words = group['required']
-                    normal_words = group['normal']
+                    required_words = group["required"]
+                    normal_words = group["normal"]
 
-                    # 优先级2:必须词检查
+                    # 再次检查匹配
                     if required_words:
                         all_required_present = all(
-                            req_word.lower() in title_lower for req_word in required_words
+                            req_word.lower() in title_lower
+                            for req_word in required_words
                        )
                         if not all_required_present:
                             continue
 
-                    # 优先级3:频率词检查
                     if normal_words:
                         any_normal_present = any(
-                            normal_word.lower() in title_lower for normal_word in normal_words
+                            normal_word.lower() in title_lower
+                            for normal_word in normal_words
                         )
                         if not any_normal_present:
                             continue
 
-                    # 如果只有必须词没有频率词,且所有必须词都匹配了,那么也算匹配
-                    # 如果既有必须词又有频率词,那么必须词全部匹配且至少一个频率词匹配
-                    # 如果只有频率词,那么至少一个频率词匹配
-
-                    # 匹配成功,记录数据
+                    group_key = group["group_key"]
                     word_stats[group_key]["count"] += 1
                     if source_id not in word_stats[group_key]["titles"]:
                         word_stats[group_key]["titles"][source_id] = []
 
-                    # 获取标题详细信息
                     first_time = ""
                     last_time = ""
                     count_info = 1
@@ -573,7 +687,11 @@ class StatisticsCalculator:
                     url = source_url
                     mobile_url = source_mobile_url
 
-                    if (title_info and source_id in title_info and title in title_info[source_id]):
+                    if (
+                        title_info
+                        and source_id in title_info
+                        and title in title_info[source_id]
+                    ):
                         info = title_info[source_id][title]
                         first_time = info.get("first_time", "")
                         last_time = info.get("last_time", "")
@@ -586,51 +704,61 @@ class StatisticsCalculator:
                     if not ranks:
                         ranks = [99]
 
-                    time_display = StatisticsCalculator._format_time_display(first_time, last_time)
+                    time_display = StatisticsCalculator._format_time_display(
+                        first_time, last_time
+                    )
                     source_alias = id_to_alias.get(source_id, source_id)
 
-                    word_stats[group_key]["titles"][source_id].append({
-                        "title": title,
-                        "source_alias": source_alias,
-                        "first_time": first_time,
-                        "last_time": last_time,
-                        "time_display": time_display,
-                        "count": count_info,
-                        "ranks": ranks,
-                        "rank_threshold": rank_threshold,
-                        "url": url,
-                        "mobileUrl": mobile_url,
-                    })
+                    is_new = source_id in new_titles and title in new_titles[source_id]
+
+                    word_stats[group_key]["titles"][source_id].append(
+                        {
+                            "title": title,
+                            "source_alias": source_alias,
+                            "first_time": first_time,
+                            "last_time": last_time,
+                            "time_display": time_display,
+                            "count": count_info,
+                            "ranks": ranks,
+                            "rank_threshold": rank_threshold,
+                            "url": url,
+                            "mobileUrl": mobile_url,
+                            "is_new": is_new,
+                        }
+                    )
 
-                    # 标记已处理
                     if source_id not in processed_titles:
                         processed_titles[source_id] = {}
                     processed_titles[source_id][title] = True
 
-                    break  # 只匹配第一个词组
+                    break
 
-        # 转换统计结果
         stats = []
         for group_key, data in word_stats.items():
             all_titles = []
             for source_id, title_list in data["titles"].items():
                 all_titles.extend(title_list)
 
-            stats.append({
-                "word": group_key,
-                "count": data["count"],
-                "titles": all_titles,
-                "percentage": (
-                    round(data["count"] / total_titles * 100, 2)
-                    if total_titles > 0 else 0
-                ),
-            })
+            stats.append(
+                {
+                    "word": group_key,
+                    "count": data["count"],
+                    "titles": all_titles,
+                    "percentage": (
+                        round(data["count"] / total_titles * 100, 2)
+                        if total_titles > 0
+                        else 0
+                    ),
+                }
+            )
 
         stats.sort(key=lambda x: x["count"], reverse=True)
         return stats, total_titles
 
     @staticmethod
-    def _format_rank_for_html(ranks: List[int], rank_threshold: int = 5) -> str:
-        """格式化HTML排名显示"""
+    def _format_rank_base(
+        ranks: List[int], rank_threshold: int = 5, format_type: str = "html"
+    ) -> str:
+        """基础排名格式化方法"""
         if not ranks:
             return ""
 
@@ -638,44 +766,41 @@ class StatisticsCalculator:
         min_rank = unique_ranks[0]
         max_rank = unique_ranks[-1]
 
+        # 根据格式类型选择不同的标记方式
+        if format_type == "html":
+            highlight_start = "<font color='red'>"
+            highlight_end = "</font>"
+        else:  # feishu
+            highlight_start = "**"
+            highlight_end = "**"
+
+        # 格式化排名显示
         if min_rank <= rank_threshold:
             if min_rank == max_rank:
-                return f"[{min_rank}]"
+                return f"{highlight_start}[{min_rank}]{highlight_end}"
             else:
-                return f"[{min_rank} - {max_rank}]"
+                return f"{highlight_start}[{min_rank} - {max_rank}]{highlight_end}"
         else:
            if min_rank == max_rank:
                 return f"[{min_rank}]"
             else:
                 return f"[{min_rank} - {max_rank}]"
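A standalone equivalent of the unified formatter, showing both output shapes (the `highlight` pair stands in for the HTML/Feishu markers):

def format_rank(ranks, rank_threshold=5, highlight=("**", "**")):
    if not ranks:
        return ""
    unique = sorted(set(ranks))
    lo, hi = unique[0], unique[-1]
    text = f"[{lo}]" if lo == hi else f"[{lo} - {hi}]"
    return f"{highlight[0]}{text}{highlight[1]}" if lo <= rank_threshold else text

print(format_rank([3, 3, 7]))  # **[3 - 7]**  (entered the top 5, so highlighted)
print(format_rank([8, 12]))    # [8 - 12]     (never in the top 5, plain)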
f"[{min_rank} - {max_rank}]" + @staticmethod + def _format_rank_for_html(ranks: List[int], rank_threshold: int = 5) -> str: + """格式化HTML排名显示""" + return StatisticsCalculator._format_rank_base(ranks, rank_threshold, "html") + @staticmethod def _format_rank_for_feishu(ranks: List[int], rank_threshold: int = 5) -> str: """格式化飞书排名显示""" - if not ranks: - return "" - - unique_ranks = sorted(set(ranks)) - min_rank = unique_ranks[0] - max_rank = unique_ranks[-1] - - if min_rank <= rank_threshold: - if min_rank == max_rank: - return f"**[{min_rank}]**" - else: - return f"**[{min_rank} - {max_rank}]**" - else: - if min_rank == max_rank: - return f"[{min_rank}]" - else: - return f"[{min_rank} - {max_rank}]" + return StatisticsCalculator._format_rank_base(ranks, rank_threshold, "feishu") @staticmethod def _format_time_display(first_time: str, last_time: str) -> str: """格式化时间显示""" if not first_time: return "" - if first_time == last_time or not last_time: return first_time else: @@ -691,6 +816,8 @@ class ReportGenerator: total_titles: int, failed_ids: Optional[List] = None, is_daily: bool = False, + new_titles: Optional[Dict] = None, + id_to_alias: Optional[Dict] = None, ) -> str: """生成HTML报告""" if is_daily: @@ -700,30 +827,190 @@ class ReportGenerator: file_path = FileHelper.get_output_path("html", filename) - html_content = ReportGenerator._create_html_content( - stats, total_titles, failed_ids, is_daily + # 数据处理层 + report_data = ReportGenerator._prepare_report_data( + stats, failed_ids, new_titles, id_to_alias + ) + + # 渲染层 + html_content = ReportGenerator._render_html_content( + report_data, total_titles, is_daily ) with open(file_path, "w", encoding="utf-8") as f: f.write(html_content) - # 当日统计同时生成根目录index.html if is_daily: root_file_path = Path("index.html") with open(root_file_path, "w", encoding="utf-8") as f: f.write(html_content) - print(f"当日统计报告已保存到根目录: {root_file_path.resolve()}") return file_path @staticmethod - def _create_html_content( + def _prepare_report_data( stats: List[Dict], - total_titles: int, failed_ids: Optional[List] = None, - is_daily: bool = False, + new_titles: Optional[Dict] = None, + id_to_alias: Optional[Dict] = None, + ) -> Dict: + """准备报告数据""" + filtered_new_titles = {} + if new_titles and id_to_alias: + word_groups, filter_words = DataProcessor.load_frequency_words() + for source_id, titles_data in new_titles.items(): + filtered_titles = ReportGenerator._apply_frequency_filter( + titles_data, word_groups, filter_words + ) + if filtered_titles: + filtered_new_titles[source_id] = filtered_titles + + processed_stats = [] + for stat in stats: + if stat["count"] <= 0: + continue + + processed_titles = [] + for title_data in stat["titles"]: + processed_title = { + "title": title_data["title"], + "source_alias": title_data["source_alias"], + "time_display": title_data["time_display"], + "count": title_data["count"], + "ranks": title_data["ranks"], + "rank_threshold": title_data["rank_threshold"], + "url": title_data.get("url", ""), + "mobile_url": title_data.get("mobileUrl", ""), + "is_new": title_data.get("is_new", False), + } + processed_titles.append(processed_title) + + processed_stats.append( + { + "word": stat["word"], + "count": stat["count"], + "percentage": stat.get("percentage", 0), + "titles": processed_titles, + } + ) + + processed_new_titles = [] + if filtered_new_titles and id_to_alias: + for source_id, titles_data in filtered_new_titles.items(): + source_alias = id_to_alias.get(source_id, source_id) + source_titles = [] + + for title, title_data in 
     @staticmethod
-    def _create_html_content(
+    def _prepare_report_data(
         stats: List[Dict],
-        total_titles: int,
         failed_ids: Optional[List] = None,
-        is_daily: bool = False,
+        new_titles: Optional[Dict] = None,
+        id_to_alias: Optional[Dict] = None,
+    ) -> Dict:
+        """准备报告数据"""
+        filtered_new_titles = {}
+        if new_titles and id_to_alias:
+            word_groups, filter_words = DataProcessor.load_frequency_words()
+            for source_id, titles_data in new_titles.items():
+                filtered_titles = ReportGenerator._apply_frequency_filter(
+                    titles_data, word_groups, filter_words
+                )
+                if filtered_titles:
+                    filtered_new_titles[source_id] = filtered_titles
+
+        processed_stats = []
+        for stat in stats:
+            if stat["count"] <= 0:
+                continue
+
+            processed_titles = []
+            for title_data in stat["titles"]:
+                processed_title = {
+                    "title": title_data["title"],
+                    "source_alias": title_data["source_alias"],
+                    "time_display": title_data["time_display"],
+                    "count": title_data["count"],
+                    "ranks": title_data["ranks"],
+                    "rank_threshold": title_data["rank_threshold"],
+                    "url": title_data.get("url", ""),
+                    "mobile_url": title_data.get("mobileUrl", ""),
+                    "is_new": title_data.get("is_new", False),
+                }
+                processed_titles.append(processed_title)
+
+            processed_stats.append(
+                {
+                    "word": stat["word"],
+                    "count": stat["count"],
+                    "percentage": stat.get("percentage", 0),
+                    "titles": processed_titles,
+                }
+            )
+
+        processed_new_titles = []
+        if filtered_new_titles and id_to_alias:
+            for source_id, titles_data in filtered_new_titles.items():
+                source_alias = id_to_alias.get(source_id, source_id)
+                source_titles = []
+
+                for title, title_data in titles_data.items():
+                    url, mobile_url, ranks = ReportGenerator._extract_title_data_fields(
+                        title_data
+                    )
+
+                    processed_title = {
+                        "title": title,
+                        "source_alias": source_alias,
+                        "time_display": "",
+                        "count": 1,
+                        "ranks": ranks,
+                        "rank_threshold": CONFIG["RANK_THRESHOLD"],
+                        "url": url,
+                        "mobile_url": mobile_url,
+                        "is_new": True,
+                    }
+                    source_titles.append(processed_title)
+
+                if source_titles:
+                    processed_new_titles.append(
+                        {
+                            "source_id": source_id,
+                            "source_alias": source_alias,
+                            "titles": source_titles,
+                        }
+                    )
+
+        return {
+            "stats": processed_stats,
+            "new_titles": processed_new_titles,
+            "failed_ids": failed_ids or [],
+            "total_new_count": sum(
+                len(source["titles"]) for source in processed_new_titles
+            ),
+        }
+
+    @staticmethod
+    def _extract_title_data_fields(title_data) -> Tuple[str, str, List[int]]:
+        """提取标题数据的通用字段"""
+        url = title_data.get("url", "")
+        mobile_url = title_data.get("mobileUrl", "")
+        ranks = title_data.get("ranks", [])
+
+        return url, mobile_url, ranks
+
+    @staticmethod
+    def _apply_frequency_filter(
+        titles_data: Dict, word_groups: List[Dict], filter_words: List[str]
+    ) -> Dict:
+        """应用频率词过滤逻辑"""
+        filtered_titles = {}
+
+        for title, title_data in titles_data.items():
+            if StatisticsCalculator._matches_word_groups(
+                title, word_groups, filter_words
+            ):
+                filtered_titles[title] = title_data
+
+        return filtered_titles
+
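Applied to a batch of new titles, the filter simply keeps the entries whose title passes the matcher; reusing matches() from the sketch above (data values are made up):

new_titles = {
    "华为手机发布": {"ranks": [1]},
    "某品牌广告": {"ranks": [9]},
    "今日天气": {"ranks": [4]},
}

filtered = {t: d for t, d in new_titles.items() if matches(t)}
print(list(filtered))  # ['华为手机发布']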
+    @staticmethod
+    def _html_escape(text: str) -> str:
+        """HTML转义"""
+        if not isinstance(text, str):
+            text = str(text)
+
+        return (
+            text.replace("&", "&amp;")
+            .replace("<", "&lt;")
+            .replace(">", "&gt;")
+            .replace('"', "&quot;")
+            .replace("'", "&#x27;")
+        )
+
+    @staticmethod
+    def _format_title_html(title_data: Dict) -> str:
+        """格式化HTML标题显示"""
+        rank_display = StatisticsCalculator._format_rank_for_html(
+            title_data["ranks"], title_data["rank_threshold"]
+        )
+
+        link_url = title_data["mobile_url"] or title_data["url"]
+        escaped_title = ReportGenerator._html_escape(title_data["title"])
+        escaped_source_alias = ReportGenerator._html_escape(title_data["source_alias"])
+
+        if link_url:
+            escaped_url = ReportGenerator._html_escape(link_url)
+            formatted_title = f'<a href="{escaped_url}">[{escaped_source_alias}] {escaped_title}</a>'
+        else:
+            formatted_title = f'[{escaped_source_alias}] {escaped_title}'
+
+        if rank_display:
+            formatted_title += f" {rank_display}"
+        if title_data["time_display"]:
+            escaped_time = ReportGenerator._html_escape(title_data["time_display"])
+            formatted_title += f" - {escaped_time}"
+        if title_data["count"] > 1:
+            formatted_title += f" ({title_data['count']}次)"
+
+        if title_data["is_new"]:

[extraction damage: the highlight markup this is_new branch wraps around formatted_title, the end of _format_title_html, and most of the _render_html_content HTML template are lost. The surviving fragments show summary lines rendered as html += f"…总标题数: {total_titles}…" and html += f"…生成时间: {now.strftime('%Y-%m-%d %H:%M:%S')}…" (surrounding tags lost), followed by this hunk:]

-        # 失败信息
-        if failed_ids and len(failed_ids) > 0:
+        # 渲染失败平台
+        if report_data["failed_ids"]:
             html += """

[extraction damage: the failed-platform markup and the statistics-table template are also lost; the surviving row template renders {i}, {escaped_word}, {stat['count']}, {stat.get('percentage', 0)}% (changed from {stat['percentage']}%), and {" ".join(formatted_titles)} as table cells]