From 72954d0fa85449f0591af7bd41531a948cd1c9d4 Mon Sep 17 00:00:00 2001
From: sansan <77180927+sansan0@users.noreply.github.com>
Date: Mon, 9 Jun 2025 19:21:35 +0800
Subject: [PATCH] Update main.py

---
 main.py | 673 ++++++++++++++++++++------------------------------------
 1 file changed, 239 insertions(+), 434 deletions(-)

diff --git a/main.py b/main.py
index f18d70d..8336ae3 100644
--- a/main.py
+++ b/main.py
@@ -1,31 +1,32 @@
 # coding=utf-8
 import json
-import os
 import time
 import random
 from datetime import datetime
 import webbrowser
 from typing import Dict, List, Tuple, Optional, Union
+from pathlib import Path
+import os

 import requests
 import pytz

 # Configuration constants
 CONFIG = {
-    "FEISHU_SEPARATOR": "━━━━━━━━━━━━━━━━━━━",  # Separator between frequency-word blocks in Feishu messages; note that other separator styles may be filtered out by Feishu and not displayed
-    "REQUEST_INTERVAL": 1000,  # milliseconds
-    "FEISHU_REPORT_TYPE": "daily",  # one of: "current", "daily", "both"
-    "RANK_THRESHOLD": 5,  # rank threshold; the top 5 are shown in bold red
-    "USE_PROXY": True,  # whether to enable the local proxy
+    "FEISHU_SEPARATOR": "━━━━━━━━━━━━━━━━━━━",  # Feishu message separator; other separator styles may be filtered out by Feishu and not displayed
+    "REQUEST_INTERVAL": 1000,  # request interval (milliseconds)
+    "FEISHU_REPORT_TYPE": "daily",  # Feishu report type: "current"|"daily"|"both"
+    "RANK_THRESHOLD": 5,  # rank highlight threshold
+    "USE_PROXY": True,  # whether to enable a proxy
     "DEFAULT_PROXY": "http://127.0.0.1:10086",
-    "CONTINUE_WITHOUT_FEISHU": True,  # whether to keep crawling when no Feishu webhook URL is set; if True, crawling continues and the fetched news data keeps being generated on GitHub
-    "FEISHU_WEBHOOK_URL": "",  # the Feishu bot webhook URL, roughly like: https://www.feishu.cn/flow/api/trigger-webhook/xxxx; empty by default, setting it via GitHub Secrets is recommended
+    "CONTINUE_WITHOUT_FEISHU": True,  # whether to keep crawling when no Feishu webhook URL is set; if True, crawling continues and the fetched news data keeps being generated on GitHub
+    "FEISHU_WEBHOOK_URL": "",  # the Feishu bot webhook URL, roughly like: https://www.feishu.cn/flow/api/trigger-webhook/xxxx; empty by default, setting it via GitHub Secrets is recommended
 }
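The CONFIG comments above recommend injecting the webhook address through GitHub Secrets rather than committing it. A minimal sketch of how that override flows, matching the `os.environ.get` lookup that appears later in this patch; the workflow snippet is hypothetical:

    # Hypothetical GitHub Actions step exposing the secret as an env var:
    #   env:
    #     FEISHU_WEBHOOK_URL: ${{ secrets.FEISHU_WEBHOOK_URL }}
    import os
    webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"])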


 class TimeHelper:
-    """Time-related helper functions"""
+    """Time handling utilities"""

     @staticmethod
     def get_beijing_time() -> datetime:
@@ -34,7 +35,7 @@ class TimeHelper:

     @staticmethod
     def format_date_folder() -> str:
-        """Return the date folder name format"""
+        """Return the date folder format"""
         return TimeHelper.get_beijing_time().strftime("%Y年%m月%d日")

     @staticmethod
@@ -44,25 +45,24 @@ class TimeHelper:


 class FileHelper:
-    """File-operation helper functions"""
+    """File operation utilities"""

     @staticmethod
     def ensure_directory_exists(directory: str) -> None:
-        """Ensure the directory exists, creating it if necessary"""
-        if not os.path.exists(directory):
-            os.makedirs(directory)
+        """Ensure the directory exists"""
+        Path(directory).mkdir(parents=True, exist_ok=True)

     @staticmethod
     def get_output_path(subfolder: str, filename: str) -> str:
         """Get the output file path"""
         date_folder = TimeHelper.format_date_folder()
-        output_dir = os.path.join("output", date_folder, subfolder)
-        FileHelper.ensure_directory_exists(output_dir)
-        return os.path.join(output_dir, filename)
+        output_dir = Path("output") / date_folder / subfolder
+        FileHelper.ensure_directory_exists(str(output_dir))
+        return str(output_dir / filename)


 class DataFetcher:
-    """Data-fetching functionality"""
+    """Data fetcher"""

     def __init__(self, proxy_url: Optional[str] = None):
         self.proxy_url = proxy_url
@@ -74,20 +74,8 @@ class DataFetcher:
         min_retry_wait: int = 3,
         max_retry_wait: int = 5,
     ) -> Tuple[Optional[str], str, str]:
-        """
-        Synchronously fetch data for the given ID, retrying on failure.
-        Accepts both 'success' and 'cache' statuses; any other status triggers a retry.
-
-        Args:
-            id_info: ID info, either an ID string or an (ID, alias) tuple
-            max_retries: maximum number of retries
-            min_retry_wait: minimum retry wait (seconds)
-            max_retry_wait: maximum retry wait (seconds)
-
-        Returns:
-            (response data, ID, alias) tuple; response data is None on failure
-        """
-        # Handle the ID and alias
+        """Fetch data for the given ID, with retries"""
+        # Parse the ID and alias
         if isinstance(id_info, tuple):
             id_value, alias = id_info
         else:
@@ -101,7 +89,6 @@ class DataFetcher:

         if self.proxy_url:
             proxies = {"http": self.proxy_url, "https": self.proxy_url}

-        # Add randomness to mimic a real user
         headers = {
             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
             "Accept": "application/json, text/plain, */*",
@@ -113,24 +100,18 @@ class DataFetcher:
         retries = 0
         while retries <= max_retries:
             try:
-                print(
-                    f"正在请求 {id_value} 数据... (尝试 {retries + 1}/{max_retries + 1})"
-                )
-                response = requests.get(
-                    url, proxies=proxies, headers=headers, timeout=10
-                )
-                response.raise_for_status()  # Check the HTTP status code
+                print(f"正在请求 {id_value} 数据... (尝试 {retries + 1}/{max_retries + 1})")
+                response = requests.get(url, proxies=proxies, headers=headers, timeout=10)
+                response.raise_for_status()

-                # Parse the JSON and check the response status
                 data_text = response.text
                 data_json = json.loads(data_text)

-                # Updated status-check logic: accept both success and cache
+                # Check the response status; accept both success and cache
                 status = data_json.get("status", "未知")
                 if status not in ["success", "cache"]:
                     raise ValueError(f"响应状态异常: {status}")

-                # Record status info
                 status_info = "最新数据" if status == "success" else "缓存数据"
                 print(f"成功获取 {id_value} 数据({status_info})")
                 return data_text, id_value, alias
@@ -138,14 +119,12 @@ class DataFetcher:
             except Exception as e:
                 retries += 1
                 if retries <= max_retries:
-                    # Compute the retry wait: base 3-5s, plus 1-2s per retry
+                    # Compute the retry wait: base time plus an increasing component
                     base_wait = random.uniform(min_retry_wait, max_retry_wait)
                     additional_wait = (retries - 1) * random.uniform(1, 2)
                     wait_time = base_wait + additional_wait

-                    print(
-                        f"请求 {id_value} 失败: {e}. 将在 {wait_time:.2f} 秒后重试..."
-                    )
+                    print(f"请求 {id_value} 失败: {e}. 将在 {wait_time:.2f} 秒后重试...")
                     time.sleep(wait_time)
                 else:
                     print(f"请求 {id_value} 失败: {e}. 已达到最大重试次数。")
@@ -157,103 +136,85 @@ class DataFetcher:
         ids_list: List[Union[str, Tuple[str, str]]],
         request_interval: int = CONFIG["REQUEST_INTERVAL"],
     ) -> Tuple[Dict, Dict, List]:
-        """
-        Crawl data from multiple sites using synchronous requests
-
-        Args:
-            ids_list: list of IDs; each element is an ID string or an (ID, alias) tuple
-            request_interval: request interval (milliseconds)
-
-        Returns:
-            (results, id_to_alias, failed_ids) tuple
-        """
+        """Crawl data from multiple sites"""
         results = {}
         id_to_alias = {}
         failed_ids = []

         for i, id_info in enumerate(ids_list):
-            # Handle the ID and alias
+            # Parse the ID and alias
             if isinstance(id_info, tuple):
                 id_value, alias = id_info
             else:
                 id_value = id_info
                 alias = id_value

-            # Add to the ID-alias map
             id_to_alias[id_value] = alias

-            # Send the request
+            # Fetch the data
             response, _, _ = self.fetch_data(id_info)

-            # Handle the response
             if response:
                 try:
                     data = json.loads(response)

-                    # Collect the title list, recording rank, url, and mobileUrl
                     results[id_value] = {}
                     for index, item in enumerate(data.get("items", []), 1):
                         title = item["title"]
-                        # Read url and mobileUrl, with fallbacks
                         url = item.get("url", "")
                         mobile_url = item.get("mobileUrl", "")

                         if title in results[id_value]:
-                            # Title already seen: extend the rank list, keep the first URL
+                            # Title already seen: update the ranks
                             results[id_value][title]["ranks"].append(index)
                         else:
-                            # New title: store the full record
+                            # New title
                             results[id_value][title] = {
                                 "ranks": [index],
                                 "url": url,
                                 "mobileUrl": mobile_url
                             }
                 except json.JSONDecodeError:
-                    print(f"解析 {id_value} 的响应失败,不是有效的JSON")
+                    print(f"解析 {id_value} 响应失败,非有效JSON")
                     failed_ids.append(id_value)
                 except Exception as e:
-                    print(f"处理 {id_value} 数据时出错: {e}")
+                    print(f"处理 {id_value} 数据出错: {e}")
                     failed_ids.append(id_value)
             else:
                 failed_ids.append(id_value)

-            # Wait between requests, except after the last one
+            # Add the request interval
             if i < len(ids_list) - 1:
-                # Add some randomness to the interval
                 actual_interval = request_interval + random.randint(-10, 20)
-                actual_interval = max(50, actual_interval)  # ensure at least 50 ms
+                actual_interval = max(50, actual_interval)  # at least 50 ms

                 print(f"等待 {actual_interval} 毫秒后发送下一个请求...")
                 time.sleep(actual_interval / 1000)

         print(f"\n请求总结:")
-        print(f"- 成功获取数据的ID: {list(results.keys())}")
-        print(f"- 请求失败的ID: {failed_ids}")
+        print(f"- 成功获取数据: {list(results.keys())}")
+        print(f"- 请求失败: {failed_ids}")

         return results, id_to_alias, failed_ids
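For orientation, a minimal usage sketch of the fetcher above. The nested result shape follows the code in crawl_websites; the concrete ID and title are illustrative only:

    fetcher = DataFetcher(proxy_url=None)
    results, id_to_alias, failed_ids = fetcher.crawl_websites([("baidu", "百度热搜")])
    # results maps source ID -> title -> record, e.g. (illustrative values):
    # {"baidu": {"示例标题": {"ranks": [3], "url": "https://...", "mobileUrl": "https://..."}}}
    # id_to_alias == {"baidu": "百度热搜"}; failed_ids lists IDs whose requests failed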


 class DataProcessor:
-    """Data-processing functionality"""
+    """Data processor"""

     @staticmethod
     def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str:
-        """Save titles to a file, including failed-request info, url, and mobileUrl"""
-        file_path = FileHelper.get_output_path(
-            "txt", f"{TimeHelper.format_time_filename()}.txt"
-        )
+        """Save titles to a file"""
+        file_path = FileHelper.get_output_path("txt", f"{TimeHelper.format_time_filename()}.txt")

         with open(file_path, "w", encoding="utf-8") as f:
-            # First write the successfully fetched data
+            # Write successful data
             for id_value, title_data in results.items():
                 display_name = id_to_alias.get(id_value, id_value)
                 f.write(f"{display_name}\n")

                 for i, (title, info) in enumerate(title_data.items(), 1):
-                    # Handle new-format data (with ranks, url, and mobileUrl)
                     if isinstance(info, dict):
                         ranks = info.get("ranks", [])
                         url = info.get("url", "")
                         mobile_url = info.get("mobileUrl", "")

                         rank_str = ",".join(map(str, ranks))
-                        # Format: No. Title (排名:1,2,3) [URL:url] [MOBILE:mobile_url]
                         line = f"{i}. {title} (排名:{rank_str})"
                         if url:
                             line += f" [URL:{url}]"
@@ -261,12 +222,12 @@ class DataProcessor:
                             line += f" [MOBILE:{mobile_url}]"
                         f.write(line + "\n")
                     else:
-                        # Backward compatibility with old-format data (ranks list only)
+                        # Old-format compatibility
                         rank_str = ",".join(map(str, info))
                         f.write(f"{i}. {title} (排名:{rank_str})\n")
                 f.write("\n")

-            # If any requests failed, write the failure info
+            # Write failure info
             if failed_ids:
                 f.write("==== 以下ID请求失败 ====\n")
                 for id_value in failed_ids:
@@ -276,83 +237,78 @@ class DataProcessor:
         return file_path

     @staticmethod
-    def load_frequency_words(
-        frequency_file: str = "frequency_words.txt",
-    ) -> Tuple[List[List[str]], List[str]]:
-        """
-        Load frequency words and filter words, handling word groups
-
-        Returns:
-            (word_groups, filter_words) tuple
-        """
-        if not os.path.exists(frequency_file):
+    def load_frequency_words(frequency_file: str = "frequency_words.txt") -> Tuple[List[Dict], List[str]]:
+        """Load the frequency-word configuration"""
+        frequency_path = Path(frequency_file)
+        if not frequency_path.exists():
            print(f"频率词文件 {frequency_file} 不存在")
            return [], []

-        with open(frequency_file, "r", encoding="utf-8") as f:
+        with open(frequency_path, "r", encoding="utf-8") as f:
             content = f.read()

-        # Split into word groups on blank lines
-        word_groups = [
-            group.strip() for group in content.split("\n\n") if group.strip()
-        ]
+        # Split into word groups on blank lines
+        word_groups = [group.strip() for group in content.split("\n\n") if group.strip()]

-        # Process each word group
         processed_groups = []
-        filter_words = []  # for storing filter words
+        filter_words = []

         for group in word_groups:
             words = [word.strip() for word in group.split("\n") if word.strip()]

-            # Separate frequency words from filter words
-            group_frequency_words = []
+            # Classify the words
+            group_required_words = []  # words starting with + are required
+            group_normal_words = []    # ordinary frequency words
+            group_filter_words = []    # words starting with ! are filters

             for word in words:
                 if word.startswith("!"):
-                    # Strip the exclamation mark and add to the filter list
                     filter_words.append(word[1:])
+                    group_filter_words.append(word[1:])
+                elif word.startswith("+"):
+                    group_required_words.append(word[1:])
                 else:
-                    # An ordinary frequency word
-                    group_frequency_words.append(word)
+                    group_normal_words.append(word)

-            # Only add the group to the results if it contains frequency words
-            if group_frequency_words:
-                processed_groups.append(group_frequency_words)
+            # Only keep groups containing valid words
+            if group_required_words or group_normal_words:
+                # Build the group key
+                if group_normal_words:
+                    group_key = " ".join(group_normal_words)
+                else:
+                    group_key = " ".join(group_required_words)
+
+                processed_groups.append({
+                    'required': group_required_words,
+                    'normal': group_normal_words,
+                    'group_key': group_key
+                })

         return processed_groups, filter_words
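A hypothetical frequency_words.txt illustrating the three prefixes parsed above (words within a group are separated by newlines, groups by a blank line):

    华为
    鸿蒙
    +发布
    !爆料

    比亚迪

Given this input, load_frequency_words would return the groups {'required': ['发布'], 'normal': ['华为', '鸿蒙'], 'group_key': '华为 鸿蒙'} and {'required': [], 'normal': ['比亚迪'], 'group_key': '比亚迪'}, with ['爆料'] as the filter-word list — a sketch based on the parsing code above, not output captured from a run.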

     @staticmethod
     def read_all_today_titles() -> Tuple[Dict, Dict, Dict]:
-        """
-        Read titles from all of today's txt files, merging by source, deduplicating,
-        and recording times and occurrence counts.
-        Compatible with the new format (with url and mobileUrl) and old-format data.
-
-        Returns:
-            (all_results, id_to_alias, title_info) tuple
-        """
+        """Read all of today's title files"""
         date_folder = TimeHelper.format_date_folder()
-        txt_dir = os.path.join("output", date_folder, "txt")
+        txt_dir = Path("output") / date_folder / "txt"

-        if not os.path.exists(txt_dir):
+        if not txt_dir.exists():
             print(f"今日文件夹 {txt_dir} 不存在")
             return {}, {}, {}

-        all_results = {}  # all titles from all sources {source_id: {title: {"ranks": [...], "url": "...", "mobileUrl": "..."}}}
-        id_to_alias = {}  # ID-to-alias map
-        title_info = {}  # title info
+        all_results = {}
+        id_to_alias = {}
+        title_info = {}

-        # Read all txt files, sorted by time so earlier files are processed first
-        files = sorted([f for f in os.listdir(txt_dir) if f.endswith(".txt")])
+        # Process files in time order
+        files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])

-        for file in files:
-            # Extract the time info from the filename (e.g. "12时34分.txt")
-            time_info = file.replace(".txt", "")
-
-            file_path = os.path.join(txt_dir, file)
+        for file_path in files:
+            time_info = file_path.stem
+
             with open(file_path, "r", encoding="utf-8") as f:
                 content = f.read()

-            # Parse the content
             sections = content.split("\n\n")
             for section in sections:
                 if not section.strip() or "==== 以下ID请求失败 ====" in section:
@@ -362,58 +318,49 @@ class DataProcessor:
                 if len(lines) < 2:
                     continue

-                # The first line is the source name
                 source_name = lines[0].strip()

-                # Extract titles and ranks, compatible with old and new formats
+                # Parse the title data
                 title_data = {}
                 for line in lines[1:]:
                     if line.strip():
                         try:
-                            # Extract the serial number and body
                             match_num = None
                             title_part = line.strip()

-                            # Handle the "number. title" format
+                            # Extract the serial number
                             if ". " in title_part and title_part.split(". ")[0].isdigit():
                                 parts = title_part.split(". ", 1)
-                                match_num = int(parts[0])  # the serial number may be the rank
+                                match_num = int(parts[0])
                                 title_part = parts[1]

-                            # Extract the mobileUrl ("[MOBILE:mobile_url]")
+                            # Extract the mobileUrl
                             mobile_url = ""
                             if " [MOBILE:" in title_part:
                                 title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1)
                                 if mobile_part.endswith("]"):
                                     mobile_url = mobile_part[:-1]

-                            # Extract the url ("[URL:url]")
+                            # Extract the url
                             url = ""
                             if " [URL:" in title_part:
                                 title_part, url_part = title_part.rsplit(" [URL:", 1)
                                 if url_part.endswith("]"):
                                     url = url_part[:-1]

-                            # Extract the ranks ("标题 (排名:1,2,3)")
+                            # Extract the ranks
                             ranks = []
                             if " (排名:" in title_part:
                                 title, rank_str = title_part.rsplit(" (排名:", 1)
                                 rank_str = rank_str.rstrip(")")
-                                ranks = [
-                                    int(r)
-                                    for r in rank_str.split(",")
-                                    if r.strip() and r.isdigit()
-                                ]
+                                ranks = [int(r) for r in rank_str.split(",") if r.strip() and r.isdigit()]
                             else:
                                 title = title_part

-                            # If no rank was found but there is a serial number, use it
                             if not ranks and match_num is not None:
                                 ranks = [match_num]
-
-                            # Ensure the rank list is not empty
                             if not ranks:
-                                ranks = [99]  # default rank
+                                ranks = [99]

                             title_data[title] = {
                                 "ranks": ranks,
@@ -424,17 +371,12 @@ class DataProcessor:
                         except Exception as e:
                             print(f"解析标题行出错: {line}, 错误: {e}")

-                # Process the source data
                 DataProcessor._process_source_data(
-                    source_name,
-                    title_data,
-                    time_info,
-                    all_results,
-                    title_info,
-                    id_to_alias,
+                    source_name, title_data, time_info,
+                    all_results, title_info, id_to_alias
                 )

-        # Convert results from {source_name: {title: data}} to {source_id: {title: data}}
+        # Convert to ID-keyed results
         id_results = {}
         id_title_info = {}
         for name, titles in all_results.items():
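The parser above reverses the line format written by save_titles_to_file; a worked example with hypothetical values:

    # Input line in 12时34分.txt:
    #   3. 示例标题 (排名:1,2) [URL:https://example.com/a] [MOBILE:https://m.example.com/a]
    # Parsed fields:
    #   title == "示例标题", ranks == [1, 2],
    #   url == "https://example.com/a", mobile_url == "https://m.example.com/a"
    # A line with no 排名 segment falls back to its serial number (ranks == [3]),
    # and to the default rank [99] if no number can be recovered at all.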
data.get("ranks", []) url = data.get("url", "") mobile_url = data.get("mobileUrl", "") else: - # 旧格式兼容 ranks = data if isinstance(data, list) else [] url = "" mobile_url = "" title_info[source_name][title] = { - "first_time": time_info, # 记录首次时间 - "last_time": time_info, # 最后时间初始同首次时间 + "first_time": time_info, + "last_time": time_info, "count": 1, "ranks": ranks, "url": url, "mobileUrl": mobile_url, } - # 尝试反向生成ID + # 生成反向ID映射 reversed_id = source_name.lower().replace(" ", "-") id_to_alias[reversed_id] = source_name else: - # 已有此来源,更新标题 + # 更新已有来源 for title, data in title_data.items(): - # 兼容新旧格式 if isinstance(data, dict): ranks = data.get("ranks", []) url = data.get("url", "") mobile_url = data.get("mobileUrl", "") else: - # 旧格式兼容 ranks = data if isinstance(data, list) else [] url = "" mobile_url = "" if title not in all_results[source_name]: + # 新标题 all_results[source_name][title] = { "ranks": ranks, "url": url, "mobileUrl": mobile_url } title_info[source_name][title] = { - "first_time": time_info, # 新标题的首次和最后时间都设为当前 + "first_time": time_info, "last_time": time_info, "count": 1, "ranks": ranks, @@ -518,7 +452,7 @@ class DataProcessor: "mobileUrl": mobile_url, } else: - # 已存在的标题,更新最后时间,合并排名信息并增加计数 + # 更新已有标题 existing_data = all_results[source_name][title] existing_ranks = existing_data.get("ranks", []) existing_url = existing_data.get("url", "") @@ -529,17 +463,16 @@ class DataProcessor: if rank not in merged_ranks: merged_ranks.append(rank) - # 更新数据,保持第一个有效的URL all_results[source_name][title] = { "ranks": merged_ranks, "url": existing_url or url, "mobileUrl": existing_mobile_url or mobile_url } - title_info[source_name][title]["last_time"] = time_info # 更新最后时间 + title_info[source_name][title]["last_time"] = time_info title_info[source_name][title]["ranks"] = merged_ranks title_info[source_name][title]["count"] += 1 - # 保持第一个有效的URL + # 保留第一个有效URL if not title_info[source_name][title].get("url"): title_info[source_name][title]["url"] = url if not title_info[source_name][title].get("mobileUrl"): @@ -547,184 +480,170 @@ class DataProcessor: class StatisticsCalculator: - """统计计算相关功能""" + """统计计算器""" @staticmethod def count_word_frequency( results: Dict, - word_groups: List[List[str]], + word_groups: List[Dict], filter_words: List[str], id_to_alias: Dict, title_info: Optional[Dict] = None, rank_threshold: int = CONFIG["RANK_THRESHOLD"], ) -> Tuple[List[Dict], int]: - """ - 统计词频,处理关联词和大小写不敏感,每个标题只计入首个匹配词组,并应用过滤词 - 支持新格式数据(包含url和mobileUrl) - - Returns: - (stats, total_titles)元组 - """ + """统计词频,支持必须词、频率词、过滤词""" word_stats = {} total_titles = 0 - processed_titles = {} # 用于跟踪已处理标题 {source_id: {title: True}} + processed_titles = {} # 跟踪已处理标题 - # 初始化title_info if title_info is None: title_info = {} - # 为每个词组创建统计对象 + # 初始化统计对象 for group in word_groups: - group_key = " ".join(group) + group_key = group['group_key'] word_stats[group_key] = {"count": 0, "titles": {}} - # 遍历所有标题并统计 + # 遍历标题进行统计 for source_id, titles_data in results.items(): total_titles += len(titles_data) - # 初始化该来源的处理记录 if source_id not in processed_titles: processed_titles[source_id] = {} for title, title_data in titles_data.items(): - # 跳过已处理的标题 if title in processed_titles.get(source_id, {}): continue - title_lower = title.lower() # 转换为小写以实现大小写不敏感 + title_lower = title.lower() - # 检查是否包含任何过滤词 + # 优先级1:过滤词检查 contains_filter_word = any( filter_word.lower() in title_lower for filter_word in filter_words ) - - # 如果包含过滤词,跳过这个标题 if contains_filter_word: continue - # 兼容新旧数据格式 + # 兼容数据格式 if isinstance(title_data, dict): source_ranks = 
title_data.get("ranks", []) source_url = title_data.get("url", "") source_mobile_url = title_data.get("mobileUrl", "") else: - # 旧格式兼容 source_ranks = title_data if isinstance(title_data, list) else [] source_url = "" source_mobile_url = "" - # 按顺序检查每个词组 + # 检查每个词组 for group in word_groups: - group_key = " ".join(group) + group_key = group['group_key'] + required_words = group['required'] + normal_words = group['normal'] - # 检查是否有任何一个词在标题中 - matched = any(word.lower() in title_lower for word in group) - - # 如果匹配,增加计数并添加标题,然后标记为已处理 - if matched: - word_stats[group_key]["count"] += 1 - if source_id not in word_stats[group_key]["titles"]: - word_stats[group_key]["titles"][source_id] = [] - - # 获取标题信息 - first_time = "" - last_time = "" - count_info = 1 - ranks = source_ranks if source_ranks else [] - url = source_url - mobile_url = source_mobile_url - - if ( - title_info - and source_id in title_info - and title in title_info[source_id] - ): - info = title_info[source_id][title] - first_time = info.get("first_time", "") - last_time = info.get("last_time", "") - count_info = info.get("count", 1) - if "ranks" in info and info["ranks"]: - ranks = info["ranks"] - url = info.get("url", source_url) - mobile_url = info.get("mobileUrl", source_mobile_url) - - # 确保排名是有效的 - if not ranks: - ranks = [99] # 使用默认排名 - - # 格式化时间信息 - time_display = StatisticsCalculator._format_time_display( - first_time, last_time + # 优先级2:必须词检查 + if required_words: + all_required_present = all( + req_word.lower() in title_lower for req_word in required_words ) + if not all_required_present: + continue - # 添加带完整信息的标题数据,保存原始数据用于后续格式化 - source_alias = id_to_alias.get(source_id, source_id) - word_stats[group_key]["titles"][source_id].append( - { - "title": title, - "source_alias": source_alias, - "first_time": first_time, - "last_time": last_time, - "time_display": time_display, - "count": count_info, - "ranks": ranks, - "rank_threshold": rank_threshold, - "url": url, # 新增url字段 - "mobileUrl": mobile_url, # 新增mobileUrl字段 - } + # 优先级3:频率词检查 + if normal_words: + any_normal_present = any( + normal_word.lower() in title_lower for normal_word in normal_words ) + if not any_normal_present: + continue - # 标记该标题已处理,不再匹配其他词组 - if source_id not in processed_titles: - processed_titles[source_id] = {} - processed_titles[source_id][title] = True - break # 找到第一个匹配的词组后退出循环 + # 如果只有必须词没有频率词,且所有必须词都匹配了,那么也算匹配 + # 如果既有必须词又有频率词,那么必须词全部匹配且至少一个频率词匹配 + # 如果只有频率词,那么至少一个频率词匹配 - # 转换统计结果 - 这里不再进行格式化,保留原始数据 + # 匹配成功,记录数据 + word_stats[group_key]["count"] += 1 + if source_id not in word_stats[group_key]["titles"]: + word_stats[group_key]["titles"][source_id] = [] + + # 获取标题详细信息 + first_time = "" + last_time = "" + count_info = 1 + ranks = source_ranks if source_ranks else [] + url = source_url + mobile_url = source_mobile_url + + if (title_info and source_id in title_info and title in title_info[source_id]): + info = title_info[source_id][title] + first_time = info.get("first_time", "") + last_time = info.get("last_time", "") + count_info = info.get("count", 1) + if "ranks" in info and info["ranks"]: + ranks = info["ranks"] + url = info.get("url", source_url) + mobile_url = info.get("mobileUrl", source_mobile_url) + + if not ranks: + ranks = [99] + + time_display = StatisticsCalculator._format_time_display(first_time, last_time) + + source_alias = id_to_alias.get(source_id, source_id) + word_stats[group_key]["titles"][source_id].append({ + "title": title, + "source_alias": source_alias, + "first_time": first_time, + "last_time": last_time, + "time_display": 
-        # Convert the stats; no formatting here, keep the raw data
+        # Convert the stats
         stats = []
         for group_key, data in word_stats.items():
             all_titles = []
             for source_id, title_list in data["titles"].items():
                 all_titles.extend(title_list)

-            stats.append(
-                {
-                    "word": group_key,
-                    "count": data["count"],
-                    "titles": all_titles,  # keep raw title data for later formatting
-                    "percentage": (
-                        round(data["count"] / total_titles * 100, 2)
-                        if total_titles > 0
-                        else 0
-                    ),
-                }
-            )
+            stats.append({
+                "word": group_key,
+                "count": data["count"],
+                "titles": all_titles,
+                "percentage": (
+                    round(data["count"] / total_titles * 100, 2)
+                    if total_titles > 0 else 0
+                ),
+            })

-        # Sort by count, descending
         stats.sort(key=lambda x: x["count"], reverse=True)
-
         return stats, total_titles

     @staticmethod
     def _format_rank_for_html(ranks: List[int], rank_threshold: int = 5) -> str:
-        """Format the rank display for HTML; the top 5 in bold red"""
+        """Format the HTML rank display"""
         if not ranks:
             return ""

-        # Sort and deduplicate the ranks
         unique_ranks = sorted(set(ranks))
         min_rank = unique_ranks[0]
         max_rank = unique_ranks[-1]

-        # All ranks use brackets; only the top 5 get bold red
         if min_rank <= rank_threshold:
             if min_rank == max_rank:
-                # single rank within the top 5
                 return f"[{min_rank}]"
             else:
                 return f"[{min_rank} - {max_rank}]"
         else:
-            # ranks beyond 5 use the plain style
             if min_rank == max_rank:
                 return f"[{min_rank}]"
             else:
@@ -732,24 +651,20 @@ class StatisticsCalculator:

     @staticmethod
     def _format_rank_for_feishu(ranks: List[int], rank_threshold: int = 5) -> str:
-        """Format the rank display for Feishu; the top 5 in bold red markdown"""
+        """Format the Feishu rank display"""
         if not ranks:
             return ""

-        # Sort and deduplicate the ranks
         unique_ranks = sorted(set(ranks))
         min_rank = unique_ranks[0]
         max_rank = unique_ranks[-1]

-        # All ranks use brackets; only the top 5 get red
         if min_rank <= rank_threshold:
             if min_rank == max_rank:
-                # single rank within the top 5
                 return f"**[{min_rank}]**"
             else:
                 return f"**[{min_rank} - {max_rank}]**"
         else:
-            # ranks beyond 5 use the plain style
             if min_rank == max_rank:
                 return f"[{min_rank}]"
             else:
@@ -757,20 +672,18 @@ class StatisticsCalculator:

     @staticmethod
     def _format_time_display(first_time: str, last_time: str) -> str:
-        """Format the time display: a single time, or a range for repeats"""
+        """Format the time display"""
         if not first_time:
             return ""

         if first_time == last_time or not last_time:
-            # single time point, show as-is
             return first_time
         else:
-            # two time points, show the range
             return f"[{first_time} ~ {last_time}]"
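Sample outputs of the formatters above (threshold 5), as implied by their branches; the time strings are illustrative:

    # _format_rank_for_feishu([3], 5)       -> "**[3]**"
    # _format_rank_for_feishu([2, 9, 2], 5) -> "**[2 - 9]**"   (deduplicated, min-max)
    # _format_rank_for_feishu([8], 5)       -> "[8]"
    # _format_time_display("09时15分", "09时15分") -> "09时15分"
    # _format_time_display("09时15分", "21时40分") -> "[09时15分 ~ 21时40分]"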
"""创建HTML内容""" html = """ @@ -862,16 +764,14 @@ class ReportGenerator:

频率词统计报告

""" - # 报告类型 if is_daily: html += "

报告类型: 当日汇总

" - # 基本信息 now = TimeHelper.get_beijing_time() html += f"

总标题数: {total_titles}

" html += f"

生成时间: {now.strftime('%Y-%m-%d %H:%M:%S')}

" - # 失败的请求信息 + # 失败信息 if failed_ids and len(failed_ids) > 0: html += """
@@ -885,7 +785,6 @@ class ReportGenerator:
""" - # 表格头部 html += """ @@ -899,7 +798,6 @@ class ReportGenerator: # 表格内容 for i, stat in enumerate(stats, 1): - # 格式化标题列表用于HTML显示 formatted_titles = [] for title_data in stat["titles"]: title = title_data["title"] @@ -911,25 +809,16 @@ class ReportGenerator: url = title_data.get("url", "") mobile_url = title_data.get("mobileUrl", "") - # 使用HTML格式化排名 - rank_display = StatisticsCalculator._format_rank_for_html( - ranks, rank_threshold - ) + rank_display = StatisticsCalculator._format_rank_for_html(ranks, rank_threshold) - # 优先使用mobileUrl,然后是url,最后无链接 link_url = mobile_url or url - - # 格式化标题信息,添加链接支持 escaped_title = ReportGenerator._html_escape(title) escaped_source_alias = ReportGenerator._html_escape(source_alias) if link_url: - # 转义URL escaped_url = ReportGenerator._html_escape(link_url) - # 有链接时,使用a标签包装标题 formatted_title = f"[{escaped_source_alias}] {escaped_title}" else: - # 没有链接时,使用普通文本 formatted_title = f"[{escaped_source_alias}] {escaped_title}" if rank_display: @@ -953,7 +842,6 @@ class ReportGenerator: """ - # 表格结尾 html += """
@@ -964,7 +852,7 @@ class ReportGenerator: @staticmethod def _html_escape(text: str) -> str: - """HTML转义函数""" + """HTML转义""" if not isinstance(text, str): text = str(text) @@ -980,29 +868,17 @@ class ReportGenerator: failed_ids: Optional[List] = None, report_type: str = "单次爬取", ) -> bool: - """ - 将频率词统计结果发送到飞书 - - Returns: - 成功发送返回True,否则返回False - """ - # 获取webhook URL,优先使用环境变量,其次使用配置中的URL + """发送数据到飞书""" webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]) - # 检查webhook URL是否有效 if not webhook_url: - print(f"警告: FEISHU_WEBHOOK_URL未设置或无效,跳过发送飞书通知") + print(f"警告: FEISHU_WEBHOOK_URL未设置,跳过飞书通知") return False headers = {"Content-Type": "application/json"} - - # 获取总标题数 total_titles = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0) - - # 构建文本内容 text_content = ReportGenerator._build_feishu_content(stats, failed_ids) - # 构造消息体 now = TimeHelper.get_beijing_time() payload = { "msg_type": "text", @@ -1014,57 +890,44 @@ class ReportGenerator: }, } - # 发送请求 try: response = requests.post(webhook_url, headers=headers, json=payload) if response.status_code == 200: print(f"数据发送到飞书成功 [{report_type}]") return True else: - print( - f"发送到飞书失败 [{report_type}],状态码:{response.status_code},响应:{response.text}" - ) + print(f"发送到飞书失败 [{report_type}],状态码:{response.status_code},响应:{response.text}") return False except Exception as e: print(f"发送到飞书时出错 [{report_type}]:{e}") return False @staticmethod - def _build_feishu_content( - stats: List[Dict], failed_ids: Optional[List] = None - ) -> str: - """构建飞书消息内容,使用富文本格式和markdown链接,优先使用mobileUrl""" + def _build_feishu_content(stats: List[Dict], failed_ids: Optional[List] = None) -> str: + """构建飞书消息内容""" text_content = "" - - # 添加频率词统计信息 filtered_stats = [stat for stat in stats if stat["count"] > 0] - # 如果有统计数据,添加标题 if filtered_stats: text_content += "📊 **热点词汇统计**\n\n" - # 获取总数用于序号显示 total_count = len(filtered_stats) for i, stat in enumerate(filtered_stats): word = stat["word"] count = stat["count"] - # 构建序号显示,格式为 [当前序号/总数],使用灰色且不加粗 sequence_display = f"[{i + 1}/{total_count}]" - # 关键词加粗,计数和百分比使用不同颜色,序号单独显示为灰色 + # 频次颜色分级 if count >= 10: - # 高频词使用红色 text_content += f"🔥 {sequence_display} **{word}** : {count} 条\n\n" elif count >= 5: - # 中频词使用橙色 text_content += f"📈 {sequence_display} **{word}** : {count} 条\n\n" else: - # 低频词使用默认颜色 text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n" - # 格式化标题列表用于飞书显示 + # 标题列表 for j, title_data in enumerate(stat["titles"], 1): title = title_data["title"] source_alias = title_data["source_alias"] @@ -1075,21 +938,14 @@ class ReportGenerator: url = title_data.get("url", "") mobile_url = title_data.get("mobileUrl", "") - # 使用飞书格式化排名 - rank_display = StatisticsCalculator._format_rank_for_feishu( - ranks, rank_threshold - ) + rank_display = StatisticsCalculator._format_rank_for_feishu(ranks, rank_threshold) - # 格式化标题信息,优先使用mobileUrl,然后是url - link_url = mobile_url or url # 优先使用mobileUrl,没有则使用url + link_url = mobile_url or url if link_url: - # 如果有链接,使用markdown链接格式 formatted_title = f"[{title}]({link_url})" else: - # 如果都没有链接,只显示标题 formatted_title = title - # 构建完整的标题行 text_content += f" {j}. 
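A sketch of the message text assembled by _build_feishu_content below; the titles, counts, links, and times are illustrative, while the icons, [i/total] sequence tags, markdown links, rank markup, and separator all follow the code:

    📊 **热点词汇统计**

    🔥 [1/2] **华为 鸿蒙** : 12 条

      1. [百度热搜] [示例标题](https://m.example.com/a) **[2 - 9]** - [09时15分 ~ 21时40分] (3次)

    ━━━━━━━━━━━━━━━━━━━

    📈 [2/2] **比亚迪** : 6 条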
[{source_alias}] {formatted_title}" if rank_display: @@ -1100,18 +956,17 @@ class ReportGenerator: text_content += f" ({count_info}次)" text_content += "\n" - # 在每条新闻后添加额外间隔(除了最后一条) if j < len(stat["titles"]): text_content += "\n" - # 添加分割线,使用更优雅的样式 + # 分割线 if i < len(filtered_stats) - 1: text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" if not text_content: text_content = "📭 暂无匹配的热点词汇\n\n" - # 添加失败平台信息 + # 失败平台信息 if failed_ids and len(failed_ids) > 0: if text_content and "暂无匹配" not in text_content: text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" @@ -1120,7 +975,6 @@ class ReportGenerator: for i, id_value in enumerate(failed_ids, 1): text_content += f" • {id_value}\n" - # 添加底部时间戳 now = TimeHelper.get_beijing_time() text_content += f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" @@ -1128,7 +982,7 @@ class ReportGenerator: class NewsAnalyzer: - """新闻分析主类""" + """新闻分析器""" def __init__( self, @@ -1136,25 +990,16 @@ class NewsAnalyzer: feishu_report_type: str = CONFIG["FEISHU_REPORT_TYPE"], rank_threshold: int = CONFIG["RANK_THRESHOLD"], ): - """ - 初始化新闻分析器 - - Args: - request_interval: 请求间隔(毫秒) - feishu_report_type: 飞书报告类型,可选值: "current"(当前爬取), "daily"(当日汇总), "both"(两者都发送) - rank_threshold: 排名显示阈值 - """ + """初始化分析器""" self.request_interval = request_interval self.feishu_report_type = feishu_report_type self.rank_threshold = rank_threshold - # 判断是否在GitHub Actions环境中 self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true" # 设置代理 self.proxy_url = None if not self.is_github_actions and CONFIG["USE_PROXY"]: - # 本地环境且启用代理时使用代理 self.proxy_url = CONFIG["DEFAULT_PROXY"] print("本地环境,使用代理") elif not self.is_github_actions and not CONFIG["USE_PROXY"]: @@ -1162,77 +1007,55 @@ class NewsAnalyzer: else: print("GitHub Actions环境,不使用代理") - # 初始化数据获取器 self.data_fetcher = DataFetcher(self.proxy_url) def generate_daily_summary(self) -> Optional[str]: - """ - 生成当日统计报告 - - Returns: - HTML文件路径,如果生成失败则返回None - """ + """生成当日统计报告""" print("开始生成当日统计报告...") - # 读取当天所有标题 all_results, id_to_alias, title_info = DataProcessor.read_all_today_titles() if not all_results: print("没有找到当天的数据") return None - # 计算标题总数 total_titles = sum(len(titles) for titles in all_results.values()) print(f"读取到 {total_titles} 个标题") - # 加载频率词和过滤词 word_groups, filter_words = DataProcessor.load_frequency_words() - # 统计词频 stats, total_titles = StatisticsCalculator.count_word_frequency( - all_results, - word_groups, - filter_words, - id_to_alias, - title_info, - self.rank_threshold, + all_results, word_groups, filter_words, + id_to_alias, title_info, self.rank_threshold, ) - # 生成HTML报告 html_file = ReportGenerator.generate_html_report( stats, total_titles, is_daily=True ) print(f"当日HTML统计报告已生成: {html_file}") - # 根据配置决定是否发送当日汇总到飞书 if self.feishu_report_type in ["daily", "both"]: ReportGenerator.send_to_feishu(stats, [], "当日汇总") return html_file def run(self) -> None: - """执行新闻分析流程""" - # 输出当前时间信息 + """执行分析流程""" now = TimeHelper.get_beijing_time() print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}") - # 检查FEISHU_WEBHOOK_URL是否存在 webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]) if not webhook_url and not CONFIG["CONTINUE_WITHOUT_FEISHU"]: - print( - "错误: FEISHU_WEBHOOK_URL未设置或无效,且CONTINUE_WITHOUT_FEISHU为False,程序退出" - ) + print("错误: FEISHU_WEBHOOK_URL未设置且CONTINUE_WITHOUT_FEISHU为False,程序退出") return if not webhook_url: - print( - "警告: FEISHU_WEBHOOK_URL未设置或无效,将继续执行爬虫但不发送飞书通知" - ) + print("警告: FEISHU_WEBHOOK_URL未设置,将继续执行爬虫但不发送飞书通知") print(f"飞书报告类型: {self.feishu_report_type}") print(f"排名阈值: 
{self.rank_threshold}") - # 要爬取的网站ID列表 + # 爬取目标列表 ids = [ ("toutiao", "今日头条"), ("baidu", "百度热搜"), @@ -1247,35 +1070,29 @@ class NewsAnalyzer: "zhihu", ] - print(f"开始爬取数据,请求间隔设置为 {self.request_interval} 毫秒") + print(f"开始爬取数据,请求间隔 {self.request_interval} 毫秒") - # 确保output目录存在 FileHelper.ensure_directory_exists("output") # 爬取数据 - results, id_to_alias, failed_ids = self.data_fetcher.crawl_websites( - ids, self.request_interval - ) + results, id_to_alias, failed_ids = self.data_fetcher.crawl_websites(ids, self.request_interval) - # 保存标题到文件 + # 保存文件 title_file = DataProcessor.save_titles_to_file(results, id_to_alias, failed_ids) print(f"标题已保存到: {title_file}") - # 从文件名中提取时间信息 - time_info = os.path.basename(title_file).replace(".txt", "") + time_info = Path(title_file).stem - # 创建标题信息字典 + # 创建标题信息 title_info = {} for source_id, titles_data in results.items(): title_info[source_id] = {} for title, title_data in titles_data.items(): - # 兼容新格式数据 if isinstance(title_data, dict): ranks = title_data.get("ranks", []) url = title_data.get("url", "") mobile_url = title_data.get("mobileUrl", "") else: - # 兼容旧格式数据 ranks = title_data if isinstance(title_data, list) else [] url = "" mobile_url = "" @@ -1289,48 +1106,36 @@ class NewsAnalyzer: "mobileUrl": mobile_url, } - # 加载频率词和过滤词 word_groups, filter_words = DataProcessor.load_frequency_words() - # 统计词频 stats, total_titles = StatisticsCalculator.count_word_frequency( - results, - word_groups, - filter_words, - id_to_alias, - title_info, - self.rank_threshold, + results, word_groups, filter_words, + id_to_alias, title_info, self.rank_threshold, ) - # 根据配置决定发送哪种报告 + # 发送报告 if self.feishu_report_type in ["current", "both"]: - # 发送当前爬取数据到飞书 ReportGenerator.send_to_feishu(stats, failed_ids, "单次爬取") - # 生成HTML报告 - html_file = ReportGenerator.generate_html_report( - stats, total_titles, failed_ids - ) + html_file = ReportGenerator.generate_html_report(stats, total_titles, failed_ids) print(f"HTML报告已生成: {html_file}") - # 生成当日统计报告 daily_html = self.generate_daily_summary() - # 在本地环境中自动打开HTML文件 + # 本地环境自动打开HTML if not self.is_github_actions and html_file: - file_url = "file://" + os.path.abspath(html_file) + file_url = "file://" + str(Path(html_file).resolve()) print(f"正在打开HTML报告: {file_url}") webbrowser.open(file_url) if daily_html: - daily_url = "file://" + os.path.abspath(daily_html) + daily_url = "file://" + str(Path(daily_html).resolve()) print(f"正在打开当日统计报告: {daily_url}") webbrowser.open(daily_url) def main(): - """程序入口点""" - # 初始化并运行新闻分析器 + """程序入口""" analyzer = NewsAnalyzer( request_interval=CONFIG["REQUEST_INTERVAL"], feishu_report_type=CONFIG["FEISHU_REPORT_TYPE"],