commit 3f481c107c2c069aa0354a1f31aa6f2f03b9ee0a Author: sansan <77180927+sansan0@users.noreply.github.com> Date: Mon Apr 28 19:44:28 2025 +0800 'init' diff --git a/.github/workflows/crawler.yml b/.github/workflows/crawler.yml new file mode 100644 index 0000000..e44e1cf --- /dev/null +++ b/.github/workflows/crawler.yml @@ -0,0 +1,48 @@ +name: Hot News Crawler + +on: + schedule: + - cron: '*/50 * * * *' # 每50分钟运行一次 + workflow_dispatch: + +# 添加权限设置 +permissions: + contents: write + +jobs: + crawl: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.9' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install requests pytz + + - name: Create frequency_words.txt if not exists + run: | + if [ ! -f frequency_words.txt ]; then + echo "Creating empty frequency_words.txt file" + touch frequency_words.txt + fi + + - name: Run crawler + env: + FEISHU_WEBHOOK_URL: ${{ secrets.FEISHU_WEBHOOK_URL }} + GITHUB_ACTIONS: true + run: python main.py + + - name: Commit and push if changes + run: | + git config --global user.name 'GitHub Actions' + git config --global user.email 'actions@github.com' + git add -A + git diff --quiet && git diff --staged --quiet || (git commit -m "Auto update by GitHub Actions at $(TZ=Asia/Shanghai date)" && git push) diff --git a/frequency_words.txt b/frequency_words.txt new file mode 100644 index 0000000..b1e21f6 --- /dev/null +++ b/frequency_words.txt @@ -0,0 +1,93 @@ +胖东来 +于东来 + +DeepSeek +梁文锋 + +华为 +任正非 +鸿蒙 +HarmonyOS + +比亚迪 +王传福 + +宇树 +王兴兴 + +稚晖君 +智元 + +黑神话 +冯骥 + +哪吒 +饺子 +!车 +!餐 + +流浪地球 +郭帆 +三体 +刘慈欣 + +米哈游 +原神 +星穹铁道 + +京东 +刘强东 + +字节 +张一鸣 + +马斯克 +特斯拉 + +微软 +Microsoft + +黄仁勋 +英伟达 +NVIDIA + +AMD + +谷歌 +google +gemini +deepmind + +chatgpt +openai + +claude +Anthropic + +iphone +ipad +mac +ios + +ai +人工智能 + +汽车 +自动驾驶 +l3 + +机器人 + +芯片 +半导体 +光刻机 + +科技 + +核能 + +月球 +登月 +火星 +宇宙 +飞船 \ No newline at end of file diff --git a/image.png b/image.png new file mode 100644 index 0000000..49c610c Binary files /dev/null and b/image.png differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..c128bad --- /dev/null +++ b/main.py @@ -0,0 +1,1081 @@ +# coding=utf-8 + +import json +import os +import time +import random +from datetime import datetime +import webbrowser +from typing import Dict, List, Tuple, Optional, Union + +import requests +import pytz + +# 配置常量 +CONFIG = { + "FEISHU_SEPARATOR": "==============================", # 飞书消息中,每个频率词之间的分割线,注意,其它类型的分割线可能会被飞书过滤而显示怪异 + "REQUEST_INTERVAL": 1000, # 毫秒 + "FEISHU_REPORT_TYPE": "daily", # 可选: "current", "daily", "both" + "RANK_THRESHOLD": 5, # 排名阈值,决定使用【】还是[]的界限 + "USE_PROXY": False, # 是否启用本地代理 + "DEFAULT_PROXY": "http://127.0.0.1:10086", + "CONTINUE_WITHOUT_FEISHU": False, # 控制是否在没有飞书webhook URL时继续执行爬虫, 如果True ,会依然进行爬虫行为,会在github上持续的生成爬取的新闻数据 + "FEISHU_WEBHOOK_URL": "", # 飞书机器人的webhook URL,大概长这样:https://www.feishu.cn/flow/api/trigger-webhook/xxxx, 默认为空,推荐通过GitHub Secrets设置 +} + + +class TimeHelper: + """时间相关的辅助功能""" + + @staticmethod + def get_beijing_time() -> datetime: + """获取北京时间""" + return datetime.now(pytz.timezone("Asia/Shanghai")) + + @staticmethod + def format_date_folder() -> str: + """返回日期文件夹名称格式""" + return TimeHelper.get_beijing_time().strftime("%Y年%m月%d日") + + @staticmethod + def format_time_filename() -> str: + """返回时间文件名格式""" + return TimeHelper.get_beijing_time().strftime("%H时%M分") + + +class FileHelper: + """文件操作相关的辅助功能""" + + @staticmethod + def ensure_directory_exists(directory: str) -> None: + """确保目录存在,如果不存在则创建""" + if not os.path.exists(directory): + os.makedirs(directory) + + @staticmethod + def get_output_path(subfolder: str, filename: str) -> str: + """获取输出文件路径""" + date_folder = TimeHelper.format_date_folder() + output_dir = os.path.join("output", date_folder, subfolder) + FileHelper.ensure_directory_exists(output_dir) + return os.path.join(output_dir, filename) + + +class DataFetcher: + """数据获取相关功能""" + + def __init__(self, proxy_url: Optional[str] = None): + self.proxy_url = proxy_url + + def fetch_data( + self, + id_info: Union[str, Tuple[str, str]], + max_retries: int = 2, + min_retry_wait: int = 3, + max_retry_wait: int = 5, + ) -> Tuple[Optional[str], str, str]: + """ + 同步获取指定ID的数据,失败时进行重试 + 接受'success'和'cache'两种状态,其他状态才会触发重试 + + Args: + id_info: ID信息,可以是ID字符串或(ID, 别名)元组 + max_retries: 最大重试次数 + min_retry_wait: 最小重试等待时间(秒) + max_retry_wait: 最大重试等待时间(秒) + + Returns: + (响应数据, ID, 别名)元组,如果请求失败则响应数据为None + """ + # 处理ID和别名 + if isinstance(id_info, tuple): + id_value, alias = id_info + else: + id_value = id_info + alias = id_value + + url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest" + + # 设置代理 + proxies = None + if self.proxy_url: + proxies = {"http": self.proxy_url, "https": self.proxy_url} + + # 添加随机性模拟真实用户 + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7", + "Connection": "keep-alive", + "Cache-Control": "no-cache", + } + + retries = 0 + while retries <= max_retries: + try: + print( + f"正在请求 {id_value} 数据... (尝试 {retries + 1}/{max_retries + 1})" + ) + response = requests.get( + url, proxies=proxies, headers=headers, timeout=10 + ) + response.raise_for_status() # 检查HTTP状态码 + + # 解析JSON并检查响应状态 + data_text = response.text + data_json = json.loads(data_text) + + # 修改状态检查逻辑:接受success和cache两种状态 + status = data_json.get("status", "未知") + if status not in ["success", "cache"]: + raise ValueError(f"响应状态异常: {status}") + + # 记录状态信息 + status_info = "最新数据" if status == "success" else "缓存数据" + print(f"成功获取 {id_value} 数据({status_info})") + return data_text, id_value, alias + + except Exception as e: + retries += 1 + if retries <= max_retries: + # 计算重试等待时间:基础3-5秒,每次重试增加1-2秒 + base_wait = random.uniform(min_retry_wait, max_retry_wait) + additional_wait = (retries - 1) * random.uniform(1, 2) + wait_time = base_wait + additional_wait + + print( + f"请求 {id_value} 失败: {e}. 将在 {wait_time:.2f} 秒后重试..." + ) + time.sleep(wait_time) + else: + print(f"请求 {id_value} 失败: {e}. 已达到最大重试次数。") + return None, id_value, alias + return None, id_value, alias + + def crawl_websites( + self, + ids_list: List[Union[str, Tuple[str, str]]], + request_interval: int = CONFIG["REQUEST_INTERVAL"], + ) -> Tuple[Dict, Dict, List]: + """ + 爬取多个网站的数据,使用同步请求 + + Args: + ids_list: ID列表,每个元素可以是ID字符串或(ID, 别名)元组 + request_interval: 请求间隔(毫秒) + + Returns: + (results, id_to_alias, failed_ids)元组 + """ + results = {} + id_to_alias = {} + failed_ids = [] + + for i, id_info in enumerate(ids_list): + # 处理ID和别名 + if isinstance(id_info, tuple): + id_value, alias = id_info + else: + id_value = id_info + alias = id_value + + # 添加到ID-别名映射 + id_to_alias[id_value] = alias + + # 发送请求 + response, _, _ = self.fetch_data(id_info) + + # 处理响应 + if response: + try: + data = json.loads(response) + # 获取标题列表,同时记录排名 + results[id_value] = {} + for index, item in enumerate(data.get("items", []), 1): + title = item["title"] + if title in results[id_value]: + results[id_value][title].append(index) + else: + results[id_value][title] = [index] + except json.JSONDecodeError: + print(f"解析 {id_value} 的响应失败,不是有效的JSON") + failed_ids.append(id_value) + except Exception as e: + print(f"处理 {id_value} 数据时出错: {e}") + failed_ids.append(id_value) + else: + failed_ids.append(id_value) + + # 添加间隔时间,除非是最后一个请求 + if i < len(ids_list) - 1: + # 添加一些随机性到间隔时间 + actual_interval = request_interval + random.randint(-10, 20) + actual_interval = max(50, actual_interval) # 确保至少50毫秒 + print(f"等待 {actual_interval} 毫秒后发送下一个请求...") + time.sleep(actual_interval / 1000) + + print(f"\n请求总结:") + print(f"- 成功获取数据的ID: {list(results.keys())}") + print(f"- 请求失败的ID: {failed_ids}") + + return results, id_to_alias, failed_ids + + +class DataProcessor: + """数据处理相关功能""" + + @staticmethod + def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str: + """将标题保存到文件,包括失败的请求信息""" + file_path = FileHelper.get_output_path( + "txt", f"{TimeHelper.format_time_filename()}.txt" + ) + + with open(file_path, "w", encoding="utf-8") as f: + # 先写入成功获取的数据 + for id_value, title_data in results.items(): + display_name = id_to_alias.get(id_value, id_value) + f.write(f"{display_name}\n") + for i, (title, ranks) in enumerate(title_data.items(), 1): + rank_str = ",".join(map(str, ranks)) + f.write(f"{i}. {title} (排名:{rank_str})\n") + f.write("\n") + + # 如果有失败的请求,写入失败信息 + if failed_ids: + f.write("==== 以下ID请求失败 ====\n") + for id_value in failed_ids: + display_name = id_to_alias.get(id_value, id_value) + f.write(f"{display_name} (ID: {id_value})\n") + + return file_path + + @staticmethod + def load_frequency_words( + frequency_file: str = "frequency_words.txt", + ) -> Tuple[List[List[str]], List[str]]: + """ + 加载频率词和过滤词,处理关联词 + + Returns: + (word_groups, filter_words)元组 + """ + if not os.path.exists(frequency_file): + print(f"频率词文件 {frequency_file} 不存在") + return [], [] + + with open(frequency_file, "r", encoding="utf-8") as f: + content = f.read() + + # 按双空行分割不同的词组 + word_groups = [ + group.strip() for group in content.split("\n\n") if group.strip() + ] + + # 处理每个词组 + processed_groups = [] + filter_words = [] # 用于存储过滤词 + + for group in word_groups: + words = [word.strip() for word in group.split("\n") if word.strip()] + + # 分离频率词和过滤词 + group_frequency_words = [] + + for word in words: + if word.startswith("!"): + # 去掉感叹号,添加到过滤词列表 + filter_words.append(word[1:]) + else: + # 正常的频率词 + group_frequency_words.append(word) + + # 只有当词组中包含频率词时才添加到结果中 + if group_frequency_words: + processed_groups.append(group_frequency_words) + + return processed_groups, filter_words + + @staticmethod + def read_all_today_titles() -> Tuple[Dict, Dict, Dict]: + """ + 读取当天所有txt文件的标题,并按来源合并,去除重复,记录时间和出现次数 + + Returns: + (all_results, id_to_alias, title_info)元组 + """ + date_folder = TimeHelper.format_date_folder() + txt_dir = os.path.join("output", date_folder, "txt") + + if not os.path.exists(txt_dir): + print(f"今日文件夹 {txt_dir} 不存在") + return {}, {}, {} + + all_results = {} # 所有源的所有标题 {source_id: {title: [ranks]}} + id_to_alias = {} # ID到别名的映射 + title_info = ( + {} + ) # 标题信息 {source_id: {title: {"first_time": 首次时间, "last_time": 最后时间, "count": 出现次数, "ranks": [排名列表]}}} + + # 读取所有txt文件,按时间排序确保早的时间优先处理 + files = sorted([f for f in os.listdir(txt_dir) if f.endswith(".txt")]) + + for file in files: + # 从文件名提取时间信息 (例如 "12时34分.txt") + time_info = file.replace(".txt", "") + + file_path = os.path.join(txt_dir, file) + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + # 解析内容 + sections = content.split("\n\n") + for section in sections: + if not section.strip() or "==== 以下ID请求失败 ====" in section: + continue + + lines = section.strip().split("\n") + if len(lines) < 2: + continue + + # 第一行是来源名 + source_name = lines[0].strip() + + # 提取标题和排名 + title_ranks = {} + for line in lines[1:]: + if line.strip(): + try: + # 提取序号和正文部分 + match_num = None + title_part = line.strip() + + # 处理格式 "数字. 标题" + if ( + ". " in title_part + and title_part.split(". ")[0].isdigit() + ): + parts = title_part.split(". ", 1) + match_num = int(parts[0]) # 序号可能是排名 + title_part = parts[1] + + # 提取排名信息 "标题 (排名:1,2,3)" + ranks = [] + if " (排名:" in title_part: + title, rank_str = title_part.rsplit(" (排名:", 1) + rank_str = rank_str.rstrip(")") + ranks = [ + int(r) + for r in rank_str.split(",") + if r.strip() and r.isdigit() + ] + else: + title = title_part + + # 如果没找到排名但有序号,则使用序号 + if not ranks and match_num is not None: + ranks = [match_num] + + # 确保排名列表不为空 + if not ranks: + ranks = [99] # 默认排名 + + title_ranks[title] = ranks + + except Exception as e: + print(f"解析标题行出错: {line}, 错误: {e}") + + # 处理来源数据 + DataProcessor._process_source_data( + source_name, + title_ranks, + time_info, + all_results, + title_info, + id_to_alias, + ) + + # 将结果从 {source_name: {title: [ranks]}} 转换为 {source_id: {title: [ranks]}} + id_results = {} + id_title_info = {} + for name, titles in all_results.items(): + for id_value, alias in id_to_alias.items(): + if alias == name: + id_results[id_value] = titles + id_title_info[id_value] = title_info[name] + break + + return id_results, id_to_alias, id_title_info + + @staticmethod + def _process_source_data( + source_name: str, + title_ranks: Dict, + time_info: str, + all_results: Dict, + title_info: Dict, + id_to_alias: Dict, + ) -> None: + """处理来源数据,更新结果和标题信息""" + if source_name not in all_results: + # 首次遇到此来源 + all_results[source_name] = title_ranks + + # 初始化标题信息 + if source_name not in title_info: + title_info[source_name] = {} + + # 记录每个标题的时间、次数和排名 + for title, ranks in title_ranks.items(): + title_info[source_name][title] = { + "first_time": time_info, # 记录首次时间 + "last_time": time_info, # 最后时间初始同首次时间 + "count": 1, + "ranks": ranks, + } + + # 尝试反向生成ID + reversed_id = source_name.lower().replace(" ", "-") + id_to_alias[reversed_id] = source_name + else: + # 已有此来源,更新标题 + for title, ranks in title_ranks.items(): + if title not in all_results[source_name]: + all_results[source_name][title] = ranks + title_info[source_name][title] = { + "first_time": time_info, # 新标题的首次和最后时间都设为当前 + "last_time": time_info, + "count": 1, + "ranks": ranks, + } + else: + # 已存在的标题,更新最后时间,合并排名信息并增加计数 + existing_ranks = title_info[source_name][title]["ranks"] + merged_ranks = existing_ranks.copy() + for rank in ranks: + if rank not in merged_ranks: + merged_ranks.append(rank) + + title_info[source_name][title][ + "last_time" + ] = time_info # 更新最后时间 + title_info[source_name][title]["ranks"] = merged_ranks + title_info[source_name][title]["count"] += 1 + + +class StatisticsCalculator: + """统计计算相关功能""" + + @staticmethod + def count_word_frequency( + results: Dict, + word_groups: List[List[str]], + filter_words: List[str], + id_to_alias: Dict, + title_info: Optional[Dict] = None, + rank_threshold: int = CONFIG["RANK_THRESHOLD"], + ) -> Tuple[List[Dict], int]: + """ + 统计词频,处理关联词和大小写不敏感,每个标题只计入首个匹配词组,并应用过滤词 + + Returns: + (stats, total_titles)元组 + """ + word_stats = {} + total_titles = 0 + processed_titles = {} # 用于跟踪已处理标题 {source_id: {title: True}} + + # 初始化title_info + if title_info is None: + title_info = {} + + # 为每个词组创建统计对象 + for group in word_groups: + group_key = " ".join(group) + word_stats[group_key] = {"count": 0, "titles": {}} + + # 遍历所有标题并统计 + for source_id, titles_data in results.items(): + total_titles += len(titles_data) + + # 初始化该来源的处理记录 + if source_id not in processed_titles: + processed_titles[source_id] = {} + + for title, source_ranks in titles_data.items(): + # 跳过已处理的标题 + if title in processed_titles.get(source_id, {}): + continue + + title_lower = title.lower() # 转换为小写以实现大小写不敏感 + + # 检查是否包含任何过滤词 + contains_filter_word = any( + filter_word.lower() in title_lower for filter_word in filter_words + ) + + # 如果包含过滤词,跳过这个标题 + if contains_filter_word: + continue + + # 按顺序检查每个词组 + for group in word_groups: + group_key = " ".join(group) + + # 检查是否有任何一个词在标题中 + matched = any(word.lower() in title_lower for word in group) + + # 如果匹配,增加计数并添加标题,然后标记为已处理 + if matched: + word_stats[group_key]["count"] += 1 + if source_id not in word_stats[group_key]["titles"]: + word_stats[group_key]["titles"][source_id] = [] + + # 获取标题信息 + first_time = "" + last_time = "" + count_info = 1 + ranks = source_ranks if source_ranks else [] + + if ( + title_info + and source_id in title_info + and title in title_info[source_id] + ): + info = title_info[source_id][title] + first_time = info.get("first_time", "") + last_time = info.get("last_time", "") + count_info = info.get("count", 1) + if "ranks" in info and info["ranks"]: + ranks = info["ranks"] + + # 添加带信息的标题 + word_stats[group_key]["titles"][source_id].append( + { + "title": title, + "first_time": first_time, + "last_time": last_time, + "count": count_info, + "ranks": ranks, + } + ) + + # 标记该标题已处理,不再匹配其他词组 + if source_id not in processed_titles: + processed_titles[source_id] = {} + processed_titles[source_id][title] = True + break # 找到第一个匹配的词组后退出循环 + + # 转换统计结果 + stats = [] + for group_key, data in word_stats.items(): + titles_with_info = [] + for source_id, title_list in data["titles"].items(): + source_alias = id_to_alias.get(source_id, source_id) + for title_data in title_list: + title = title_data["title"] + first_time = title_data["first_time"] + last_time = title_data["last_time"] + count_info = title_data["count"] + ranks = title_data.get("ranks", []) + + # 确保排名是有效的 + if not ranks: + ranks = [99] # 使用默认排名 + + # 格式化排名信息 + rank_display = StatisticsCalculator._format_rank_display( + ranks, rank_threshold + ) + + # 格式化时间信息 + time_display = StatisticsCalculator._format_time_display( + first_time, last_time + ) + + # 格式化标题信息 + formatted_title = f"[{source_alias}] {title}" + if rank_display: + formatted_title += f" {rank_display}" + if time_display: + formatted_title += f" - {time_display}" + if count_info > 1: + formatted_title += f" - {count_info}次" + + titles_with_info.append(formatted_title) + + stats.append( + { + "word": group_key, + "count": data["count"], + "titles": titles_with_info, + "percentage": ( + round(data["count"] / total_titles * 100, 2) + if total_titles > 0 + else 0 + ), + } + ) + + # 按出现次数从高到低排序 + stats.sort(key=lambda x: x["count"], reverse=True) + + return stats, total_titles + + @staticmethod + def _format_rank_display(ranks: List[int], rank_threshold: int) -> str: + """格式化排名显示""" + if not ranks: + return "" + + # 排序排名并确保不重复 + unique_ranks = sorted(set(ranks)) + min_rank = unique_ranks[0] + max_rank = unique_ranks[-1] + + # 根据最高排名判断使用哪种括号 + if min_rank <= rank_threshold: + # 使用【】 + if min_rank == max_rank: + return f"【{min_rank}】" + else: + return f"【{min_rank} - {max_rank}】" + else: + # 使用[] + if min_rank == max_rank: + return f"[{min_rank}]" + else: + return f"[{min_rank} - {max_rank}]" + + @staticmethod + def _format_time_display(first_time: str, last_time: str) -> str: + """格式化时间显示,单次显示时间,多次显示时间范围""" + if not first_time: + return "" + + if first_time == last_time or not last_time: + # 只有一个时间点,直接显示 + return first_time + else: + # 有两个时间点,显示范围 + return f"[{first_time} ~ {last_time}]" + + +class ReportGenerator: + """报告生成相关功能""" + + @staticmethod + def generate_html_report( + stats: List[Dict], + total_titles: int, + failed_ids: Optional[List] = None, + is_daily: bool = False, + ) -> str: + """ + 生成HTML报告,包括失败的请求信息 + + Returns: + HTML文件路径 + """ + # 创建文件路径 + if is_daily: + filename = "当日统计.html" + else: + filename = f"{TimeHelper.format_time_filename()}.html" + + file_path = FileHelper.get_output_path("html", filename) + + # HTML模板和内容生成 + html_content = ReportGenerator._create_html_content( + stats, total_titles, failed_ids, is_daily + ) + + # 写入文件 + with open(file_path, "w", encoding="utf-8") as f: + f.write(html_content) + + return file_path + + @staticmethod + def _create_html_content( + stats: List[Dict], + total_titles: int, + failed_ids: Optional[List] = None, + is_daily: bool = False, + ) -> str: + """创建HTML内容""" + # HTML头部 + html = """ + + + + + 频率词统计报告 + + + +

频率词统计报告

+ """ + + # 报告类型 + if is_daily: + html += "

报告类型: 当日汇总

" + + # 基本信息 + now = TimeHelper.get_beijing_time() + html += f"

总标题数: {total_titles}

" + html += f"

生成时间: {now.strftime('%Y-%m-%d %H:%M:%S')}

" + + # 失败的请求信息 + if failed_ids and len(failed_ids) > 0: + html += """ +
+

请求失败的平台

+ +
+ """ + + # 表格头部 + html += """ + + + + + + + + + """ + + # 表格内容 + for i, stat in enumerate(stats, 1): + html += f""" + + + + + + + + """ + + # 表格结尾 + html += """ +
排名频率词出现次数占比相关标题
{i}{stat['word']}{stat['count']}{stat['percentage']}%{"
".join(stat['titles'])}
+ + + """ + + return html + + @staticmethod + def send_to_feishu( + stats: List[Dict], + failed_ids: Optional[List] = None, + report_type: str = "单次爬取", + ) -> bool: + """ + 将频率词统计结果发送到飞书 + + Returns: + 成功发送返回True,否则返回False + """ + # 获取webhook URL,优先使用环境变量,其次使用配置中的URL + webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]) + + # 检查webhook URL是否有效 + if not webhook_url: + print(f"警告: FEISHU_WEBHOOK_URL未设置或无效,跳过发送飞书通知") + return False + + headers = {"Content-Type": "application/json"} + + # 获取总标题数 + total_titles = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0) + + # 构建文本内容 + text_content = ReportGenerator._build_feishu_content(stats, failed_ids) + + # 构造消息体 + now = TimeHelper.get_beijing_time() + payload = { + "msg_type": "text", + "content": { + "total_titles": total_titles, + "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"), + "report_type": report_type, + "text": text_content, + }, + } + + # 发送请求 + try: + response = requests.post(webhook_url, headers=headers, json=payload) + if response.status_code == 200: + print(f"数据发送到飞书成功 [{report_type}]") + return True + else: + print( + f"发送到飞书失败 [{report_type}],状态码:{response.status_code},响应:{response.text}" + ) + return False + except Exception as e: + print(f"发送到飞书时出错 [{report_type}]:{e}") + return False + + @staticmethod + def _build_feishu_content( + stats: List[Dict], failed_ids: Optional[List] = None + ) -> str: + """构建飞书消息内容""" + text_content = "" + + # 添加频率词统计信息 + filtered_stats = [stat for stat in stats if stat["count"] > 0] + for i, stat in enumerate(filtered_stats): + word = stat["word"] + count = stat["count"] + + text_content += f"【{word}】 : {count} 条\n" + + # 添加相关标题 + for j, title in enumerate(stat["titles"], 1): + text_content += f"{j}. {title}\n" + + # 添加分割线 + if i < len(filtered_stats) - 1: + text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" + + if not text_content: + text_content = "无匹配频率词\n\n" + + # 添加失败平台信息 + if failed_ids and len(failed_ids) > 0: + if text_content and text_content != "无匹配频率词\n\n": + text_content += f"\n{CONFIG['FEISHU_SEPARATOR']}\n\n" + + text_content += "失败平台:\n" + for i, id_value in enumerate(failed_ids, 1): + text_content += f"{i}. {id_value}\n" + + return text_content + + +class NewsAnalyzer: + """新闻分析主类""" + + def __init__( + self, + request_interval: int = CONFIG["REQUEST_INTERVAL"], + feishu_report_type: str = CONFIG["FEISHU_REPORT_TYPE"], + rank_threshold: int = CONFIG["RANK_THRESHOLD"], + ): + """ + 初始化新闻分析器 + + Args: + request_interval: 请求间隔(毫秒) + feishu_report_type: 飞书报告类型,可选值: "current"(当前爬取), "daily"(当日汇总), "both"(两者都发送) + rank_threshold: 排名显示阈值 + """ + self.request_interval = request_interval + self.feishu_report_type = feishu_report_type + self.rank_threshold = rank_threshold + + # 判断是否在GitHub Actions环境中 + self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true" + + # 设置代理 + self.proxy_url = None + if not self.is_github_actions and CONFIG["USE_PROXY"]: + # 本地环境且启用代理时使用代理 + self.proxy_url = CONFIG["DEFAULT_PROXY"] + print("本地环境,使用代理") + elif not self.is_github_actions and not CONFIG["USE_PROXY"]: + print("本地环境,未启用代理") + else: + print("GitHub Actions环境,不使用代理") + + # 初始化数据获取器 + self.data_fetcher = DataFetcher(self.proxy_url) + + def generate_daily_summary(self) -> Optional[str]: + """ + 生成当日统计报告 + + Returns: + HTML文件路径,如果生成失败则返回None + """ + print("开始生成当日统计报告...") + + # 读取当天所有标题 + all_results, id_to_alias, title_info = DataProcessor.read_all_today_titles() + + if not all_results: + print("没有找到当天的数据") + return None + + # 计算标题总数 + total_titles = sum(len(titles) for titles in all_results.values()) + print(f"读取到 {total_titles} 个标题") + + # 加载频率词和过滤词 + word_groups, filter_words = DataProcessor.load_frequency_words() + + # 统计词频 + stats, total_titles = StatisticsCalculator.count_word_frequency( + all_results, + word_groups, + filter_words, + id_to_alias, + title_info, + self.rank_threshold, + ) + + # 生成HTML报告 + html_file = ReportGenerator.generate_html_report( + stats, total_titles, is_daily=True + ) + print(f"当日HTML统计报告已生成: {html_file}") + + # 根据配置决定是否发送当日汇总到飞书 + if self.feishu_report_type in ["daily", "both"]: + ReportGenerator.send_to_feishu(stats, [], "当日汇总") + + return html_file + + def run(self) -> None: + """执行新闻分析流程""" + # 输出当前时间信息 + now = TimeHelper.get_beijing_time() + print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}") + + # 检查FEISHU_WEBHOOK_URL是否存在 + webhook_url = os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]) + if not webhook_url and not CONFIG["CONTINUE_WITHOUT_FEISHU"]: + print( + "错误: FEISHU_WEBHOOK_URL未设置或无效,且CONTINUE_WITHOUT_FEISHU为False,程序退出" + ) + return + + if not webhook_url: + print( + "警告: FEISHU_WEBHOOK_URL未设置或无效,将继续执行爬虫但不发送飞书通知" + ) + + print(f"飞书报告类型: {self.feishu_report_type}") + print(f"排名阈值: {self.rank_threshold}") + + # 要爬取的网站ID列表 + ids = [ + ("toutiao", "今日头条"), + ("baidu", "百度热搜"), + ("wallstreetcn-hot", "华尔街见闻"), + ("thepaper", "澎湃新闻"), + ("bilibili-hot-search", "bilibili 热搜"), + ("cls-hot", "财联社热门"), + "tieba", + "weibo", + "douyin", + "zhihu", + ] + + print(f"开始爬取数据,请求间隔设置为 {self.request_interval} 毫秒") + + # 确保output目录存在 + FileHelper.ensure_directory_exists("output") + + # 爬取数据 + results, id_to_alias, failed_ids = self.data_fetcher.crawl_websites( + ids, self.request_interval + ) + + # 保存标题到文件 + title_file = DataProcessor.save_titles_to_file(results, id_to_alias, failed_ids) + print(f"标题已保存到: {title_file}") + + # 从文件名中提取时间信息 + time_info = os.path.basename(title_file).replace(".txt", "") + + # 创建标题信息字典 + title_info = {} + for source_id, titles_data in results.items(): + title_info[source_id] = {} + for title, ranks in titles_data.items(): + title_info[source_id][title] = { + "first_time": time_info, + "last_time": time_info, + "count": 1, + "ranks": ranks, + } + + # 加载频率词和过滤词 + word_groups, filter_words = DataProcessor.load_frequency_words() + + # 统计词频 + stats, total_titles = StatisticsCalculator.count_word_frequency( + results, + word_groups, + filter_words, + id_to_alias, + title_info, + self.rank_threshold, + ) + + # 根据配置决定发送哪种报告 + if self.feishu_report_type in ["current", "both"]: + # 发送当前爬取数据到飞书 + ReportGenerator.send_to_feishu(stats, failed_ids, "单次爬取") + + # 生成HTML报告 + html_file = ReportGenerator.generate_html_report( + stats, total_titles, failed_ids + ) + print(f"HTML报告已生成: {html_file}") + + # 生成当日统计报告 + daily_html = self.generate_daily_summary() + + # 在本地环境中自动打开HTML文件 + if not self.is_github_actions and html_file: + file_url = "file://" + os.path.abspath(html_file) + print(f"正在打开HTML报告: {file_url}") + webbrowser.open(file_url) + + if daily_html: + daily_url = "file://" + os.path.abspath(daily_html) + print(f"正在打开当日统计报告: {daily_url}") + webbrowser.open(daily_url) + + +def main(): + """程序入口点""" + # 初始化并运行新闻分析器 + analyzer = NewsAnalyzer( + request_interval=CONFIG["REQUEST_INTERVAL"], + feishu_report_type=CONFIG["FEISHU_REPORT_TYPE"], + rank_threshold=CONFIG["RANK_THRESHOLD"], + ) + analyzer.run() + + +if __name__ == "__main__": + main() diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..c00165b --- /dev/null +++ b/readme.md @@ -0,0 +1,399 @@ +# TrendRadar - 多平台热点资讯监控分析系统 + +TrendRadar 是一款多平台热点资讯监控工具,可自动追踪主流媒体平台的热门话题,实时分析热点走势,根据自定义关键词进行筛选,并通过精美报表或飞书机器人实时推送到手机上。无论你是媒体从业者、市场分析师、还是信息爱好者,TrendRadar 都能帮你第一时间捕捉全网热点脉搏。 + +或者像我一样通过这个工具来反向减少对各种APP的使用依赖的。 + + +## ✨ 核心功能 + +- **多平台覆盖** - 一次监控 10+主流平台(今日头条、百度热搜、微博、抖音、知乎、B 站等) +- **智能分析** - 自定义频率词和过滤词,精准捕捉你关心的热点 +- **数据可视化** - 生成美观的 HTML 统计报告,热点一目了然 +- **实时推送** - 支持飞书机器人通知,重要热点即时知晓 +- **全自动化** - 基于 GitHub Actions,定时运行无需服务器 + +## 🔍 支持的平台 + +目前已支持以下 10 个热门平台: + +- 今日头条 +- 百度热搜 +- 华尔街见闻 +- 澎湃新闻 +- bilibili 热搜 +- 财联社热门 +- 贴吧 +- 微博 +- 抖音 +- 知乎 + + + + +## 🚀 使用方式 + +### 方式一:GitHub Actions 远程运行(推荐) + +1. **Fork 本项目**到你的 GitHub 账户 + + - 点击本页面右上角的"Fork"按钮 + +2. **设置 GitHub Secrets**: + + - 在你 Fork 后的仓库中,进入`Settings` > `Secrets and variables` > `Actions` + - 点击"New repository secret" + - 名称填写`FEISHU_WEBHOOK_URL` + - 值填写你的飞书机器人 Webhook 地址(webhook 获取,请直接跳转到下方的 "🤖 飞书机器人设置") + - 点击"Add secret"保存 + +3. **自定义关键词**: + + - 修改`frequency_words.txt`文件,添加你需要监控的频率词和过滤词 + +4. **自动运行**: + + - 项目已包含`.github/workflows/crawler.yml`配置文件,默认每 50 分钟自动运行一次 + - 你也可以在 GitHub 仓库的 Actions 页面手动触发运行 + +5. **查看结果**: + - 运行结果将自动保存在仓库的`output`目录中 + - 同时通过飞书机器人发送通知到你的群组 + +6. **增加或减少平台**: +如果想支持更多平台或者不想看某些歪屁股平台,可以访问newsnow的源代码:https://github.com/ourongxing/newsnow/tree/main/server/sources ,根据里面的文件名自己来修改 main.py 中的下面代码,可以在你 Fork 的项目上直接修改源码 + +``` + ids = [ + ("toutiao", "今日头条"), + ("baidu", "百度热搜"), + ("wallstreetcn-hot", "华尔街见闻"), + ("thepaper", "澎湃新闻"), + ("bilibili-hot-search", "bilibili 热搜"), + ("cls-hot", "财联社热门"), + "tieba", + "weibo", + "douyin", + "zhihu", + ] +``` + +### 方式二:本地运行 + +1. **克隆项目**到本地: + +```bash +git clone https://github.com/sansan0/TrendRadar.git +cd TrendRadar +``` + +2. **安装依赖**: + +```bash +pip install requests pytz +``` + +3. **配置飞书 Webhook URL**(两种方式): + + - 方式 1:直接在代码顶部的`CONFIG`字典中修改`FEISHU_WEBHOOK_URL`的值 + - 方式 2:设置环境变量`FEISHU_WEBHOOK_URL`(优先级更高) + +4. **创建或修改关键词**: + + - 编辑`frequency_words.txt`文件,添加你需要监控的频率词和过滤词 + +5. **运行程序**: + +```bash +python main.py +``` + +程序将自动爬取热点数据,生成报告,并在本地浏览器中打开 HTML 统计页面。 + +## ⚙️ 配置说明 + +### 全局配置项 + +代码顶部的`CONFIG`字典包含了所有可配置的选项: + +```python +CONFIG = { + "FEISHU_SEPARATOR": "==============================", # 飞书消息分割线 + "REQUEST_INTERVAL": 1000, # 请求间隔(毫秒) + "FEISHU_REPORT_TYPE": "daily", # 可选: "current", "daily", "both" + "RANK_THRESHOLD": 5, # 排名阈值,决定使用【】还是[]的界限 + "USE_PROXY": False, # 是否启用本地代理 + "DEFAULT_PROXY": "http://127.0.0.1:10086", # 默认代理地址 + "CONTINUE_WITHOUT_FEISHU": False, # 是否在没有飞书webhook URL时继续执行爬虫 + "FEISHU_WEBHOOK_URL": "" # 飞书机器人的webhook URL,默认为空 +} +``` + +主要配置项说明: + +- `REQUEST_INTERVAL`: 控制爬取不同平台之间的时间间隔 +- `FEISHU_REPORT_TYPE`: 控制发送到飞书的报告类型 + - `current`: 只发送当前爬取结果 + - `daily`: 只发送当日汇总 + - `both`: 两者都发送 +- `RANK_THRESHOLD`: 排名显示阈值,小于等于此值的排名使用【】,大于此值使用[] +- `USE_PROXY`: 是否在本地运行时使用代理 +- `DEFAULT_PROXY`: 本地代理地址 +- `CONTINUE_WITHOUT_FEISHU`: 如果为`True`,即使没有飞书 webhook URL 也会执行爬虫;如果为`False`,则不执行 +- `FEISHU_WEBHOOK_URL`: 飞书机器人的 webhook URL,可以直接在此设置 + +### 频率词和过滤词 + +在`frequency_words.txt`文件中配置监控的频率词和过滤词: + +- 每组相关的频率词用换行分隔,不同组之间用空行分隔 +- 以`!`开头的词为过滤词 +- 如果一个标题既包含频率词又包含过滤词,则该标题不会被统计 + +示例: + +``` +人工智能 +AI +GPT +大模型 +!AI绘画 + +芯片 +半导体 +``` + +上述配置表示: + +- 监控包含"人工智能"、"AI"、"GPT"或"大模型"的标题,但若同时包含"AI 绘画"则排除 +- 监控包含"芯片"或"半导体"的标题 + +## 📊 输出示例 + +程序会生成两种报告: + +1. **单次爬取报告**:每次爬取后生成的报告,包含当次爬取的热点数据 +2. **当日汇总报告**:汇总当天所有爬取的数据,去重并统计出现频率 + +### HTML 报告示例: + +| 排名 | 频率词 | 出现次数 | 占比 | 相关标题 | +| ---- | ----------- | -------- | ----- | ------------------------------------------------------------------------------------------------------------------- | +| 1 | 人工智能 AI | 12 | 24.5% | [百度热搜] 科技巨头发布新 AI 模型 【1】- 12 时 30 分 - 4 次
[今日头条] AI 技术最新突破 【2】- 13 时 15 分 - 2 次 | +| 2 | 芯片 半导体 | 8 | 16.3% | [华尔街见闻] 半导体行业最新动态 【3】- 12 时 45 分 - 3 次
[财联社] 芯片设计新技术 [7] - 14 时 00 分 - 1 次 | + +### 飞书通知示例: + +``` +【人工智能 AI】 : 12 条 +1. [百度热搜] 科技巨头发布新AI模型 【1】- 12时30分 - 4次 +2. [今日头条] AI技术最新突破 【2】- 13时15分 - 2次 + +============================== + +【芯片 半导体】 : 8 条 +1. [华尔街见闻] 半导体行业最新动态 【3】- 12时45分 - 3次 +2. [财联社] 芯片设计新技术 [7] - 14时00分 - 1次 +``` + +### 飞书消息格式说明 + +| 格式元素 | 示例 | 含义 | 说明 | +| ------------- | ------------------------------ | ------------ | ----------------------------------- | +| 【关键词】 | 【人工智能 AI】 | 频率词组 | 表示本组匹配的关键词 | +| : N 条 | : 12 条 | 匹配数量 | 该关键词组匹配的标题总数 | +| [平台名] | [百度热搜] | 来源平台 | 标题所属的平台名称 | +| 【数字】 | 【1】 | 高排名标记 | 排名 ≤ 阈值(默认 5)的热搜,重要性高 | +| [数字] | [7] | 普通排名标记 | 排名>阈值的热搜,重要性一般 | +| - 时间 | - 12 时 30 分 | 首次发现时间 | 标题首次被发现的时间 | +| [时间 ~ 时间] | [12 时 30 分 ~ 14 时 00 分] | 时间范围 | 标题出现的时间范围(首次~最后) | +| - N 次 | - 4 次 | 出现次数 | 标题在监控期间出现的总次数 | +| ====== | ============================== | 分隔线 | 不同频率词组之间的分隔符 | + +## 🤖 飞书机器人设置 + +1. 电脑浏览器打开 https://botbuilder.feishu.cn/home/my-app + +2. 点击"新建机器人应用" + +3. 进入创建的应用后,点击"流程涉及" > "创建流程" > "选择触发器" + +4. 往下滑动,点击"Webhook 触发" + +5. 此时你会看到"Webhook 地址",把这个链接先复制到本地记事本暂存,继续接下来的操作 + +6. "参数"里面放上下面的内容,然后点击"完成" + +``` +{ +"message_type ":"text", +"content":{ + "total_titles": "{{内容}}", + "timestamp": "{{内容}}", + "report_type": "{{内容}}", + "text": "{{内容}}" +} +} +``` + +7. 点击"选择操作" > "发送飞书消息" ,勾选 "群消息", 然后点击下面的输入框,点击"我管理的群组"(如果没有群组,你可以在飞书 app 上创建群组) + +8. 消息标题填写"我是热搜" + +9. 最关键的部分来了,点击 + 按钮,选择"Webhook 触发",然后按照下面的图片摆放 + +![alt text](image.png) + +```bash +# Linux/macOS +export FEISHU_WEBHOOK_URL="你的Webhook URL" + +# Windows +set FEISHU_WEBHOOK_URL="你的Webhook URL" +``` + +## 📡 数据来源说明 + +本项目使用的数据来自 [newsnow](https://github.com/ourongxing/newsnow) 的 API 服务。每个平台的数据通过以下格式的 API 请求获取: + +``` +https://newsnow.busiyi.world/api/s?id={平台ID}&latest +``` + +其中`{平台ID}`为各平台的标识符,如`baidu`、`toutiao`等。 + +API 返回的数据格式为 JSON,包含平台的热搜榜单: + +```json +{ + "status": "success", + "items": [ + { + "title": "热搜标题1", + "url": "https://example.com/news1" + }, + { + "title": "热搜标题2", + "url": "https://example.com/news2" + } + // ...更多条目 + ] +} +``` + +### 自建 API 服务 + +如果你想自己部署 API 服务而不依赖第三方: + +1. 克隆 [newsnow](https://github.com/ourongxing/newsnow) 仓库 + + ```bash + git clone https://github.com/ourongxing/newsnow.git + cd newsnow + ``` + +2. 按照该仓库的 README 说明部署 API 服务 + +3. 修改 TrendRadar 中的 API URL: + + - 在`DataFetcher.fetch_data`方法中,将 + ```python + url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest" + ``` + 更改为你自己的 API 地址 + ```python + url = f"https://你的域名/api/s?id={id_value}&latest" + ``` + +4. 如需添加新的平台支持,请参考 newsnow 项目中的爬虫实现并添加到你的 API 服务中 + +## 📁 代码结构 + +代码采用了面向对象设计模式,主要包含以下几个类: + +- `TimeHelper`: 时间相关的辅助功能 +- `FileHelper`: 文件操作相关的辅助功能 +- `DataFetcher`: 负责从 API 获取数据 +- `DataProcessor`: 负责处理和转换数据 +- `StatisticsCalculator`: 负责统计计算 +- `ReportGenerator`: 负责生成 HTML 报告和飞书消息 +- `NewsAnalyzer`: 主类,协调整个流程的执行 + +## 🔧 高级用法 + +### 自定义监控平台 + +可以在`NewsAnalyzer.run`方法中修改`ids`列表来添加或移除监控的平台: + +```python +ids = [ + ("toutiao", '今日头条'), + ("baidu", '百度热搜'), + # 添加或移除平台 +] +``` + +### 飞书通知选项 + +你可以通过以下方式控制飞书通知行为: + +1. `FEISHU_WEBHOOK_URL`: 设置为有效的 webhook URL 以启用飞书通知 +2. `CONTINUE_WITHOUT_FEISHU`: 控制在没有有效 webhook URL 时的行为 + - `True`: 执行爬虫但不发送通知(默认值) + - `False`: 完全不执行爬虫 +3. `FEISHU_REPORT_TYPE`: 控制发送哪种类型的报告 + +### 扩展功能 + +如果你想扩展功能,可以: + +1. 继承已有类并重写特定方法 +2. 添加新的统计方法到`StatisticsCalculator`类 +3. 添加新的报告格式到`ReportGenerator`类 +4. 修改`NewsAnalyzer`类以支持新的工作流程 + +## ❓ 常见问题 + +1. **GitHub Actions 不执行怎么办?** + + - 检查`.github/workflows/crawler.yml`文件是否存在 + - 在 Actions 页面手动触发一次 workflow + - 确认你有足够的 GitHub Actions 免费分钟数 + +2. **本地运行失败怎么办?** + + - 检查网络连接 + - 尝试修改`CONFIG`中的`USE_PROXY`和`DEFAULT_PROXY`设置 + - 检查依赖是否正确安装 + +3. **没有收到飞书通知怎么办?** + + - 检查`FEISHU_WEBHOOK_URL`是否正确设置(环境变量或 CONFIG 中) + - 检查飞书机器人是否仍在群内且启用 + - 查看程序输出中是否有发送失败的错误信息 + +4. **想要停止爬虫行为但保留仓库怎么办?** + + - 将`CONTINUE_WITHOUT_FEISHU`设置为`False`并删除`FEISHU_WEBHOOK_URL`secret + - 或修改 GitHub Actions workflow 文件禁用自动执行 + +5. **如何处理 API 限制或访问问题?** + - 适当增加`REQUEST_INTERVAL`值,避免频繁请求 + - 考虑使用上述"自建 API 服务"部分的说明部署自己的服务 + - 本地运行时可尝试启用或更换代理 + +## 💡 应用场景 + +- **媒体从业者**: 实时追踪热点,把握报道方向 +- **市场营销**: 及时发现与品牌相关的热点话题 +- **内容创作**: 获取热门话题灵感,提高内容曝光 +- **投资分析**: 追踪特定行业或公司的热点消息 +- **个人使用**: 不错过任何你关心领域的热点信息 + +## 🙏 致谢 + +本项目使用了 [newsnow](https://github.com/ourongxing/newsnow) 提供的 API 服务,感谢其提供的数据支持。 + +## 📄 许可证 + +MIT License \ No newline at end of file