From 0fb6d85bf3818ff778437f193c7846e7aaf9181d Mon Sep 17 00:00:00 2001 From: sansan <77180927+sansan0@users.noreply.github.com> Date: Fri, 13 Jun 2025 19:22:03 +0800 Subject: [PATCH] Update main.py --- main.py | 1089 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 719 insertions(+), 370 deletions(-) diff --git a/main.py b/main.py index 8336ae3..f0da73e 100644 --- a/main.py +++ b/main.py @@ -12,7 +12,6 @@ import os import requests import pytz -# 配置常量 CONFIG = { "FEISHU_SEPARATOR": "━━━━━━━━━━━━━━━━━━━", # 飞书消息分割线,注意,其它类型的分割线可能会被飞书过滤而不显示 "REQUEST_INTERVAL": 1000, # 请求间隔(毫秒) @@ -30,17 +29,14 @@ class TimeHelper: @staticmethod def get_beijing_time() -> datetime: - """获取北京时间""" return datetime.now(pytz.timezone("Asia/Shanghai")) @staticmethod def format_date_folder() -> str: - """返回日期文件夹格式""" return TimeHelper.get_beijing_time().strftime("%Y年%m月%d日") @staticmethod def format_time_filename() -> str: - """返回时间文件名格式""" return TimeHelper.get_beijing_time().strftime("%H时%M分") @@ -49,12 +45,10 @@ class FileHelper: @staticmethod def ensure_directory_exists(directory: str) -> None: - """确保目录存在""" Path(directory).mkdir(parents=True, exist_ok=True) @staticmethod def get_output_path(subfolder: str, filename: str) -> str: - """获取输出文件路径""" date_folder = TimeHelper.format_date_folder() output_dir = Path("output") / date_folder / subfolder FileHelper.ensure_directory_exists(str(output_dir)) @@ -75,7 +69,6 @@ class DataFetcher: max_retry_wait: int = 5, ) -> Tuple[Optional[str], str, str]: """获取指定ID数据,支持重试""" - # 解析ID和别名 if isinstance(id_info, tuple): id_value, alias = id_info else: @@ -84,7 +77,6 @@ class DataFetcher: url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest" - # 设置代理 proxies = None if self.proxy_url: proxies = {"http": self.proxy_url, "https": self.proxy_url} @@ -100,34 +92,32 @@ class DataFetcher: retries = 0 while retries <= max_retries: try: - print(f"正在请求 {id_value} 数据... (尝试 {retries + 1}/{max_retries + 1})") - response = requests.get(url, proxies=proxies, headers=headers, timeout=10) + response = requests.get( + url, proxies=proxies, headers=headers, timeout=10 + ) response.raise_for_status() data_text = response.text data_json = json.loads(data_text) - # 检查响应状态,接受success和cache status = data_json.get("status", "未知") if status not in ["success", "cache"]: raise ValueError(f"响应状态异常: {status}") status_info = "最新数据" if status == "success" else "缓存数据" - print(f"成功获取 {id_value} 数据({status_info})") + print(f"获取 {id_value} 成功({status_info})") return data_text, id_value, alias except Exception as e: retries += 1 if retries <= max_retries: - # 计算重试等待时间:基础时间+递增时间 base_wait = random.uniform(min_retry_wait, max_retry_wait) additional_wait = (retries - 1) * random.uniform(1, 2) wait_time = base_wait + additional_wait - - print(f"请求 {id_value} 失败: {e}. 将在 {wait_time:.2f} 秒后重试...") + print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...") time.sleep(wait_time) else: - print(f"请求 {id_value} 失败: {e}. 
已达到最大重试次数。") + print(f"请求 {id_value} 失败: {e}") return None, id_value, alias return None, id_value, alias @@ -142,7 +132,6 @@ class DataFetcher: failed_ids = [] for i, id_info in enumerate(ids_list): - # 解析ID和别名 if isinstance(id_info, tuple): id_value, alias = id_info else: @@ -150,8 +139,6 @@ class DataFetcher: alias = id_value id_to_alias[id_value] = alias - - # 获取数据 response, _, _ = self.fetch_data(id_info) if response: @@ -162,19 +149,17 @@ class DataFetcher: title = item["title"] url = item.get("url", "") mobile_url = item.get("mobileUrl", "") - + if title in results[id_value]: - # 标题已存在,更新排名 results[id_value][title]["ranks"].append(index) else: - # 新标题 results[id_value][title] = { "ranks": [index], "url": url, - "mobileUrl": mobile_url + "mobileUrl": mobile_url, } except json.JSONDecodeError: - print(f"解析 {id_value} 响应失败,非有效JSON") + print(f"解析 {id_value} 响应失败") failed_ids.append(id_value) except Exception as e: print(f"处理 {id_value} 数据出错: {e}") @@ -182,52 +167,171 @@ class DataFetcher: else: failed_ids.append(id_value) - # 添加请求间隔 if i < len(ids_list) - 1: actual_interval = request_interval + random.randint(-10, 20) - actual_interval = max(50, actual_interval) # 最少50毫秒 - print(f"等待 {actual_interval} 毫秒后发送下一个请求...") + actual_interval = max(50, actual_interval) time.sleep(actual_interval / 1000) - print(f"\n请求总结:") - print(f"- 成功获取数据: {list(results.keys())}") - print(f"- 请求失败: {failed_ids}") - + print(f"成功: {list(results.keys())}, 失败: {failed_ids}") return results, id_to_alias, failed_ids class DataProcessor: """数据处理器""" + @staticmethod + def detect_latest_new_titles(id_to_alias: Dict) -> Dict: + """检测当日最新批次的新增标题""" + date_folder = TimeHelper.format_date_folder() + txt_dir = Path("output") / date_folder / "txt" + + if not txt_dir.exists(): + return {} + + files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"]) + if len(files) < 2: + if len(files) == 1: + return DataProcessor._parse_file_titles(files[0]) + return {} + + latest_file = files[-1] + latest_titles = DataProcessor._parse_file_titles(latest_file) + + # 汇总历史标题 + historical_titles = {} + for file_path in files[:-1]: + historical_data = DataProcessor._parse_file_titles(file_path) + for source_name, titles_data in historical_data.items(): + if source_name not in historical_titles: + historical_titles[source_name] = set() + for title in titles_data.keys(): + historical_titles[source_name].add(title) + + # 找出新增标题 + new_titles = {} + for source_name, latest_source_titles in latest_titles.items(): + historical_set = historical_titles.get(source_name, set()) + source_new_titles = {} + + for title, title_data in latest_source_titles.items(): + if title not in historical_set: + source_new_titles[title] = title_data + + if source_new_titles: + source_id = None + for id_val, alias in id_to_alias.items(): + if alias == source_name: + source_id = id_val + break + if source_id: + new_titles[source_id] = source_new_titles + + return new_titles + + @staticmethod + def _parse_file_titles(file_path: Path) -> Dict: + """解析单个txt文件的标题数据""" + titles_by_source = {} + + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + sections = content.split("\n\n") + + for section in sections: + if not section.strip() or "==== 以下ID请求失败 ====" in section: + continue + + lines = section.strip().split("\n") + if len(lines) < 2: + continue + + source_name = lines[0].strip() + titles_by_source[source_name] = {} + + for line in lines[1:]: + if line.strip(): + try: + title_part = line.strip() + rank = None + + # 提取排名 + if ( + ". 
" in title_part + and title_part.split(". ")[0].isdigit() + ): + rank_str, title_part = title_part.split(". ", 1) + rank = int(rank_str) + + # 提取MOBILE URL + mobile_url = "" + if " [MOBILE:" in title_part: + title_part, mobile_part = title_part.rsplit( + " [MOBILE:", 1 + ) + if mobile_part.endswith("]"): + mobile_url = mobile_part[:-1] + + # 提取URL + url = "" + if " [URL:" in title_part: + title_part, url_part = title_part.rsplit(" [URL:", 1) + if url_part.endswith("]"): + url = url_part[:-1] + + title = title_part.strip() + ranks = [rank] if rank is not None else [1] + + titles_by_source[source_name][title] = { + "ranks": ranks, + "url": url, + "mobileUrl": mobile_url, + } + + except Exception as e: + print(f"解析标题行出错: {line}, 错误: {e}") + + return titles_by_source + @staticmethod def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str: """保存标题到文件""" - file_path = FileHelper.get_output_path("txt", f"{TimeHelper.format_time_filename()}.txt") + file_path = FileHelper.get_output_path( + "txt", f"{TimeHelper.format_time_filename()}.txt" + ) with open(file_path, "w", encoding="utf-8") as f: - # 写入成功数据 for id_value, title_data in results.items(): display_name = id_to_alias.get(id_value, id_value) f.write(f"{display_name}\n") - for i, (title, info) in enumerate(title_data.items(), 1): + + # 按排名排序标题 + sorted_titles = [] + for title, info in title_data.items(): if isinstance(info, dict): ranks = info.get("ranks", []) url = info.get("url", "") mobile_url = info.get("mobileUrl", "") - rank_str = ",".join(map(str, ranks)) - line = f"{i}. {title} (排名:{rank_str})" - if url: - line += f" [URL:{url}]" - if mobile_url: - line += f" [MOBILE:{mobile_url}]" - f.write(line + "\n") else: - # 兼容旧格式 - rank_str = ",".join(map(str, info)) - f.write(f"{i}. {title} (排名:{rank_str})\n") + ranks = info if isinstance(info, list) else [] + url = "" + mobile_url = "" + + rank = ranks[0] if ranks else 1 + sorted_titles.append((rank, title, url, mobile_url)) + + sorted_titles.sort(key=lambda x: x[0]) + + for rank, title, url, mobile_url in sorted_titles: + line = f"{rank}. 
{title}" + + if url: + line += f" [URL:{url}]" + if mobile_url: + line += f" [MOBILE:{mobile_url}]" + f.write(line + "\n") + f.write("\n") - # 写入失败信息 if failed_ids: f.write("==== 以下ID请求失败 ====\n") for id_value in failed_ids: @@ -237,7 +341,9 @@ class DataProcessor: return file_path @staticmethod - def load_frequency_words(frequency_file: str = "frequency_words.txt") -> Tuple[List[Dict], List[str]]: + def load_frequency_words( + frequency_file: str = "frequency_words.txt", + ) -> Tuple[List[Dict], List[str]]: """加载频率词配置""" frequency_path = Path(frequency_file) if not frequency_path.exists(): @@ -247,8 +353,9 @@ class DataProcessor: with open(frequency_path, "r", encoding="utf-8") as f: content = f.read() - # 按双空行分割词组 - word_groups = [group.strip() for group in content.split("\n\n") if group.strip()] + word_groups = [ + group.strip() for group in content.split("\n\n") if group.strip() + ] processed_groups = [] filter_words = [] @@ -256,10 +363,9 @@ class DataProcessor: for group in word_groups: words = [word.strip() for word in group.split("\n") if word.strip()] - # 分类词汇 - group_required_words = [] # +开头必须词 - group_normal_words = [] # 普通频率词 - group_filter_words = [] # !开头过滤词 + group_required_words = [] + group_normal_words = [] + group_filter_words = [] for word in words: if word.startswith("!"): @@ -270,19 +376,19 @@ class DataProcessor: else: group_normal_words.append(word) - # 只处理包含有效词的组 if group_required_words or group_normal_words: - # 生成组标识 if group_normal_words: group_key = " ".join(group_normal_words) else: group_key = " ".join(group_required_words) - processed_groups.append({ - 'required': group_required_words, - 'normal': group_normal_words, - 'group_key': group_key - }) + processed_groups.append( + { + "required": group_required_words, + "normal": group_normal_words, + "group_key": group_key, + } + ) return processed_groups, filter_words @@ -293,19 +399,17 @@ class DataProcessor: txt_dir = Path("output") / date_folder / "txt" if not txt_dir.exists(): - print(f"今日文件夹 {txt_dir} 不存在") return {}, {}, {} all_results = {} id_to_alias = {} title_info = {} - # 按时间排序处理文件 files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"]) for file_path in files: time_info = file_path.stem - + with open(file_path, "r", encoding="utf-8") as f: content = f.read() @@ -319,64 +423,63 @@ class DataProcessor: continue source_name = lines[0].strip() - - # 解析标题数据 title_data = {} + for line in lines[1:]: if line.strip(): try: - match_num = None + rank = None title_part = line.strip() - # 提取序号 - if ". " in title_part and title_part.split(". ")[0].isdigit(): + # 提取行首的排名数字 + if ( + ". " in title_part + and title_part.split(". ")[0].isdigit() + ): parts = title_part.split(". 
", 1) - match_num = int(parts[0]) + rank = int(parts[0]) title_part = parts[1] - # 提取mobileUrl + # 提取 MOBILE URL mobile_url = "" if " [MOBILE:" in title_part: - title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1) + title_part, mobile_part = title_part.rsplit( + " [MOBILE:", 1 + ) if mobile_part.endswith("]"): mobile_url = mobile_part[:-1] - # 提取url + # 提取 URL url = "" if " [URL:" in title_part: - title_part, url_part = title_part.rsplit(" [URL:", 1) + title_part, url_part = title_part.rsplit( + " [URL:", 1 + ) if url_part.endswith("]"): url = url_part[:-1] - # 提取排名 - ranks = [] - if " (排名:" in title_part: - title, rank_str = title_part.rsplit(" (排名:", 1) - rank_str = rank_str.rstrip(")") - ranks = [int(r) for r in rank_str.split(",") if r.strip() and r.isdigit()] - else: - title = title_part - - if not ranks and match_num is not None: - ranks = [match_num] - if not ranks: - ranks = [99] + title = title_part.strip() + ranks = [rank] if rank is not None else [1] title_data[title] = { "ranks": ranks, "url": url, - "mobileUrl": mobile_url + "mobileUrl": mobile_url, } except Exception as e: print(f"解析标题行出错: {line}, 错误: {e}") DataProcessor._process_source_data( - source_name, title_data, time_info, - all_results, title_info, id_to_alias + source_name, + title_data, + time_info, + all_results, + title_info, + id_to_alias, ) - # 转换为ID结果 + # 转换为ID格式 id_results = {} id_title_info = {} for name, titles in all_results.items(): @@ -390,27 +493,24 @@ class DataProcessor: @staticmethod def _process_source_data( - source_name: str, title_data: Dict, time_info: str, - all_results: Dict, title_info: Dict, id_to_alias: Dict, + source_name: str, + title_data: Dict, + time_info: str, + all_results: Dict, + title_info: Dict, + id_to_alias: Dict, ) -> None: """处理来源数据,合并重复标题""" if source_name not in all_results: - # 首次遇到此来源 all_results[source_name] = title_data if source_name not in title_info: title_info[source_name] = {} - # 记录标题信息 for title, data in title_data.items(): - if isinstance(data, dict): - ranks = data.get("ranks", []) - url = data.get("url", "") - mobile_url = data.get("mobileUrl", "") - else: - ranks = data if isinstance(data, list) else [] - url = "" - mobile_url = "" + ranks = data.get("ranks", []) + url = data.get("url", "") + mobile_url = data.get("mobileUrl", "") title_info[source_name][title] = { "first_time": time_info, @@ -421,27 +521,19 @@ class DataProcessor: "mobileUrl": mobile_url, } - # 生成反向ID映射 reversed_id = source_name.lower().replace(" ", "-") id_to_alias[reversed_id] = source_name else: - # 更新已有来源 for title, data in title_data.items(): - if isinstance(data, dict): - ranks = data.get("ranks", []) - url = data.get("url", "") - mobile_url = data.get("mobileUrl", "") - else: - ranks = data if isinstance(data, list) else [] - url = "" - mobile_url = "" + ranks = data.get("ranks", []) + url = data.get("url", "") + mobile_url = data.get("mobileUrl", "") if title not in all_results[source_name]: - # 新标题 all_results[source_name][title] = { "ranks": ranks, "url": url, - "mobileUrl": mobile_url + "mobileUrl": mobile_url, } title_info[source_name][title] = { "first_time": time_info, @@ -452,12 +544,11 @@ class DataProcessor: "mobileUrl": mobile_url, } else: - # 更新已有标题 existing_data = all_results[source_name][title] existing_ranks = existing_data.get("ranks", []) existing_url = existing_data.get("url", "") existing_mobile_url = existing_data.get("mobileUrl", "") - + merged_ranks = existing_ranks.copy() for rank in ranks: if rank not in merged_ranks: @@ -466,13 +557,12 @@ class 
DataProcessor: all_results[source_name][title] = { "ranks": merged_ranks, "url": existing_url or url, - "mobileUrl": existing_mobile_url or mobile_url + "mobileUrl": existing_mobile_url or mobile_url, } title_info[source_name][title]["last_time"] = time_info title_info[source_name][title]["ranks"] = merged_ranks title_info[source_name][title]["count"] += 1 - # 保留第一个有效URL if not title_info[source_name][title].get("url"): title_info[source_name][title]["url"] = url if not title_info[source_name][title].get("mobileUrl"): @@ -482,6 +572,42 @@ class DataProcessor: class StatisticsCalculator: """统计计算器""" + @staticmethod + def _matches_word_groups( + title: str, word_groups: List[Dict], filter_words: List[str] + ) -> bool: + """检查标题是否匹配词组规则""" + title_lower = title.lower() + + # 过滤词检查 + if any(filter_word.lower() in title_lower for filter_word in filter_words): + return False + + # 词组匹配检查 + for group in word_groups: + required_words = group["required"] + normal_words = group["normal"] + + # 必须词检查 + if required_words: + all_required_present = all( + req_word.lower() in title_lower for req_word in required_words + ) + if not all_required_present: + continue + + # 普通词检查 + if normal_words: + any_normal_present = any( + normal_word.lower() in title_lower for normal_word in normal_words + ) + if not any_normal_present: + continue + + return True + + return False + @staticmethod def count_word_frequency( results: Dict, @@ -490,21 +616,22 @@ class StatisticsCalculator: id_to_alias: Dict, title_info: Optional[Dict] = None, rank_threshold: int = CONFIG["RANK_THRESHOLD"], + new_titles: Optional[Dict] = None, ) -> Tuple[List[Dict], int]: - """统计词频,支持必须词、频率词、过滤词""" + """统计词频,支持必须词、频率词、过滤词,并标记新增标题""" word_stats = {} total_titles = 0 - processed_titles = {} # 跟踪已处理标题 + processed_titles = {} if title_info is None: title_info = {} + if new_titles is None: + new_titles = {} - # 初始化统计对象 for group in word_groups: - group_key = group['group_key'] + group_key = group["group_key"] word_stats[group_key] = {"count": 0, "titles": {}} - # 遍历标题进行统计 for source_id, titles_data in results.items(): total_titles += len(titles_data) @@ -515,57 +642,44 @@ class StatisticsCalculator: if title in processed_titles.get(source_id, {}): continue - title_lower = title.lower() - - # 优先级1:过滤词检查 - contains_filter_word = any( - filter_word.lower() in title_lower for filter_word in filter_words - ) - if contains_filter_word: + # 使用统一的匹配逻辑 + if not StatisticsCalculator._matches_word_groups( + title, word_groups, filter_words + ): continue - # 兼容数据格式 - if isinstance(title_data, dict): - source_ranks = title_data.get("ranks", []) - source_url = title_data.get("url", "") - source_mobile_url = title_data.get("mobileUrl", "") - else: - source_ranks = title_data if isinstance(title_data, list) else [] - source_url = "" - source_mobile_url = "" + source_ranks = title_data.get("ranks", []) + source_url = title_data.get("url", "") + source_mobile_url = title_data.get("mobileUrl", "") - # 检查每个词组 + # 找到匹配的词组 + title_lower = title.lower() for group in word_groups: - group_key = group['group_key'] - required_words = group['required'] - normal_words = group['normal'] + required_words = group["required"] + normal_words = group["normal"] - # 优先级2:必须词检查 + # 再次检查匹配 if required_words: all_required_present = all( - req_word.lower() in title_lower for req_word in required_words + req_word.lower() in title_lower + for req_word in required_words ) if not all_required_present: continue - # 优先级3:频率词检查 if normal_words: any_normal_present = any( - normal_word.lower() in 
title_lower for normal_word in normal_words + normal_word.lower() in title_lower + for normal_word in normal_words ) if not any_normal_present: continue - # 如果只有必须词没有频率词,且所有必须词都匹配了,那么也算匹配 - # 如果既有必须词又有频率词,那么必须词全部匹配且至少一个频率词匹配 - # 如果只有频率词,那么至少一个频率词匹配 - - # 匹配成功,记录数据 + group_key = group["group_key"] word_stats[group_key]["count"] += 1 if source_id not in word_stats[group_key]["titles"]: word_stats[group_key]["titles"][source_id] = [] - # 获取标题详细信息 first_time = "" last_time = "" count_info = 1 @@ -573,7 +687,11 @@ class StatisticsCalculator: url = source_url mobile_url = source_mobile_url - if (title_info and source_id in title_info and title in title_info[source_id]): + if ( + title_info + and source_id in title_info + and title in title_info[source_id] + ): info = title_info[source_id][title] first_time = info.get("first_time", "") last_time = info.get("last_time", "") @@ -586,51 +704,61 @@ class StatisticsCalculator: if not ranks: ranks = [99] - time_display = StatisticsCalculator._format_time_display(first_time, last_time) + time_display = StatisticsCalculator._format_time_display( + first_time, last_time + ) source_alias = id_to_alias.get(source_id, source_id) - word_stats[group_key]["titles"][source_id].append({ - "title": title, - "source_alias": source_alias, - "first_time": first_time, - "last_time": last_time, - "time_display": time_display, - "count": count_info, - "ranks": ranks, - "rank_threshold": rank_threshold, - "url": url, - "mobileUrl": mobile_url, - }) + is_new = source_id in new_titles and title in new_titles[source_id] + + word_stats[group_key]["titles"][source_id].append( + { + "title": title, + "source_alias": source_alias, + "first_time": first_time, + "last_time": last_time, + "time_display": time_display, + "count": count_info, + "ranks": ranks, + "rank_threshold": rank_threshold, + "url": url, + "mobileUrl": mobile_url, + "is_new": is_new, + } + ) - # 标记已处理 if source_id not in processed_titles: processed_titles[source_id] = {} processed_titles[source_id][title] = True - break # 只匹配第一个词组 + break - # 转换统计结果 stats = [] for group_key, data in word_stats.items(): all_titles = [] for source_id, title_list in data["titles"].items(): all_titles.extend(title_list) - stats.append({ - "word": group_key, - "count": data["count"], - "titles": all_titles, - "percentage": ( - round(data["count"] / total_titles * 100, 2) - if total_titles > 0 else 0 - ), - }) + stats.append( + { + "word": group_key, + "count": data["count"], + "titles": all_titles, + "percentage": ( + round(data["count"] / total_titles * 100, 2) + if total_titles > 0 + else 0 + ), + } + ) stats.sort(key=lambda x: x["count"], reverse=True) return stats, total_titles @staticmethod - def _format_rank_for_html(ranks: List[int], rank_threshold: int = 5) -> str: - """格式化HTML排名显示""" + def _format_rank_base( + ranks: List[int], rank_threshold: int = 5, format_type: str = "html" + ) -> str: + """基础排名格式化方法""" if not ranks: return "" @@ -638,44 +766,41 @@ class StatisticsCalculator: min_rank = unique_ranks[0] max_rank = unique_ranks[-1] + # 根据格式类型选择不同的标记方式 + if format_type == "html": + highlight_start = "" + highlight_end = "" + else: # feishu + highlight_start = "**" + highlight_end = "**" + + # 格式化排名显示 if min_rank <= rank_threshold: if min_rank == max_rank: - return f"[{min_rank}]" + return f"{highlight_start}[{min_rank}]{highlight_end}" else: - return f"[{min_rank} - {max_rank}]" + return f"{highlight_start}[{min_rank} - {max_rank}]{highlight_end}" else: if min_rank == max_rank: return f"[{min_rank}]" else: return 
f"[{min_rank} - {max_rank}]" + @staticmethod + def _format_rank_for_html(ranks: List[int], rank_threshold: int = 5) -> str: + """格式化HTML排名显示""" + return StatisticsCalculator._format_rank_base(ranks, rank_threshold, "html") + @staticmethod def _format_rank_for_feishu(ranks: List[int], rank_threshold: int = 5) -> str: """格式化飞书排名显示""" - if not ranks: - return "" - - unique_ranks = sorted(set(ranks)) - min_rank = unique_ranks[0] - max_rank = unique_ranks[-1] - - if min_rank <= rank_threshold: - if min_rank == max_rank: - return f"**[{min_rank}]**" - else: - return f"**[{min_rank} - {max_rank}]**" - else: - if min_rank == max_rank: - return f"[{min_rank}]" - else: - return f"[{min_rank} - {max_rank}]" + return StatisticsCalculator._format_rank_base(ranks, rank_threshold, "feishu") @staticmethod def _format_time_display(first_time: str, last_time: str) -> str: """格式化时间显示""" if not first_time: return "" - if first_time == last_time or not last_time: return first_time else: @@ -691,6 +816,8 @@ class ReportGenerator: total_titles: int, failed_ids: Optional[List] = None, is_daily: bool = False, + new_titles: Optional[Dict] = None, + id_to_alias: Optional[Dict] = None, ) -> str: """生成HTML报告""" if is_daily: @@ -700,30 +827,190 @@ class ReportGenerator: file_path = FileHelper.get_output_path("html", filename) - html_content = ReportGenerator._create_html_content( - stats, total_titles, failed_ids, is_daily + # 数据处理层 + report_data = ReportGenerator._prepare_report_data( + stats, failed_ids, new_titles, id_to_alias + ) + + # 渲染层 + html_content = ReportGenerator._render_html_content( + report_data, total_titles, is_daily ) with open(file_path, "w", encoding="utf-8") as f: f.write(html_content) - # 当日统计同时生成根目录index.html if is_daily: root_file_path = Path("index.html") with open(root_file_path, "w", encoding="utf-8") as f: f.write(html_content) - print(f"当日统计报告已保存到根目录: {root_file_path.resolve()}") return file_path @staticmethod - def _create_html_content( + def _prepare_report_data( stats: List[Dict], - total_titles: int, failed_ids: Optional[List] = None, - is_daily: bool = False, + new_titles: Optional[Dict] = None, + id_to_alias: Optional[Dict] = None, + ) -> Dict: + """准备报告数据""" + filtered_new_titles = {} + if new_titles and id_to_alias: + word_groups, filter_words = DataProcessor.load_frequency_words() + for source_id, titles_data in new_titles.items(): + filtered_titles = ReportGenerator._apply_frequency_filter( + titles_data, word_groups, filter_words + ) + if filtered_titles: + filtered_new_titles[source_id] = filtered_titles + + processed_stats = [] + for stat in stats: + if stat["count"] <= 0: + continue + + processed_titles = [] + for title_data in stat["titles"]: + processed_title = { + "title": title_data["title"], + "source_alias": title_data["source_alias"], + "time_display": title_data["time_display"], + "count": title_data["count"], + "ranks": title_data["ranks"], + "rank_threshold": title_data["rank_threshold"], + "url": title_data.get("url", ""), + "mobile_url": title_data.get("mobileUrl", ""), + "is_new": title_data.get("is_new", False), + } + processed_titles.append(processed_title) + + processed_stats.append( + { + "word": stat["word"], + "count": stat["count"], + "percentage": stat.get("percentage", 0), + "titles": processed_titles, + } + ) + + processed_new_titles = [] + if filtered_new_titles and id_to_alias: + for source_id, titles_data in filtered_new_titles.items(): + source_alias = id_to_alias.get(source_id, source_id) + source_titles = [] + + for title, title_data in 
titles_data.items(): + url, mobile_url, ranks = ReportGenerator._extract_title_data_fields( + title_data + ) + + processed_title = { + "title": title, + "source_alias": source_alias, + "time_display": "", + "count": 1, + "ranks": ranks, + "rank_threshold": CONFIG["RANK_THRESHOLD"], + "url": url, + "mobile_url": mobile_url, + "is_new": True, + } + source_titles.append(processed_title) + + if source_titles: + processed_new_titles.append( + { + "source_id": source_id, + "source_alias": source_alias, + "titles": source_titles, + } + ) + + return { + "stats": processed_stats, + "new_titles": processed_new_titles, + "failed_ids": failed_ids or [], + "total_new_count": sum( + len(source["titles"]) for source in processed_new_titles + ), + } + + @staticmethod + def _extract_title_data_fields(title_data) -> Tuple[str, str, List[int]]: + """提取标题数据的通用字段""" + url = title_data.get("url", "") + mobile_url = title_data.get("mobileUrl", "") + ranks = title_data.get("ranks", []) + + return url, mobile_url, ranks + + @staticmethod + def _apply_frequency_filter( + titles_data: Dict, word_groups: List[Dict], filter_words: List[str] + ) -> Dict: + """应用频率词过滤逻辑""" + filtered_titles = {} + + for title, title_data in titles_data.items(): + if StatisticsCalculator._matches_word_groups( + title, word_groups, filter_words + ): + filtered_titles[title] = title_data + + return filtered_titles + + @staticmethod + def _html_escape(text: str) -> str: + """HTML转义""" + if not isinstance(text, str): + text = str(text) + + return ( + text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) + + @staticmethod + def _format_title_html(title_data: Dict) -> str: + """格式化HTML标题显示""" + rank_display = StatisticsCalculator._format_rank_for_html( + title_data["ranks"], title_data["rank_threshold"] + ) + + link_url = title_data["mobile_url"] or title_data["url"] + escaped_title = ReportGenerator._html_escape(title_data["title"]) + escaped_source_alias = ReportGenerator._html_escape(title_data["source_alias"]) + + if link_url: + escaped_url = ReportGenerator._html_escape(link_url) + formatted_title = f'[{escaped_source_alias}] {escaped_title}' + else: + formatted_title = ( + f'[{escaped_source_alias}] {escaped_title}' + ) + + if rank_display: + formatted_title += f" {rank_display}" + if title_data["time_display"]: + escaped_time = ReportGenerator._html_escape(title_data["time_display"]) + formatted_title += f" - {escaped_time}" + if title_data["count"] > 1: + formatted_title += f" ({title_data['count']}次)" + + if title_data["is_new"]: + formatted_title = f"
<div class='new-title'>🆕 {formatted_title}</div>
" + + return formatted_title + + @staticmethod + def _render_html_content( + report_data: Dict, total_titles: int, is_daily: bool = False ) -> str: - """创建HTML内容""" + """渲染HTML内容""" html = """ @@ -758,6 +1045,24 @@ class ReportGenerator: .no-link { color: #333; } + .new-title { + background-color: #fff3cd; + border: 1px solid #ffc107; + border-radius: 3px; + padding: 2px 6px; + margin: 2px 0; + } + .new-section { + background-color: #d1ecf1; + border: 1px solid #bee5eb; + border-radius: 5px; + padding: 10px; + margin-top: 10px; + } + .new-section h3 { + color: #0c5460; + margin-top: 0; + } @@ -771,20 +1076,21 @@ class ReportGenerator: html += f"

<p>总标题数: {total_titles}</p>"
         html += f"<p>生成时间: {now.strftime('%Y-%m-%d %H:%M:%S')}</p>"

-        # 失败信息
-        if failed_ids and len(failed_ids) > 0:
+        # 渲染失败平台
+        if report_data["failed_ids"]:
             html += """
        <div class="error">
            <h3>请求失败的平台</h3>
        </div>
""" + # 渲染统计表格 html += """ @@ -796,54 +1102,59 @@ class ReportGenerator: """ - # 表格内容 - for i, stat in enumerate(stats, 1): + for i, stat in enumerate(report_data["stats"], 1): formatted_titles = [] + for title_data in stat["titles"]: - title = title_data["title"] - source_alias = title_data["source_alias"] - time_display = title_data["time_display"] - count_info = title_data["count"] - ranks = title_data["ranks"] - rank_threshold = title_data["rank_threshold"] - url = title_data.get("url", "") - mobile_url = title_data.get("mobileUrl", "") - - rank_display = StatisticsCalculator._format_rank_for_html(ranks, rank_threshold) - - link_url = mobile_url or url - escaped_title = ReportGenerator._html_escape(title) - escaped_source_alias = ReportGenerator._html_escape(source_alias) - - if link_url: - escaped_url = ReportGenerator._html_escape(link_url) - formatted_title = f"[{escaped_source_alias}] {escaped_title}" - else: - formatted_title = f"[{escaped_source_alias}] {escaped_title}" - - if rank_display: - formatted_title += f" {rank_display}" - if time_display: - escaped_time_display = ReportGenerator._html_escape(time_display) - formatted_title += f" - {escaped_time_display}" - if count_info > 1: - formatted_title += f" ({count_info}次)" - + formatted_title = ReportGenerator._format_title_html(title_data) formatted_titles.append(formatted_title) - escaped_word = ReportGenerator._html_escape(stat['word']) + escaped_word = ReportGenerator._html_escape(stat["word"]) html += f""" - + """ html += """
                <td>{i}</td>
                <td>{escaped_word}</td>
                <td>{stat['count']}</td>
-                <td>{stat['percentage']}%</td>
+                <td>{stat.get('percentage', 0)}%</td>
                <td>{"<br>".join(formatted_titles)}</td>
            </tr>
 """

         html += """
        </table>
+ """ + + # 渲染新增新闻部分 + if report_data["new_titles"]: + html += f""" +
+        <div class="new-section">
+            <h3>🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)</h3>
+ """ + + for source_data in report_data["new_titles"]: + escaped_source = ReportGenerator._html_escape( + source_data["source_alias"] + ) + html += ( + f"

<h4>{escaped_source} ({len(source_data['titles'])} 条)</h4>"
+                )
+
+                html += "<ul>
" + + html += """ """ @@ -851,33 +1162,133 @@ class ReportGenerator: return html @staticmethod - def _html_escape(text: str) -> str: - """HTML转义""" - if not isinstance(text, str): - text = str(text) - - return (text.replace("&", "&") - .replace("<", "<") - .replace(">", ">") - .replace('"', """) - .replace("'", "'")) + def _format_title_feishu(title_data: Dict, show_source: bool = True) -> str: + """格式化飞书标题显示""" + rank_display = StatisticsCalculator._format_rank_for_feishu( + title_data["ranks"], title_data["rank_threshold"] + ) + + link_url = title_data["mobile_url"] or title_data["url"] + if link_url: + formatted_title = f"[{title_data['title']}]({link_url})" + else: + formatted_title = title_data["title"] + + title_prefix = "🆕 " if title_data["is_new"] else "" + + if show_source: + result = f"[{title_data['source_alias']}] {title_prefix}{formatted_title}" + else: + result = f"{title_prefix}{formatted_title}" + + if rank_display: + result += f" {rank_display}" + if title_data["time_display"]: + result += f" - {title_data['time_display']}" + if title_data["count"] > 1: + result += f" ({title_data['count']}次)" + + return result + + @staticmethod + def _render_feishu_content(report_data: Dict) -> str: + """渲染飞书内容""" + text_content = "" + + # 渲染热点词汇统计 + if report_data["stats"]: + text_content += "📊 **热点词汇统计**\n\n" + + total_count = len(report_data["stats"]) + + for i, stat in enumerate(report_data["stats"]): + word = stat["word"] + count = stat["count"] + + sequence_display = f"[{i + 1}/{total_count}]" + + if count >= 10: + text_content += f"🔥 {sequence_display} **{word}** : {count} 条\n\n" + elif count >= 5: + text_content += f"📈 {sequence_display} **{word}** : {count} 条\n\n" + else: + text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n" + + for j, title_data in enumerate(stat["titles"], 1): + formatted_title = ReportGenerator._format_title_feishu( + title_data, show_source=True + ) + text_content += f" {j}. {formatted_title}\n" + + if j < len(stat["titles"]): + text_content += "\n" + + if i < len(report_data["stats"]) - 1: + text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" + + if not text_content: + text_content = "📭 暂无匹配的热点词汇\n\n" + + # 渲染新增新闻部分 + if report_data["new_titles"]: + if text_content and "暂无匹配" not in text_content: + text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" + + text_content += ( + f"🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n" + ) + + for source_data in report_data["new_titles"]: + text_content += f"**{source_data['source_alias']}** ({len(source_data['titles'])} 条):\n" + + for j, title_data in enumerate(source_data["titles"], 1): + formatted_title = ReportGenerator._format_title_feishu( + title_data, show_source=False + ) + text_content += f" {j}. 
{formatted_title}\n" + + text_content += "\n" + + # 渲染失败平台 + if report_data["failed_ids"]: + if text_content and "暂无匹配" not in text_content: + text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" + + text_content += "⚠️ **数据获取失败的平台:**\n\n" + for i, id_value in enumerate(report_data["failed_ids"], 1): + text_content += f" • {id_value}\n" + + # 添加时间戳 + now = TimeHelper.get_beijing_time() + text_content += f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" + + return text_content @staticmethod def send_to_feishu( stats: List[Dict], failed_ids: Optional[List] = None, report_type: str = "单次爬取", + new_titles: Optional[Dict] = None, + id_to_alias: Optional[Dict] = None, ) -> bool: """发送数据到飞书""" webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]) if not webhook_url: - print(f"警告: FEISHU_WEBHOOK_URL未设置,跳过飞书通知") + print(f"FEISHU_WEBHOOK_URL未设置,跳过飞书通知") return False headers = {"Content-Type": "application/json"} total_titles = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0) - text_content = ReportGenerator._build_feishu_content(stats, failed_ids) + + # 数据处理层 + report_data = ReportGenerator._prepare_report_data( + stats, failed_ids, new_titles, id_to_alias + ) + + # 渲染层 + text_content = ReportGenerator._render_feishu_content(report_data) now = TimeHelper.get_beijing_time() payload = { @@ -893,93 +1304,17 @@ class ReportGenerator: try: response = requests.post(webhook_url, headers=headers, json=payload) if response.status_code == 200: - print(f"数据发送到飞书成功 [{report_type}]") + print(f"飞书通知发送成功 [{report_type}]") return True else: - print(f"发送到飞书失败 [{report_type}],状态码:{response.status_code},响应:{response.text}") + print( + f"飞书通知发送失败 [{report_type}],状态码:{response.status_code}" + ) return False except Exception as e: - print(f"发送到飞书时出错 [{report_type}]:{e}") + print(f"飞书通知发送出错 [{report_type}]:{e}") return False - @staticmethod - def _build_feishu_content(stats: List[Dict], failed_ids: Optional[List] = None) -> str: - """构建飞书消息内容""" - text_content = "" - filtered_stats = [stat for stat in stats if stat["count"] > 0] - - if filtered_stats: - text_content += "📊 **热点词汇统计**\n\n" - - total_count = len(filtered_stats) - - for i, stat in enumerate(filtered_stats): - word = stat["word"] - count = stat["count"] - - sequence_display = f"[{i + 1}/{total_count}]" - - # 频次颜色分级 - if count >= 10: - text_content += f"🔥 {sequence_display} **{word}** : {count} 条\n\n" - elif count >= 5: - text_content += f"📈 {sequence_display} **{word}** : {count} 条\n\n" - else: - text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n" - - # 标题列表 - for j, title_data in enumerate(stat["titles"], 1): - title = title_data["title"] - source_alias = title_data["source_alias"] - time_display = title_data["time_display"] - count_info = title_data["count"] - ranks = title_data["ranks"] - rank_threshold = title_data["rank_threshold"] - url = title_data.get("url", "") - mobile_url = title_data.get("mobileUrl", "") - - rank_display = StatisticsCalculator._format_rank_for_feishu(ranks, rank_threshold) - - link_url = mobile_url or url - if link_url: - formatted_title = f"[{title}]({link_url})" - else: - formatted_title = title - - text_content += f" {j}. 
[{source_alias}] {formatted_title}" - - if rank_display: - text_content += f" {rank_display}" - if time_display: - text_content += f" - {time_display}" - if count_info > 1: - text_content += f" ({count_info}次)" - text_content += "\n" - - if j < len(stat["titles"]): - text_content += "\n" - - # 分割线 - if i < len(filtered_stats) - 1: - text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" - - if not text_content: - text_content = "📭 暂无匹配的热点词汇\n\n" - - # 失败平台信息 - if failed_ids and len(failed_ids) > 0: - if text_content and "暂无匹配" not in text_content: - text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" - - text_content += "⚠️ **数据获取失败的平台:**\n\n" - for i, id_value in enumerate(failed_ids, 1): - text_content += f" • {id_value}\n" - - now = TimeHelper.get_beijing_time() - text_content += f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" - - return text_content - class NewsAnalyzer: """新闻分析器""" @@ -990,14 +1325,11 @@ class NewsAnalyzer: feishu_report_type: str = CONFIG["FEISHU_REPORT_TYPE"], rank_threshold: int = CONFIG["RANK_THRESHOLD"], ): - """初始化分析器""" self.request_interval = request_interval self.feishu_report_type = feishu_report_type self.rank_threshold = rank_threshold - self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true" - # 设置代理 self.proxy_url = None if not self.is_github_actions and CONFIG["USE_PROXY"]: self.proxy_url = CONFIG["DEFAULT_PROXY"] @@ -1011,7 +1343,7 @@ class NewsAnalyzer: def generate_daily_summary(self) -> Optional[str]: """生成当日统计报告""" - print("开始生成当日统计报告...") + print("生成当日统计报告...") all_results, id_to_alias, title_info = DataProcessor.read_all_today_titles() @@ -1022,20 +1354,36 @@ class NewsAnalyzer: total_titles = sum(len(titles) for titles in all_results.values()) print(f"读取到 {total_titles} 个标题") + latest_new_titles = DataProcessor.detect_latest_new_titles(id_to_alias) + if latest_new_titles: + total_new_count = sum(len(titles) for titles in latest_new_titles.values()) + print(f"检测到 {total_new_count} 条最新新增新闻") + word_groups, filter_words = DataProcessor.load_frequency_words() stats, total_titles = StatisticsCalculator.count_word_frequency( - all_results, word_groups, filter_words, - id_to_alias, title_info, self.rank_threshold, + all_results, + word_groups, + filter_words, + id_to_alias, + title_info, + self.rank_threshold, + latest_new_titles, ) html_file = ReportGenerator.generate_html_report( - stats, total_titles, is_daily=True + stats, + total_titles, + is_daily=True, + new_titles=latest_new_titles, + id_to_alias=id_to_alias, ) print(f"当日HTML统计报告已生成: {html_file}") if self.feishu_report_type in ["daily", "both"]: - ReportGenerator.send_to_feishu(stats, [], "当日汇总") + ReportGenerator.send_to_feishu( + stats, [], "当日汇总", latest_new_titles, id_to_alias + ) return html_file @@ -1046,16 +1394,16 @@ class NewsAnalyzer: webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]) if not webhook_url and not CONFIG["CONTINUE_WITHOUT_FEISHU"]: - print("错误: FEISHU_WEBHOOK_URL未设置且CONTINUE_WITHOUT_FEISHU为False,程序退出") + print( + "错误: FEISHU_WEBHOOK_URL未设置且CONTINUE_WITHOUT_FEISHU为False,程序退出" + ) return if not webhook_url: - print("警告: FEISHU_WEBHOOK_URL未设置,将继续执行爬虫但不发送飞书通知") + print("FEISHU_WEBHOOK_URL未设置,将继续执行爬虫但不发送飞书通知") print(f"飞书报告类型: {self.feishu_report_type}") - print(f"排名阈值: {self.rank_threshold}") - # 爬取目标列表 ids = [ ("toutiao", "今日头条"), ("baidu", "百度热搜"), @@ -1071,31 +1419,26 @@ class NewsAnalyzer: ] print(f"开始爬取数据,请求间隔 {self.request_interval} 毫秒") - FileHelper.ensure_directory_exists("output") - # 爬取数据 - results, id_to_alias, failed_ids 
= self.data_fetcher.crawl_websites(ids, self.request_interval) + results, id_to_alias, failed_ids = self.data_fetcher.crawl_websites( + ids, self.request_interval + ) - # 保存文件 title_file = DataProcessor.save_titles_to_file(results, id_to_alias, failed_ids) print(f"标题已保存到: {title_file}") - time_info = Path(title_file).stem + new_titles = DataProcessor.detect_latest_new_titles(id_to_alias) - # 创建标题信息 + # 构建标题信息 + time_info = Path(title_file).stem title_info = {} for source_id, titles_data in results.items(): title_info[source_id] = {} for title, title_data in titles_data.items(): - if isinstance(title_data, dict): - ranks = title_data.get("ranks", []) - url = title_data.get("url", "") - mobile_url = title_data.get("mobileUrl", "") - else: - ranks = title_data if isinstance(title_data, list) else [] - url = "" - mobile_url = "" + ranks = title_data.get("ranks", []) + url = title_data.get("url", "") + mobile_url = title_data.get("mobileUrl", "") title_info[source_id][title] = { "first_time": time_info, @@ -1109,20 +1452,27 @@ class NewsAnalyzer: word_groups, filter_words = DataProcessor.load_frequency_words() stats, total_titles = StatisticsCalculator.count_word_frequency( - results, word_groups, filter_words, - id_to_alias, title_info, self.rank_threshold, + results, + word_groups, + filter_words, + id_to_alias, + title_info, + self.rank_threshold, + new_titles, ) - # 发送报告 if self.feishu_report_type in ["current", "both"]: - ReportGenerator.send_to_feishu(stats, failed_ids, "单次爬取") + ReportGenerator.send_to_feishu( + stats, failed_ids, "单次爬取", new_titles, id_to_alias + ) - html_file = ReportGenerator.generate_html_report(stats, total_titles, failed_ids) + html_file = ReportGenerator.generate_html_report( + stats, total_titles, failed_ids, False, new_titles, id_to_alias + ) print(f"HTML报告已生成: {html_file}") daily_html = self.generate_daily_summary() - # 本地环境自动打开HTML if not self.is_github_actions and html_file: file_url = "file://" + str(Path(html_file).resolve()) print(f"正在打开HTML报告: {file_url}") @@ -1135,7 +1485,6 @@ class NewsAnalyzer: def main(): - """程序入口""" analyzer = NewsAnalyzer( request_interval=CONFIG["REQUEST_INTERVAL"], feishu_report_type=CONFIG["FEISHU_REPORT_TYPE"],
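
Note on the frequency-word syntax consumed by `load_frequency_words` and `_matches_word_groups` above: `frequency_words.txt` is split into groups on blank lines; within a group a bare word is an ordinary frequency word (any one may match), a `+` prefix marks a required word (all must be present), and a `!` prefix marks a global filter word (any hit excludes the title). A minimal illustrative file is sketched below — the concrete words are hypothetical and not part of this patch:

    华为
    鸿蒙
    +手机

    比亚迪
    特斯拉

    !广告

Under these rules a title matches the first group only if it contains 手机 together with 华为 or 鸿蒙, matches the second group if it contains either 比亚迪 or 特斯拉, and any title containing 广告 is excluded before group matching; the display key of each group (`group_key`) is built from its ordinary words.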