This commit is contained in:
sansan 2025-07-09 21:38:17 +08:00 committed by GitHub
parent 39b9c9a243
commit 0aafa8c3e2

299
main.py
View File

@ -3,6 +3,7 @@
import json import json
import time import time
import random import random
import re
from datetime import datetime from datetime import datetime
import webbrowser import webbrowser
from typing import Dict, List, Tuple, Optional, Union from typing import Dict, List, Tuple, Optional, Union
@ -13,7 +14,7 @@ import requests
import pytz import pytz
CONFIG = { CONFIG = {
"VERSION": "1.3.0", "VERSION": "1.4.1",
"VERSION_CHECK_URL": "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version", "VERSION_CHECK_URL": "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version",
"SHOW_VERSION_UPDATE": True, # 控制显示版本更新提示,改成 False 将不接受新版本提示 "SHOW_VERSION_UPDATE": True, # 控制显示版本更新提示,改成 False 将不接受新版本提示
"FEISHU_MESSAGE_SEPARATOR": "━━━━━━━━━━━━━━━━━━━", # feishu消息分割线 "FEISHU_MESSAGE_SEPARATOR": "━━━━━━━━━━━━━━━━━━━", # feishu消息分割线
@ -24,6 +25,10 @@ CONFIG = {
"DEFAULT_PROXY": "http://127.0.0.1:10086", "DEFAULT_PROXY": "http://127.0.0.1:10086",
"ENABLE_CRAWLER": True, # 是否启用爬取新闻功能False时直接停止程序 "ENABLE_CRAWLER": True, # 是否启用爬取新闻功能False时直接停止程序
"ENABLE_NOTIFICATION": True, # 是否启用通知功能False时不发送手机通知 "ENABLE_NOTIFICATION": True, # 是否启用通知功能False时不发送手机通知
"FOCUS_NEW_ONLY": False, # 是否只关注新增新闻True时只统计和推送新增的新闻(增量推送)
# FOCUS_NEW_ONLY 增量推送开关:避免重复推送相同内容,只在有新内容时才发通知
# 优点:1.减少重复推送噪音 2.专注最新动态 3.避免通知疲劳
# 适用场景:1.高频监控(≤30分钟间隔) 2.实时热点追踪 3.只关心新话题而非持续热度
"MESSAGE_BATCH_SIZE": 4000, # 消息分批大小(字节) "MESSAGE_BATCH_SIZE": 4000, # 消息分批大小(字节)
"BATCH_SEND_INTERVAL": 1, # 批次发送间隔(秒) "BATCH_SEND_INTERVAL": 1, # 批次发送间隔(秒)
# 飞书机器人的 webhook URL # 飞书机器人的 webhook URL
@ -266,6 +271,35 @@ class DataFetcher:
class DataProcessor: class DataProcessor:
"""数据处理器""" """数据处理器"""
@staticmethod
def clean_title(title: str) -> str:
"""清理标题中的特殊字符"""
if not isinstance(title, str):
title = str(title)
# 移除或替换常见的特殊字符
cleaned_title = title.replace("\n", " ").replace( # 换行符替换为空格
"\r", " "
) # 回车符替换为空格
cleaned_title = re.sub(r"\s+", " ", cleaned_title)
cleaned_title = cleaned_title.strip()
return cleaned_title
@staticmethod
def is_first_crawl_today() -> bool:
    """Report whether the current run counts as today's first crawl.

    Inspects ``output/<date folder>/txt``, where the date folder name
    comes from ``TimeHelper.format_date_folder()``, and counts the
    ``.txt`` files found directly inside it.

    Returns:
        True when that directory does not exist or holds at most one
        ``.txt`` file; False otherwise.
    """
    date_folder = TimeHelper.format_date_folder()
    txt_dir = Path("output") / date_folder / "txt"

    if not txt_dir.exists():
        return True

    # Only the number of files matters, so count them directly instead
    # of building and sorting a list.
    txt_count = sum(1 for entry in txt_dir.iterdir() if entry.suffix == ".txt")

    # Zero or one file both count as the first crawl.
    # NOTE(review): presumably one file is allowed because the current
    # crawl's output may already be saved when this runs — confirm.
    return txt_count <= 1
@staticmethod @staticmethod
def detect_latest_new_titles(id_to_alias: Dict) -> Dict: def detect_latest_new_titles(id_to_alias: Dict) -> Dict:
"""检测当日最新批次的新增标题""" """检测当日最新批次的新增标题"""
@ -365,7 +399,7 @@ class DataProcessor:
if url_part.endswith("]"): if url_part.endswith("]"):
url = url_part[:-1] url = url_part[:-1]
title = title_part.strip() title = DataProcessor.clean_title(title_part.strip())
ranks = [rank] if rank is not None else [1] ranks = [rank] if rank is not None else [1]
titles_by_source[source_name][title] = { titles_by_source[source_name][title] = {
@ -394,6 +428,7 @@ class DataProcessor:
# 按排名排序标题 # 按排名排序标题
sorted_titles = [] sorted_titles = []
for title, info in title_data.items(): for title, info in title_data.items():
cleaned_title = DataProcessor.clean_title(title)
if isinstance(info, dict): if isinstance(info, dict):
ranks = info.get("ranks", []) ranks = info.get("ranks", [])
url = info.get("url", "") url = info.get("url", "")
@ -404,12 +439,12 @@ class DataProcessor:
mobile_url = "" mobile_url = ""
rank = ranks[0] if ranks else 1 rank = ranks[0] if ranks else 1
sorted_titles.append((rank, title, url, mobile_url)) sorted_titles.append((rank, cleaned_title, url, mobile_url))
sorted_titles.sort(key=lambda x: x[0]) sorted_titles.sort(key=lambda x: x[0])
for rank, title, url, mobile_url in sorted_titles: for rank, cleaned_title, url, mobile_url in sorted_titles:
line = f"{rank}. {title}" line = f"{rank}. {cleaned_title}"
if url: if url:
line += f" [URL:{url}]" line += f" [URL:{url}]"
@ -760,11 +795,45 @@ class StatisticsCalculator:
title_info: Optional[Dict] = None, title_info: Optional[Dict] = None,
rank_threshold: int = CONFIG["RANK_THRESHOLD"], rank_threshold: int = CONFIG["RANK_THRESHOLD"],
new_titles: Optional[Dict] = None, new_titles: Optional[Dict] = None,
focus_new_only: bool = False,
) -> Tuple[List[Dict], int]: ) -> Tuple[List[Dict], int]:
"""统计词频,支持必须词、频率词、过滤词,并标记新增标题""" """统计词频,支持必须词、频率词、过滤词,并标记新增标题"""
# 检测是否是当天第一次爬取
is_first_today = DataProcessor.is_first_crawl_today()
# 确定处理的数据源和新增标记逻辑
if focus_new_only:
if is_first_today:
# 新增模式 + 当天第一次:处理所有新闻,都标记为新增
results_to_process = results
all_news_are_new = True
total_input_news = sum(len(titles) for titles in results.values())
print(
f"新增模式:当天第一次爬取,处理 {total_input_news} 条新闻(所有匹配的新闻都视为新增)"
)
else:
# 新增模式 + 当天非第一次:只处理新增的新闻
results_to_process = new_titles if new_titles else {}
all_news_are_new = True # 处理的都是新增新闻
if new_titles:
total_new_count = sum(len(titles) for titles in new_titles.values())
print(
f"新增模式:检测到 {total_new_count} 条新增新闻,开始进行频率词匹配..."
)
else:
print("新增模式:未检测到新增新闻")
else:
# 正常模式:处理所有新闻
results_to_process = results
all_news_are_new = False
total_input_news = sum(len(titles) for titles in results.values())
print(f"正常模式:处理 {total_input_news} 条新闻")
word_stats = {} word_stats = {}
total_titles = 0 total_titles = 0
processed_titles = {} processed_titles = {}
matched_new_count = 0
if title_info is None: if title_info is None:
title_info = {} title_info = {}
@ -775,7 +844,7 @@ class StatisticsCalculator:
group_key = group["group_key"] group_key = group["group_key"]
word_stats[group_key] = {"count": 0, "titles": {}} word_stats[group_key] = {"count": 0, "titles": {}}
for source_id, titles_data in results.items(): for source_id, titles_data in results_to_process.items():
total_titles += len(titles_data) total_titles += len(titles_data)
if source_id not in processed_titles: if source_id not in processed_titles:
@ -786,11 +855,17 @@ class StatisticsCalculator:
continue continue
# 使用统一的匹配逻辑 # 使用统一的匹配逻辑
if not StatisticsCalculator._matches_word_groups( matches_frequency_words = StatisticsCalculator._matches_word_groups(
title, word_groups, filter_words title, word_groups, filter_words
): )
if not matches_frequency_words:
continue continue
# 如果是新增模式,统计匹配的新增新闻数量
if focus_new_only and all_news_are_new:
matched_new_count += 1
source_ranks = title_data.get("ranks", []) source_ranks = title_data.get("ranks", [])
source_url = title_data.get("url", "") source_url = title_data.get("url", "")
source_mobile_url = title_data.get("mobileUrl", "") source_mobile_url = title_data.get("mobileUrl", "")
@ -853,9 +928,13 @@ class StatisticsCalculator:
source_alias = id_to_alias.get(source_id, source_id) source_alias = id_to_alias.get(source_id, source_id)
# 修复is_new判断逻辑添加容错处理 # 判断是否为新增
is_new = False is_new = False
if new_titles and source_id in new_titles: if all_news_are_new:
# 新增模式下所有处理的新闻都是新增,或者当天第一次的所有新闻都是新增
is_new = True
elif new_titles and source_id in new_titles:
# 正常模式下,检查是否在新增列表中
new_titles_for_source = new_titles[source_id] new_titles_for_source = new_titles[source_id]
if title in new_titles_for_source: if title in new_titles_for_source:
is_new = True is_new = True
@ -888,6 +967,21 @@ class StatisticsCalculator:
processed_titles[source_id][title] = True processed_titles[source_id][title] = True
break break
if focus_new_only and not is_first_today:
if new_titles:
total_new_count = sum(len(titles) for titles in new_titles.values())
print(
f"新增模式:{total_new_count} 条新增新闻中,有 {matched_new_count} 条匹配频率词"
)
if matched_new_count == 0:
print("新增模式:没有新增新闻匹配频率词,将不会发送通知")
else:
print("新增模式:未检测到新增新闻")
elif focus_new_only and is_first_today:
print(
f"新增模式:当天第一次爬取,{matched_new_count} 条新闻匹配频率词并将推送"
)
stats = [] stats = []
for group_key, data in word_stats.items(): for group_key, data in word_stats.items():
all_titles = [] all_titles = []
@ -1041,17 +1135,54 @@ class ReportGenerator:
failed_ids: Optional[List] = None, failed_ids: Optional[List] = None,
new_titles: Optional[Dict] = None, new_titles: Optional[Dict] = None,
id_to_alias: Optional[Dict] = None, id_to_alias: Optional[Dict] = None,
hide_new_section: bool = False,
) -> Dict: ) -> Dict:
"""准备报告数据""" """准备报告数据"""
filtered_new_titles = {} processed_new_titles = []
if new_titles and id_to_alias:
word_groups, filter_words = DataProcessor.load_frequency_words() # 只有在非隐藏模式下才处理新增新闻部分
for source_id, titles_data in new_titles.items(): if not hide_new_section:
filtered_titles = ReportGenerator._apply_frequency_filter( filtered_new_titles = {}
titles_data, word_groups, filter_words if new_titles and id_to_alias:
) word_groups, filter_words = DataProcessor.load_frequency_words()
if filtered_titles: for source_id, titles_data in new_titles.items():
filtered_new_titles[source_id] = filtered_titles filtered_titles = ReportGenerator._apply_frequency_filter(
titles_data, word_groups, filter_words
)
if filtered_titles:
filtered_new_titles[source_id] = filtered_titles
if filtered_new_titles and id_to_alias:
for source_id, titles_data in filtered_new_titles.items():
source_alias = id_to_alias.get(source_id, source_id)
source_titles = []
for title, title_data in titles_data.items():
url, mobile_url, ranks = (
ReportGenerator._extract_title_data_fields(title_data)
)
processed_title = {
"title": title,
"source_alias": source_alias,
"time_display": "",
"count": 1,
"ranks": ranks,
"rank_threshold": CONFIG["RANK_THRESHOLD"],
"url": url,
"mobile_url": mobile_url,
"is_new": True,
}
source_titles.append(processed_title)
if source_titles:
processed_new_titles.append(
{
"source_id": source_id,
"source_alias": source_alias,
"titles": source_titles,
}
)
processed_stats = [] processed_stats = []
for stat in stats: for stat in stats:
@ -1082,39 +1213,6 @@ class ReportGenerator:
} }
) )
processed_new_titles = []
if filtered_new_titles and id_to_alias:
for source_id, titles_data in filtered_new_titles.items():
source_alias = id_to_alias.get(source_id, source_id)
source_titles = []
for title, title_data in titles_data.items():
url, mobile_url, ranks = ReportGenerator._extract_title_data_fields(
title_data
)
processed_title = {
"title": title,
"source_alias": source_alias,
"time_display": "",
"count": 1,
"ranks": ranks,
"rank_threshold": CONFIG["RANK_THRESHOLD"],
"url": url,
"mobile_url": mobile_url,
"is_new": True,
}
source_titles.append(processed_title)
if source_titles:
processed_new_titles.append(
{
"source_id": source_id,
"source_alias": source_alias,
"titles": source_titles,
}
)
return { return {
"stats": processed_stats, "stats": processed_stats,
"new_titles": processed_new_titles, "new_titles": processed_new_titles,
@ -1170,7 +1268,9 @@ class ReportGenerator:
) )
link_url = title_data["mobile_url"] or title_data["url"] link_url = title_data["mobile_url"] or title_data["url"]
escaped_title = ReportGenerator._html_escape(title_data["title"])
cleaned_title = DataProcessor.clean_title(title_data["title"])
escaped_title = ReportGenerator._html_escape(cleaned_title)
escaped_source_alias = ReportGenerator._html_escape(title_data["source_alias"]) escaped_source_alias = ReportGenerator._html_escape(title_data["source_alias"])
if link_url: if link_url:
@ -1357,10 +1457,13 @@ class ReportGenerator:
) )
link_url = title_data["mobile_url"] or title_data["url"] link_url = title_data["mobile_url"] or title_data["url"]
cleaned_title = DataProcessor.clean_title(title_data["title"])
if link_url: if link_url:
formatted_title = f"[{title_data['title']}]({link_url})" formatted_title = f"[{cleaned_title}]({link_url})"
else: else:
formatted_title = title_data["title"] formatted_title = cleaned_title
title_prefix = "🆕 " if title_data["is_new"] else "" title_prefix = "🆕 " if title_data["is_new"] else ""
@ -1386,10 +1489,13 @@ class ReportGenerator:
) )
link_url = title_data["mobile_url"] or title_data["url"] link_url = title_data["mobile_url"] or title_data["url"]
cleaned_title = DataProcessor.clean_title(title_data["title"])
if link_url: if link_url:
formatted_title = f"[{title_data['title']}]({link_url})" formatted_title = f"[{cleaned_title}]({link_url})"
else: else:
formatted_title = title_data["title"] formatted_title = cleaned_title
title_prefix = "🆕 " if title_data["is_new"] else "" title_prefix = "🆕 " if title_data["is_new"] else ""
@ -1415,10 +1521,13 @@ class ReportGenerator:
) )
link_url = title_data["mobile_url"] or title_data["url"] link_url = title_data["mobile_url"] or title_data["url"]
cleaned_title = DataProcessor.clean_title(title_data["title"])
if link_url: if link_url:
formatted_title = f"[{title_data['title']}]({link_url})" formatted_title = f"[{cleaned_title}]({link_url})"
else: else:
formatted_title = title_data["title"] formatted_title = cleaned_title
title_prefix = "🆕 " if title_data["is_new"] else "" title_prefix = "🆕 " if title_data["is_new"] else ""
@ -1444,10 +1553,13 @@ class ReportGenerator:
) )
link_url = title_data["mobile_url"] or title_data["url"] link_url = title_data["mobile_url"] or title_data["url"]
cleaned_title = DataProcessor.clean_title(title_data["title"])
if link_url: if link_url:
formatted_title = f'<a href="{link_url}">{ReportGenerator._html_escape(title_data["title"])}</a>' formatted_title = f'<a href="{link_url}">{ReportGenerator._html_escape(cleaned_title)}</a>'
else: else:
formatted_title = title_data["title"] formatted_title = cleaned_title
title_prefix = "🆕 " if title_data["is_new"] else "" title_prefix = "🆕 " if title_data["is_new"] else ""
@ -1988,13 +2100,14 @@ class ReportGenerator:
id_to_alias: Optional[Dict] = None, id_to_alias: Optional[Dict] = None,
update_info: Optional[Dict] = None, update_info: Optional[Dict] = None,
proxy_url: Optional[str] = None, proxy_url: Optional[str] = None,
hide_new_section: bool = False,
) -> Dict[str, bool]: ) -> Dict[str, bool]:
"""发送数据到多个webhook平台""" """发送数据到多个webhook平台"""
results = {} results = {}
# 数据处理层 # 数据处理层
report_data = ReportGenerator._prepare_report_data( report_data = ReportGenerator._prepare_report_data(
stats, failed_ids, new_titles, id_to_alias stats, failed_ids, new_titles, id_to_alias, hide_new_section
) )
# 获取环境变量中的webhook配置 # 获取环境变量中的webhook配置
@ -2336,6 +2449,21 @@ class NewsAnalyzer:
except Exception as e: except Exception as e:
print(f"版本检查出错: {e}") print(f"版本检查出错: {e}")
def _has_valid_content(
    self, stats: List[Dict], new_titles: Optional[Dict] = None
) -> bool:
    """Decide whether there is any news content worth reporting.

    Args:
        stats: Statistics entries; each entry's ``"count"`` value is read.
        new_titles: Optional mapping whose values are collections of
            newly detected titles.

    Returns:
        When ``CONFIG["FOCUS_NEW_ONLY"]`` is truthy: True if any stats
        entry has a count above zero. Otherwise: True if any stats entry
        has a count above zero or any collection in ``new_titles`` is
        non-empty.
    """
    focus_new_only = CONFIG["FOCUS_NEW_ONLY"]
    matched_any = any(entry["count"] > 0 for entry in stats)

    if focus_new_only:
        # In this mode only the stats are consulted; new_titles is ignored.
        return matched_any

    fresh_any = False
    if new_titles:
        fresh_any = any(len(batch) > 0 for batch in new_titles.values())
    return matched_any or fresh_any
def generate_daily_summary(self) -> Optional[str]: def generate_daily_summary(self) -> Optional[str]:
"""生成当日统计报告""" """生成当日统计报告"""
print("生成当日统计报告...") print("生成当日统计报告...")
@ -2349,11 +2477,7 @@ class NewsAnalyzer:
total_titles = sum(len(titles) for titles in all_results.values()) total_titles = sum(len(titles) for titles in all_results.values())
print(f"读取到 {total_titles} 个标题") print(f"读取到 {total_titles} 个标题")
latest_new_titles = DataProcessor.detect_latest_new_titles(id_to_alias) new_titles = DataProcessor.detect_latest_new_titles(id_to_alias)
if latest_new_titles:
total_new_count = sum(len(titles) for titles in latest_new_titles.values())
print(f"检测到 {total_new_count} 条最新新增新闻")
word_groups, filter_words = DataProcessor.load_frequency_words() word_groups, filter_words = DataProcessor.load_frequency_words()
stats, total_titles = StatisticsCalculator.count_word_frequency( stats, total_titles = StatisticsCalculator.count_word_frequency(
@ -2363,14 +2487,15 @@ class NewsAnalyzer:
id_to_alias, id_to_alias,
title_info, title_info,
self.rank_threshold, self.rank_threshold,
latest_new_titles, new_titles,
focus_new_only=CONFIG["FOCUS_NEW_ONLY"],
) )
html_file = ReportGenerator.generate_html_report( html_file = ReportGenerator.generate_html_report(
stats, stats,
total_titles, total_titles,
is_daily=True, is_daily=True,
new_titles=latest_new_titles, new_titles=new_titles,
id_to_alias=id_to_alias, id_to_alias=id_to_alias,
) )
print(f"当日HTML统计报告已生成: {html_file}") print(f"当日HTML统计报告已生成: {html_file}")
@ -2392,20 +2517,33 @@ class NewsAnalyzer:
CONFIG["ENABLE_NOTIFICATION"] CONFIG["ENABLE_NOTIFICATION"]
and has_webhook and has_webhook
and self.report_type in ["daily", "both"] and self.report_type in ["daily", "both"]
and self._has_valid_content(stats, new_titles)
): ):
hide_new_section = CONFIG["FOCUS_NEW_ONLY"]
ReportGenerator.send_to_webhooks( ReportGenerator.send_to_webhooks(
stats, stats,
[], [],
"当日汇总", "当日汇总",
latest_new_titles, new_titles,
id_to_alias, id_to_alias,
self.update_info, self.update_info,
self.proxy_url, self.proxy_url,
hide_new_section=hide_new_section,
) )
elif CONFIG["ENABLE_NOTIFICATION"] and not has_webhook: elif CONFIG["ENABLE_NOTIFICATION"] and not has_webhook:
print("⚠️ 警告通知功能已启用但未配置webhook URL将跳过通知发送") print("⚠️ 警告通知功能已启用但未配置webhook URL将跳过通知发送")
elif not CONFIG["ENABLE_NOTIFICATION"]: elif not CONFIG["ENABLE_NOTIFICATION"]:
print("跳过当日汇总通知:通知功能已禁用") print("跳过当日汇总通知:通知功能已禁用")
elif (
CONFIG["ENABLE_NOTIFICATION"]
and has_webhook
and not self._has_valid_content(stats, new_titles)
):
if CONFIG["FOCUS_NEW_ONLY"]:
print("跳过当日汇总通知:新增模式下未检测到匹配的新增新闻")
else:
print("跳过当日汇总通知:未匹配到有效的新闻内容")
return html_file return html_file
@ -2441,6 +2579,11 @@ class NewsAnalyzer:
print(f"报告类型: {self.report_type}") print(f"报告类型: {self.report_type}")
if CONFIG["FOCUS_NEW_ONLY"]:
print("运行模式: 新增检测模式(只关注新增新闻)")
else:
print("运行模式: 正常模式(频率词统计 + 新增检测)")
ids = [ ids = [
("toutiao", "今日头条"), ("toutiao", "今日头条"),
("baidu", "百度热搜"), ("baidu", "百度热搜"),
@ -2496,14 +2639,18 @@ class NewsAnalyzer:
title_info, title_info,
self.rank_threshold, self.rank_threshold,
new_titles, new_titles,
focus_new_only=CONFIG["FOCUS_NEW_ONLY"],
) )
# 只有启用通知且配置了webhook时才发送通知 # 只有启用通知且配置了webhook且有有效内容时才发送通知
if ( if (
CONFIG["ENABLE_NOTIFICATION"] CONFIG["ENABLE_NOTIFICATION"]
and has_webhook and has_webhook
and self.report_type in ["current", "both"] and self.report_type in ["current", "both"]
and self._has_valid_content(stats, new_titles)
): ):
hide_new_section = CONFIG["FOCUS_NEW_ONLY"]
ReportGenerator.send_to_webhooks( ReportGenerator.send_to_webhooks(
stats, stats,
failed_ids, failed_ids,
@ -2512,11 +2659,21 @@ class NewsAnalyzer:
id_to_alias, id_to_alias,
self.update_info, self.update_info,
self.proxy_url, self.proxy_url,
hide_new_section=hide_new_section,
) )
elif CONFIG["ENABLE_NOTIFICATION"] and not has_webhook: elif CONFIG["ENABLE_NOTIFICATION"] and not has_webhook:
print("⚠️ 警告通知功能已启用但未配置webhook URL将跳过通知发送") print("⚠️ 警告通知功能已启用但未配置webhook URL将跳过通知发送")
elif not CONFIG["ENABLE_NOTIFICATION"]: elif not CONFIG["ENABLE_NOTIFICATION"]:
print("跳过单次爬取通知:通知功能已禁用") print("跳过单次爬取通知:通知功能已禁用")
elif (
CONFIG["ENABLE_NOTIFICATION"]
and has_webhook
and not self._has_valid_content(stats, new_titles)
):
if CONFIG["FOCUS_NEW_ONLY"]:
print("跳过单次爬取通知:新增模式下未检测到匹配的新增新闻")
else:
print("跳过单次爬取通知:未匹配到有效的新闻内容")
html_file = ReportGenerator.generate_html_report( html_file = ReportGenerator.generate_html_report(
stats, total_titles, failed_ids, False, new_titles, id_to_alias stats, total_titles, failed_ids, False, new_titles, id_to_alias