Update main.py

This commit is contained in:
sansan 2025-06-23 19:50:41 +08:00 committed by GitHub
parent 47bf85a20f
commit 482cb079a2

666
main.py
View File

@@ -13,7 +13,7 @@ import requests
import pytz
CONFIG = {
"VERSION": "1.2.1",
"VERSION": "1.3.0",
"VERSION_CHECK_URL": "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version",
"SHOW_VERSION_UPDATE": True, # 控制显示版本更新提示,改成 False 将不接受新版本提示
"FEISHU_MESSAGE_SEPARATOR": "━━━━━━━━━━━━━━━━━━━", # feishu消息分割线
@@ -22,7 +22,10 @@ CONFIG = {
"RANK_THRESHOLD": 5, # 排名高亮阈值
"USE_PROXY": True, # 是否启用代理
"DEFAULT_PROXY": "http://127.0.0.1:10086",
"CONTINUE_WITHOUT_WEBHOOK": True, # 控制在没有webhook URL时是否继续执行爬虫
"ENABLE_CRAWLER": True, # 是否启用爬取新闻功能False时直接停止程序
"ENABLE_NOTIFICATION": True, # 是否启用通知功能False时不发送手机通知
"MESSAGE_BATCH_SIZE": 4000, # 消息分批大小(字节)
"BATCH_SEND_INTERVAL": 1, # 批次发送间隔(秒)
# 飞书机器人的 webhook URL
"FEISHU_WEBHOOK_URL": "",
# 钉钉机器人的 webhook URL
@@ -274,8 +277,7 @@ class DataProcessor:
files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
if len(files) < 2:
if len(files) == 1:
return DataProcessor._parse_file_titles(files[0])
# 如果只有一个文件(第一次爬取),没有"新增"的概念,返回空字典
return {}
latest_file = files[-1]
@@ -307,6 +309,7 @@ class DataProcessor:
if alias == source_name:
source_id = id_val
break
if source_id:
new_titles[source_id] = source_new_titles
@@ -849,7 +852,20 @@ class StatisticsCalculator:
)
source_alias = id_to_alias.get(source_id, source_id)
is_new = source_id in new_titles and title in new_titles[source_id]
# 修复is_new判断逻辑添加容错处理
is_new = False
if new_titles and source_id in new_titles:
new_titles_for_source = new_titles[source_id]
if title in new_titles_for_source:
is_new = True
else:
# 如果直接匹配失败,尝试去除首尾空格后匹配
title_stripped = title.strip()
for new_title in new_titles_for_source.keys():
if title_stripped == new_title.strip():
is_new = True
break
word_stats[group_key]["titles"][source_id].append(
{
@@ -1630,191 +1646,338 @@ class ReportGenerator:
return text_content
@staticmethod
def _render_wework_content(
report_data: Dict, update_info: Optional[Dict] = None
) -> str:
"""渲染企业微信内容"""
text_content = ""
def _split_content_into_batches(
report_data: Dict,
format_type: str,
update_info: Optional[Dict] = None,
max_bytes: int = CONFIG["MESSAGE_BATCH_SIZE"],
) -> List[str]:
"""分批处理消息内容,确保词组标题+至少第一条新闻的完整性"""
batches = []
# 计算总标题数
# 基础信息构建
total_titles = sum(
len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
)
now = TimeHelper.get_beijing_time()
# 顶部统计信息
text_content += f"**总新闻数:** {total_titles}\n\n"
text_content += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
text_content += f"**类型:** 热点分析报告\n\n\n\n"
base_header = ""
if format_type == "wework":
base_header = f"**总新闻数:** {total_titles}\n\n\n\n"
elif format_type == "telegram":
base_header = f"总新闻数: {total_titles}\n\n"
# 渲染热点词汇统计
base_footer = ""
if format_type == "wework":
base_footer = f"\n\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
if update_info:
base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
elif format_type == "telegram":
base_footer = f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
if update_info:
base_footer += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}"
stats_header = ""
if report_data["stats"]:
text_content += "📊 **热点词汇统计**\n\n"
if format_type == "wework":
stats_header = "📊 **热点词汇统计**\n\n"
elif format_type == "telegram":
stats_header = "📊 热点词汇统计\n\n"
current_batch = base_header
current_batch_has_content = False
# 空内容处理
if (
not report_data["stats"]
and not report_data["new_titles"]
and not report_data["failed_ids"]
):
simple_content = "📭 暂无匹配的热点词汇\n\n"
final_content = base_header + simple_content + base_footer
batches.append(final_content)
return batches
# 处理热点词汇统计
if report_data["stats"]:
total_count = len(report_data["stats"])
# 添加统计标题
test_content = current_batch + stats_header
if (
len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
< max_bytes
):
current_batch = test_content
current_batch_has_content = True
else:
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = base_header + stats_header
current_batch_has_content = True
# 逐个处理词组(确保词组标题+第一条新闻的原子性)
for i, stat in enumerate(report_data["stats"]):
word = stat["word"]
count = stat["count"]
sequence_display = f"[{i + 1}/{total_count}]"
if count >= 10:
text_content += (
f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
)
elif count >= 5:
text_content += (
f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
)
# 构建词组标题
word_header = ""
if format_type == "wework":
if count >= 10:
word_header = (
f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
)
elif count >= 5:
word_header = (
f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
)
else:
word_header = (
f"📌 {sequence_display} **{word}** : {count}\n\n"
)
elif format_type == "telegram":
if count >= 10:
word_header = f"🔥 {sequence_display} {word} : {count}\n\n"
elif count >= 5:
word_header = f"📈 {sequence_display} {word} : {count}\n\n"
else:
word_header = f"📌 {sequence_display} {word} : {count}\n\n"
# 构建第一条新闻
first_news_line = ""
if stat["titles"]:
first_title_data = stat["titles"][0]
if format_type == "wework":
formatted_title = ReportGenerator._format_title_wework(
first_title_data, show_source=True
)
elif format_type == "telegram":
formatted_title = ReportGenerator._format_title_telegram(
first_title_data, show_source=True
)
else:
formatted_title = f"{first_title_data['title']}"
first_news_line = f" 1. {formatted_title}\n"
if len(stat["titles"]) > 1:
first_news_line += "\n"
# 原子性检查:词组标题+第一条新闻必须一起处理
word_with_first_news = word_header + first_news_line
test_content = current_batch + word_with_first_news
if (
len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
>= max_bytes
):
# 当前批次容纳不下,开启新批次
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = base_header + stats_header + word_with_first_news
current_batch_has_content = True
start_index = 1
else:
text_content += f"📌 {sequence_display} **{word}** : {count}\n\n"
current_batch = test_content
current_batch_has_content = True
start_index = 1
for j, title_data in enumerate(stat["titles"], 1):
formatted_title = ReportGenerator._format_title_wework(
title_data, show_source=True
)
text_content += f" {j}. {formatted_title}\n"
# 处理剩余新闻条目
for j in range(start_index, len(stat["titles"])):
title_data = stat["titles"][j]
if format_type == "wework":
formatted_title = ReportGenerator._format_title_wework(
title_data, show_source=True
)
elif format_type == "telegram":
formatted_title = ReportGenerator._format_title_telegram(
title_data, show_source=True
)
else:
formatted_title = f"{title_data['title']}"
if j < len(stat["titles"]):
text_content += "\n"
news_line = f" {j + 1}. {formatted_title}\n"
if j < len(stat["titles"]) - 1:
news_line += "\n"
test_content = current_batch + news_line
if (
len(test_content.encode("utf-8"))
+ len(base_footer.encode("utf-8"))
>= max_bytes
):
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = (
base_header + stats_header + word_header + news_line
)
current_batch_has_content = True
else:
current_batch = test_content
current_batch_has_content = True
# 词组间分隔符
if i < len(report_data["stats"]) - 1:
text_content += f"\n\n\n\n"
separator = ""
if format_type == "wework":
separator = f"\n\n\n\n"
elif format_type == "telegram":
separator = f"\n\n"
if not report_data["stats"]:
text_content += "📭 暂无匹配的热点词汇\n\n"
test_content = current_batch + separator
if (
len(test_content.encode("utf-8"))
+ len(base_footer.encode("utf-8"))
< max_bytes
):
current_batch = test_content
# 渲染新增新闻部分
# 处理新增新闻(同样确保来源标题+第一条新闻的原子性)
if report_data["new_titles"]:
if text_content and "暂无匹配" not in text_content:
text_content += f"\n\n\n\n"
new_header = ""
if format_type == "wework":
new_header = f"\n\n\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
elif format_type == "telegram":
new_header = f"\n\n🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)\n\n"
text_content += (
f"🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
)
test_content = current_batch + new_header
if (
len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
>= max_bytes
):
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = base_header + new_header
current_batch_has_content = True
else:
current_batch = test_content
current_batch_has_content = True
# 逐个处理新增新闻来源
for source_data in report_data["new_titles"]:
text_content += f"**{source_data['source_alias']}** ({len(source_data['titles'])} 条):\n\n"
source_header = ""
if format_type == "wework":
source_header = f"**{source_data['source_alias']}** ({len(source_data['titles'])} 条):\n\n"
elif format_type == "telegram":
source_header = f"{source_data['source_alias']} ({len(source_data['titles'])} 条):\n\n"
for j, title_data in enumerate(source_data["titles"], 1):
# 构建第一条新增新闻
first_news_line = ""
if source_data["titles"]:
first_title_data = source_data["titles"][0]
title_data_copy = first_title_data.copy()
title_data_copy["is_new"] = False
if format_type == "wework":
formatted_title = ReportGenerator._format_title_wework(
title_data_copy, show_source=False
)
elif format_type == "telegram":
formatted_title = ReportGenerator._format_title_telegram(
title_data_copy, show_source=False
)
else:
formatted_title = f"{title_data_copy['title']}"
first_news_line = f" 1. {formatted_title}\n"
# 原子性检查:来源标题+第一条新闻
source_with_first_news = source_header + first_news_line
test_content = current_batch + source_with_first_news
if (
len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
>= max_bytes
):
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = base_header + new_header + source_with_first_news
current_batch_has_content = True
start_index = 1
else:
current_batch = test_content
current_batch_has_content = True
start_index = 1
# 处理剩余新增新闻
for j in range(start_index, len(source_data["titles"])):
title_data = source_data["titles"][j]
title_data_copy = title_data.copy()
title_data_copy["is_new"] = False
formatted_title = ReportGenerator._format_title_wework(
title_data_copy, show_source=False
)
text_content += f" {j}. {formatted_title}\n"
text_content += "\n"
if format_type == "wework":
formatted_title = ReportGenerator._format_title_wework(
title_data_copy, show_source=False
)
elif format_type == "telegram":
formatted_title = ReportGenerator._format_title_telegram(
title_data_copy, show_source=False
)
else:
formatted_title = f"{title_data_copy['title']}"
# 渲染失败平台
news_line = f" {j + 1}. {formatted_title}\n"
test_content = current_batch + news_line
if (
len(test_content.encode("utf-8"))
+ len(base_footer.encode("utf-8"))
>= max_bytes
):
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = (
base_header + new_header + source_header + news_line
)
current_batch_has_content = True
else:
current_batch = test_content
current_batch_has_content = True
current_batch += "\n"
# 处理失败平台
if report_data["failed_ids"]:
if text_content and "暂无匹配" not in text_content:
text_content += f"\n\n\n\n"
failed_header = ""
if format_type == "wework":
failed_header = f"\n\n\n\n⚠️ **数据获取失败的平台:**\n\n"
elif format_type == "telegram":
failed_header = f"\n\n⚠️ 数据获取失败的平台:\n\n"
test_content = current_batch + failed_header
if (
len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
>= max_bytes
):
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = base_header + failed_header
current_batch_has_content = True
else:
current_batch = test_content
current_batch_has_content = True
text_content += "⚠️ **数据获取失败的平台:**\n\n"
for i, id_value in enumerate(report_data["failed_ids"], 1):
text_content += f"{id_value}\n"
# 添加时间戳
text_content += f"\n\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
# 版本更新提示
if update_info:
text_content += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
return text_content
@staticmethod
def _render_telegram_content(
report_data: Dict, update_info: Optional[Dict] = None
) -> str:
"""渲染Telegram内容"""
text_content = ""
# 计算总标题数
total_titles = sum(
len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
)
now = TimeHelper.get_beijing_time()
# 顶部统计信息
text_content += f"总新闻数: {total_titles}\n"
text_content += f"时间: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
text_content += f"类型: 热点分析报告\n\n"
# 渲染热点词汇统计
if report_data["stats"]:
text_content += "📊 热点词汇统计\n\n"
total_count = len(report_data["stats"])
for i, stat in enumerate(report_data["stats"]):
word = stat["word"]
count = stat["count"]
sequence_display = f"[{i + 1}/{total_count}]"
if count >= 10:
text_content += f"🔥 {sequence_display} {word} : {count}\n\n"
elif count >= 5:
text_content += f"📈 {sequence_display} {word} : {count}\n\n"
failed_line = f"{id_value}\n"
test_content = current_batch + failed_line
if (
len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
>= max_bytes
):
if current_batch_has_content:
batches.append(current_batch + base_footer)
current_batch = base_header + failed_header + failed_line
current_batch_has_content = True
else:
text_content += f"📌 {sequence_display} {word} : {count}\n\n"
current_batch = test_content
current_batch_has_content = True
for j, title_data in enumerate(stat["titles"], 1):
formatted_title = ReportGenerator._format_title_telegram(
title_data, show_source=True
)
text_content += f" {j}. {formatted_title}\n"
# 完成最后批次
if current_batch_has_content:
batches.append(current_batch + base_footer)
if j < len(stat["titles"]):
text_content += "\n"
if i < len(report_data["stats"]) - 1:
text_content += f"\n\n"
if not report_data["stats"]:
text_content += "📭 暂无匹配的热点词汇\n\n"
# 渲染新增新闻部分
if report_data["new_titles"]:
if text_content and "暂无匹配" not in text_content:
text_content += f"\n\n"
text_content += (
f"🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)\n\n"
)
for source_data in report_data["new_titles"]:
text_content += f"{source_data['source_alias']} ({len(source_data['titles'])} 条):\n\n"
for j, title_data in enumerate(source_data["titles"], 1):
title_data_copy = title_data.copy()
title_data_copy["is_new"] = False
formatted_title = ReportGenerator._format_title_telegram(
title_data_copy, show_source=False
)
text_content += f" {j}. {formatted_title}\n"
text_content += "\n"
# 渲染失败平台
if report_data["failed_ids"]:
if text_content and "暂无匹配" not in text_content:
text_content += f"\n\n"
text_content += "⚠️ 数据获取失败的平台:\n\n"
for i, id_value in enumerate(report_data["failed_ids"], 1):
text_content += f"{id_value}\n"
text_content += f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
# 版本更新提示
if update_info:
text_content += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}"
return text_content
return batches
@staticmethod
def send_to_webhooks(
@@ -1988,39 +2151,68 @@ class ReportGenerator:
update_info: Optional[Dict] = None,
proxy_url: Optional[str] = None,
) -> bool:
"""发送到企业微信"""
"""发送到企业微信(支持分批发送)"""
headers = {"Content-Type": "application/json"}
text_content = ReportGenerator._render_wework_content(report_data, update_info)
payload = {"msgtype": "markdown", "markdown": {"content": text_content}}
proxies = None
if proxy_url:
proxies = {"http": proxy_url, "https": proxy_url}
try:
response = requests.post(
webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
# 获取分批内容
batches = ReportGenerator._split_content_into_batches(
report_data, "wework", update_info
)
print(f"企业微信消息分为 {len(batches)} 批次发送 [{report_type}]")
# 逐批发送
for i, batch_content in enumerate(batches, 1):
batch_size = len(batch_content.encode("utf-8"))
print(
f"发送企业微信第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
)
if response.status_code == 200:
result = response.json()
if result.get("errcode") == 0:
print(f"企业微信通知发送成功 [{report_type}]")
return True
# 添加批次标识
if len(batches) > 1:
batch_header = f"**[第 {i}/{len(batches)} 批次]**\n\n"
batch_content = batch_header + batch_content
payload = {"msgtype": "markdown", "markdown": {"content": batch_content}}
try:
response = requests.post(
webhook_url,
headers=headers,
json=payload,
proxies=proxies,
timeout=30,
)
if response.status_code == 200:
result = response.json()
if result.get("errcode") == 0:
print(
f"企业微信第 {i}/{len(batches)} 批次发送成功 [{report_type}]"
)
# 批次间间隔
if i < len(batches):
time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
else:
print(
f"企业微信第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('errmsg')}"
)
return False
else:
print(
f"企业微信通知发送失败 [{report_type}],错误:{result.get('errmsg')}"
f"企业微信{i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
)
return False
else:
except Exception as e:
print(
f"企业微信通知发送失败 [{report_type}],状态码:{response.status_code}"
f"企业微信{i}/{len(batches)} 批次发送出错 [{report_type}]{e}"
)
return False
except Exception as e:
print(f"企业微信通知发送出错 [{report_type}]{e}")
return False
print(f"企业微信所有 {len(batches)} 批次发送完成 [{report_type}]")
return True
@staticmethod
def _send_to_telegram(
@@ -2031,48 +2223,71 @@ class ReportGenerator:
update_info: Optional[Dict] = None,
proxy_url: Optional[str] = None,
) -> bool:
"""发送到Telegram"""
"""发送到Telegram(支持分批发送)"""
headers = {"Content-Type": "application/json"}
text_content = ReportGenerator._render_telegram_content(
report_data, update_info
)
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": text_content,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
proxies = None
if proxy_url:
proxies = {"http": proxy_url, "https": proxy_url}
try:
response = requests.post(
url, headers=headers, json=payload, proxies=proxies, timeout=30
# 获取分批内容
batches = ReportGenerator._split_content_into_batches(
report_data, "telegram", update_info
)
print(f"Telegram消息分为 {len(batches)} 批次发送 [{report_type}]")
# 逐批发送
for i, batch_content in enumerate(batches, 1):
batch_size = len(batch_content.encode("utf-8"))
print(
f"发送Telegram第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
)
if response.status_code == 200:
result = response.json()
if result.get("ok"):
print(f"Telegram通知发送成功 [{report_type}]")
return True
# 添加批次标识
if len(batches) > 1:
batch_header = f"<b>[第 {i}/{len(batches)} 批次]</b>\n\n"
batch_content = batch_header + batch_content
payload = {
"chat_id": chat_id,
"text": batch_content,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
try:
response = requests.post(
url, headers=headers, json=payload, proxies=proxies, timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get("ok"):
print(
f"Telegram第 {i}/{len(batches)} 批次发送成功 [{report_type}]"
)
# 批次间间隔
if i < len(batches):
time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
else:
print(
f"Telegram第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('description')}"
)
return False
else:
print(
f"Telegram通知发送失败 [{report_type}],错误:{result.get('description')}"
f"Telegram{i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
)
return False
else:
except Exception as e:
print(
f"Telegram通知发送失败 [{report_type}],状态码:{response.status_code}"
f"Telegram{i}/{len(batches)} 批次发送出错 [{report_type}]{e}"
)
return False
except Exception as e:
print(f"Telegram通知发送出错 [{report_type}]{e}")
return False
print(f"Telegram所有 {len(batches)} 批次发送完成 [{report_type}]")
return True
class NewsAnalyzer:
@@ -2160,7 +2375,24 @@ class NewsAnalyzer:
)
print(f"当日HTML统计报告已生成: {html_file}")
if self.report_type in ["daily", "both"]:
# 检查通知配置
has_webhook = any(
[
os.environ.get("FEISHU_WEBHOOK_URL", CONFIG["FEISHU_WEBHOOK_URL"]),
os.environ.get("DINGTALK_WEBHOOK_URL", CONFIG["DINGTALK_WEBHOOK_URL"]),
os.environ.get("WEWORK_WEBHOOK_URL", CONFIG["WEWORK_WEBHOOK_URL"]),
(
os.environ.get("TELEGRAM_BOT_TOKEN", CONFIG["TELEGRAM_BOT_TOKEN"])
and os.environ.get("TELEGRAM_CHAT_ID", CONFIG["TELEGRAM_CHAT_ID"])
),
]
)
if (
CONFIG["ENABLE_NOTIFICATION"]
and has_webhook
and self.report_type in ["daily", "both"]
):
ReportGenerator.send_to_webhooks(
stats,
[],
@@ -2170,6 +2402,10 @@ class NewsAnalyzer:
self.update_info,
self.proxy_url,
)
elif CONFIG["ENABLE_NOTIFICATION"] and not has_webhook:
print("⚠️ 警告通知功能已启用但未配置webhook URL将跳过通知发送")
elif not CONFIG["ENABLE_NOTIFICATION"]:
print("跳过当日汇总通知:通知功能已禁用")
return html_file
@@ -2178,6 +2414,10 @@ class NewsAnalyzer:
now = TimeHelper.get_beijing_time()
print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
if not CONFIG["ENABLE_CRAWLER"]:
print("爬虫功能已禁用ENABLE_CRAWLER=False程序退出")
return
# 检查是否配置了任何webhook URL
has_webhook = any(
[
@@ -2191,14 +2431,13 @@ class NewsAnalyzer:
]
)
if not has_webhook and not CONFIG["CONTINUE_WITHOUT_WEBHOOK"]:
print(
"错误: 未配置任何webhook URL且CONTINUE_WITHOUT_WEBHOOK为False程序退出"
)
return
if not has_webhook:
print("未配置任何webhook URL将继续执行爬虫但不发送通知")
# 通知功能状态检查和提示
if not CONFIG["ENABLE_NOTIFICATION"]:
print("通知功能已禁用ENABLE_NOTIFICATION=False将只进行数据抓取")
elif not has_webhook:
print("未配置任何webhook URL将只进行数据抓取不发送通知")
else:
print("通知功能已启用将发送webhook通知")
print(f"报告类型: {self.report_type}")
@@ -2259,7 +2498,12 @@ class NewsAnalyzer:
new_titles,
)
if self.report_type in ["current", "both"]:
# 只有启用通知且配置了webhook时才发送通知
if (
CONFIG["ENABLE_NOTIFICATION"]
and has_webhook
and self.report_type in ["current", "both"]
):
ReportGenerator.send_to_webhooks(
stats,
failed_ids,
@@ -2269,6 +2513,10 @@ class NewsAnalyzer:
self.update_info,
self.proxy_url,
)
elif CONFIG["ENABLE_NOTIFICATION"] and not has_webhook:
print("⚠️ 警告通知功能已启用但未配置webhook URL将跳过通知发送")
elif not CONFIG["ENABLE_NOTIFICATION"]:
print("跳过单次爬取通知:通知功能已禁用")
html_file = ReportGenerator.generate_html_report(
stats, total_titles, failed_ids, False, new_titles, id_to_alias