- """
for id_value in report_data["failed_ids"]:
html += f'
- {html_escape(id_value)} ' html += """
# coding=utf-8
import json
import os
import random
import re
import time
import webbrowser
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr, formatdate, make_msgid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union
import pytz
import requests
import yaml
VERSION = "2.4.0"
# === SMTP邮件配置 ===
SMTP_CONFIGS = {
# Gmail
"gmail.com": {"server": "smtp.gmail.com", "port": 587, "encryption": "TLS"},
# QQ邮箱
"qq.com": {"server": "smtp.qq.com", "port": 587, "encryption": "TLS"},
# Outlook
"outlook.com": {
"server": "smtp-mail.outlook.com",
"port": 587,
"encryption": "TLS",
},
"hotmail.com": {
"server": "smtp-mail.outlook.com",
"port": 587,
"encryption": "TLS",
},
"live.com": {"server": "smtp-mail.outlook.com", "port": 587, "encryption": "TLS"},
# 网易邮箱
"163.com": {"server": "smtp.163.com", "port": 587, "encryption": "TLS"},
"126.com": {"server": "smtp.126.com", "port": 587, "encryption": "TLS"},
# 新浪邮箱
"sina.com": {"server": "smtp.sina.com", "port": 587, "encryption": "TLS"},
# 搜狐邮箱
"sohu.com": {"server": "smtp.sohu.com", "port": 587, "encryption": "TLS"},
}
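# Illustrative lookup (an assumption about how this table is consulted, not a
# guarantee): for a sender such as "user@qq.com", the domain part "qq.com" maps to
#   SMTP_CONFIGS["qq.com"] -> {"server": "smtp.qq.com", "port": 587, "encryption": "TLS"}
# Domains not listed here would rely on the explicit EMAIL_SMTP_SERVER /
# EMAIL_SMTP_PORT settings read in load_config() below.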
# === 配置管理 ===
def load_config():
"""加载配置文件"""
config_path = os.environ.get("CONFIG_PATH", "config/config.yaml")
if not Path(config_path).exists():
raise FileNotFoundError(f"配置文件 {config_path} 不存在")
with open(config_path, "r", encoding="utf-8") as f:
config_data = yaml.safe_load(f)
print(f"配置文件加载成功: {config_path}")
# 构建配置
config = {
"VERSION_CHECK_URL": config_data["app"]["version_check_url"],
"SHOW_VERSION_UPDATE": config_data["app"]["show_version_update"],
"REQUEST_INTERVAL": config_data["crawler"]["request_interval"],
"REPORT_MODE": config_data["report"]["mode"],
"RANK_THRESHOLD": config_data["report"]["rank_threshold"],
"USE_PROXY": config_data["crawler"]["use_proxy"],
"DEFAULT_PROXY": config_data["crawler"]["default_proxy"],
"ENABLE_CRAWLER": config_data["crawler"]["enable_crawler"],
"ENABLE_NOTIFICATION": config_data["notification"]["enable_notification"],
"MESSAGE_BATCH_SIZE": config_data["notification"]["message_batch_size"],
"DINGTALK_BATCH_SIZE": config_data["notification"].get(
"dingtalk_batch_size", 20000
),
"BATCH_SEND_INTERVAL": config_data["notification"]["batch_send_interval"],
"FEISHU_MESSAGE_SEPARATOR": config_data["notification"][
"feishu_message_separator"
],
"SILENT_PUSH": {
"ENABLED": config_data["notification"]
.get("silent_push", {})
.get("enabled", False),
"TIME_RANGE": {
"START": config_data["notification"]
.get("silent_push", {})
.get("time_range", {})
.get("start", "08:00"),
"END": config_data["notification"]
.get("silent_push", {})
.get("time_range", {})
.get("end", "22:00"),
},
"ONCE_PER_DAY": config_data["notification"]
.get("silent_push", {})
.get("once_per_day", True),
"RECORD_RETENTION_DAYS": config_data["notification"]
.get("silent_push", {})
.get("push_record_retention_days", 7),
},
"WEIGHT_CONFIG": {
"RANK_WEIGHT": config_data["weight"]["rank_weight"],
"FREQUENCY_WEIGHT": config_data["weight"]["frequency_weight"],
"HOTNESS_WEIGHT": config_data["weight"]["hotness_weight"],
},
"PLATFORMS": config_data["platforms"],
}
# 通知渠道配置(环境变量优先)
notification = config_data.get("notification", {})
webhooks = notification.get("webhooks", {})
config["FEISHU_WEBHOOK_URL"] = os.environ.get(
"FEISHU_WEBHOOK_URL", ""
).strip() or webhooks.get("feishu_url", "")
config["DINGTALK_WEBHOOK_URL"] = os.environ.get(
"DINGTALK_WEBHOOK_URL", ""
).strip() or webhooks.get("dingtalk_url", "")
config["WEWORK_WEBHOOK_URL"] = os.environ.get(
"WEWORK_WEBHOOK_URL", ""
).strip() or webhooks.get("wework_url", "")
config["TELEGRAM_BOT_TOKEN"] = os.environ.get(
"TELEGRAM_BOT_TOKEN", ""
).strip() or webhooks.get("telegram_bot_token", "")
config["TELEGRAM_CHAT_ID"] = os.environ.get(
"TELEGRAM_CHAT_ID", ""
).strip() or webhooks.get("telegram_chat_id", "")
# 邮件配置
config["EMAIL_FROM"] = os.environ.get("EMAIL_FROM", "").strip() or webhooks.get(
"email_from", ""
)
config["EMAIL_PASSWORD"] = os.environ.get(
"EMAIL_PASSWORD", ""
).strip() or webhooks.get("email_password", "")
config["EMAIL_TO"] = os.environ.get("EMAIL_TO", "").strip() or webhooks.get(
"email_to", ""
)
config["EMAIL_SMTP_SERVER"] = os.environ.get(
"EMAIL_SMTP_SERVER", ""
).strip() or webhooks.get("email_smtp_server", "")
config["EMAIL_SMTP_PORT"] = os.environ.get(
"EMAIL_SMTP_PORT", ""
).strip() or webhooks.get("email_smtp_port", "")
# ntfy配置
config["NTFY_SERVER_URL"] = os.environ.get(
"NTFY_SERVER_URL", "https://ntfy.sh"
).strip() or webhooks.get("ntfy_server_url", "https://ntfy.sh")
config["NTFY_TOPIC"] = os.environ.get("NTFY_TOPIC", "").strip() or webhooks.get(
"ntfy_topic", ""
)
config["NTFY_TOKEN"] = os.environ.get("NTFY_TOKEN", "").strip() or webhooks.get(
"ntfy_token", ""
)
# 输出配置来源信息
notification_sources = []
if config["FEISHU_WEBHOOK_URL"]:
source = "环境变量" if os.environ.get("FEISHU_WEBHOOK_URL") else "配置文件"
notification_sources.append(f"飞书({source})")
if config["DINGTALK_WEBHOOK_URL"]:
source = "环境变量" if os.environ.get("DINGTALK_WEBHOOK_URL") else "配置文件"
notification_sources.append(f"钉钉({source})")
if config["WEWORK_WEBHOOK_URL"]:
source = "环境变量" if os.environ.get("WEWORK_WEBHOOK_URL") else "配置文件"
notification_sources.append(f"企业微信({source})")
if config["TELEGRAM_BOT_TOKEN"] and config["TELEGRAM_CHAT_ID"]:
token_source = (
"环境变量" if os.environ.get("TELEGRAM_BOT_TOKEN") else "配置文件"
)
chat_source = "环境变量" if os.environ.get("TELEGRAM_CHAT_ID") else "配置文件"
notification_sources.append(f"Telegram({token_source}/{chat_source})")
if config["EMAIL_FROM"] and config["EMAIL_PASSWORD"] and config["EMAIL_TO"]:
from_source = "环境变量" if os.environ.get("EMAIL_FROM") else "配置文件"
notification_sources.append(f"邮件({from_source})")
if config["NTFY_SERVER_URL"] and config["NTFY_TOPIC"]:
server_source = "环境变量" if os.environ.get("NTFY_SERVER_URL") else "配置文件"
notification_sources.append(f"ntfy({server_source})")
if notification_sources:
print(f"通知渠道配置来源: {', '.join(notification_sources)}")
else:
print("未配置任何通知渠道")
return config
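# A minimal config/config.yaml sketch matching the keys read above. All values
# below are illustrative assumptions, not project defaults:
#
#   app:
#     version_check_url: "https://example.com/version"
#     show_version_update: true
#   crawler:
#     request_interval: 1000      # milliseconds between requests
#     enable_crawler: true
#     use_proxy: false
#     default_proxy: ""
#   report:
#     mode: "daily"               # also handled below: "current", "incremental"
#     rank_threshold: 5
#   notification:
#     enable_notification: true
#     message_batch_size: 4000
#     batch_send_interval: 1
#     feishu_message_separator: "---"
#     webhooks:
#       feishu_url: ""
#   weight:
#     rank_weight: 0.6
#     frequency_weight: 0.3
#     hotness_weight: 0.1
#   platforms:
#     - id: "weibo"
#       name: "微博"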
print("正在加载配置...")
CONFIG = load_config()
print(f"TrendRadar v{VERSION} 配置加载完成")
print(f"监控平台数量: {len(CONFIG['PLATFORMS'])}")
# === 工具函数 ===
def get_beijing_time():
"""获取北京时间"""
return datetime.now(pytz.timezone("Asia/Shanghai"))
def format_date_folder():
"""格式化日期文件夹"""
return get_beijing_time().strftime("%Y年%m月%d日")
def format_time_filename():
"""格式化时间文件名"""
return get_beijing_time().strftime("%H时%M分")
def clean_title(title: str) -> str:
"""清理标题中的特殊字符"""
if not isinstance(title, str):
title = str(title)
cleaned_title = title.replace("\n", " ").replace("\r", " ")
cleaned_title = re.sub(r"\s+", " ", cleaned_title)
cleaned_title = cleaned_title.strip()
return cleaned_title
def ensure_directory_exists(directory: str):
"""确保目录存在"""
Path(directory).mkdir(parents=True, exist_ok=True)
def get_output_path(subfolder: str, filename: str) -> str:
"""获取输出路径"""
date_folder = format_date_folder()
output_dir = Path("output") / date_folder / subfolder
ensure_directory_exists(str(output_dir))
return str(output_dir / filename)
def check_version_update(
current_version: str, version_url: str, proxy_url: Optional[str] = None
) -> Tuple[bool, Optional[str]]:
"""检查版本更新"""
try:
proxies = None
if proxy_url:
proxies = {"http": proxy_url, "https": proxy_url}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "text/plain, */*",
"Cache-Control": "no-cache",
}
response = requests.get(
version_url, proxies=proxies, headers=headers, timeout=10
)
response.raise_for_status()
remote_version = response.text.strip()
print(f"当前版本: {current_version}, 远程版本: {remote_version}")
# 比较版本
def parse_version(version_str):
try:
parts = version_str.strip().split(".")
if len(parts) != 3:
raise ValueError("版本号格式不正确")
return int(parts[0]), int(parts[1]), int(parts[2])
            except Exception:
return 0, 0, 0
current_tuple = parse_version(current_version)
remote_tuple = parse_version(remote_version)
need_update = current_tuple < remote_tuple
return need_update, remote_version if need_update else None
except Exception as e:
print(f"版本检查失败: {e}")
return False, None
def is_first_crawl_today() -> bool:
"""检测是否是当天第一次爬取"""
date_folder = format_date_folder()
txt_dir = Path("output") / date_folder / "txt"
if not txt_dir.exists():
return True
files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
return len(files) <= 1
def html_escape(text: str) -> str:
"""HTML转义"""
if not isinstance(text, str):
text = str(text)
    return (
        text.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("'", "&#39;")
    )
# === 推送记录管理 ===
class PushRecordManager:
"""推送记录管理器"""
def __init__(self):
self.record_dir = Path("output") / ".push_records"
self.ensure_record_dir()
self.cleanup_old_records()
def ensure_record_dir(self):
"""确保记录目录存在"""
self.record_dir.mkdir(parents=True, exist_ok=True)
def get_today_record_file(self) -> Path:
"""获取今天的记录文件路径"""
today = get_beijing_time().strftime("%Y%m%d")
return self.record_dir / f"push_record_{today}.json"
def cleanup_old_records(self):
"""清理过期的推送记录"""
retention_days = CONFIG["SILENT_PUSH"]["RECORD_RETENTION_DAYS"]
current_time = get_beijing_time()
for record_file in self.record_dir.glob("push_record_*.json"):
try:
date_str = record_file.stem.replace("push_record_", "")
file_date = datetime.strptime(date_str, "%Y%m%d")
file_date = pytz.timezone("Asia/Shanghai").localize(file_date)
if (current_time - file_date).days > retention_days:
record_file.unlink()
print(f"清理过期推送记录: {record_file.name}")
except Exception as e:
print(f"清理记录文件失败 {record_file}: {e}")
def has_pushed_today(self) -> bool:
"""检查今天是否已经推送过"""
record_file = self.get_today_record_file()
if not record_file.exists():
return False
try:
with open(record_file, "r", encoding="utf-8") as f:
record = json.load(f)
return record.get("pushed", False)
except Exception as e:
print(f"读取推送记录失败: {e}")
return False
def record_push(self, report_type: str):
"""记录推送"""
record_file = self.get_today_record_file()
now = get_beijing_time()
record = {
"pushed": True,
"push_time": now.strftime("%Y-%m-%d %H:%M:%S"),
"report_type": report_type,
}
try:
with open(record_file, "w", encoding="utf-8") as f:
json.dump(record, f, ensure_ascii=False, indent=2)
print(f"推送记录已保存: {report_type} at {now.strftime('%H:%M:%S')}")
except Exception as e:
print(f"保存推送记录失败: {e}")
def is_in_time_range(self, start_time: str, end_time: str) -> bool:
"""检查当前时间是否在指定时间范围内"""
now = get_beijing_time()
current_time = now.strftime("%H:%M")
return start_time <= current_time <= end_time
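# Hypothetical usage sketch (the actual sending code may differ): with silent
# push enabled, a caller would presumably only push inside the configured time
# window, and at most once per day:
#
#   manager = PushRecordManager()
#   silent = CONFIG["SILENT_PUSH"]
#   if silent["ENABLED"]:
#       in_window = manager.is_in_time_range(
#           silent["TIME_RANGE"]["START"], silent["TIME_RANGE"]["END"]
#       )
#       if not in_window or (silent["ONCE_PER_DAY"] and manager.has_pushed_today()):
#           pass  # skip this push; otherwise push and call manager.record_push(...)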
# === 数据获取 ===
class DataFetcher:
"""数据获取器"""
def __init__(self, proxy_url: Optional[str] = None):
self.proxy_url = proxy_url
def fetch_data(
self,
id_info: Union[str, Tuple[str, str]],
max_retries: int = 2,
min_retry_wait: int = 3,
max_retry_wait: int = 5,
) -> Tuple[Optional[str], str, str]:
"""获取指定ID数据,支持重试"""
if isinstance(id_info, tuple):
id_value, alias = id_info
else:
id_value = id_info
alias = id_value
url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
proxies = None
if self.proxy_url:
proxies = {"http": self.proxy_url, "https": self.proxy_url}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Cache-Control": "no-cache",
}
retries = 0
while retries <= max_retries:
try:
response = requests.get(
url, proxies=proxies, headers=headers, timeout=10
)
response.raise_for_status()
data_text = response.text
data_json = json.loads(data_text)
status = data_json.get("status", "未知")
if status not in ["success", "cache"]:
raise ValueError(f"响应状态异常: {status}")
status_info = "最新数据" if status == "success" else "缓存数据"
print(f"获取 {id_value} 成功({status_info})")
return data_text, id_value, alias
except Exception as e:
retries += 1
if retries <= max_retries:
base_wait = random.uniform(min_retry_wait, max_retry_wait)
additional_wait = (retries - 1) * random.uniform(1, 2)
wait_time = base_wait + additional_wait
print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
time.sleep(wait_time)
else:
print(f"请求 {id_value} 失败: {e}")
return None, id_value, alias
return None, id_value, alias
def crawl_websites(
self,
ids_list: List[Union[str, Tuple[str, str]]],
request_interval: int = CONFIG["REQUEST_INTERVAL"],
) -> Tuple[Dict, Dict, List]:
"""爬取多个网站数据"""
results = {}
id_to_name = {}
failed_ids = []
for i, id_info in enumerate(ids_list):
if isinstance(id_info, tuple):
id_value, name = id_info
else:
id_value = id_info
name = id_value
id_to_name[id_value] = name
response, _, _ = self.fetch_data(id_info)
if response:
try:
data = json.loads(response)
results[id_value] = {}
for index, item in enumerate(data.get("items", []), 1):
title = item["title"]
url = item.get("url", "")
mobile_url = item.get("mobileUrl", "")
if title in results[id_value]:
results[id_value][title]["ranks"].append(index)
else:
results[id_value][title] = {
"ranks": [index],
"url": url,
"mobileUrl": mobile_url,
}
except json.JSONDecodeError:
print(f"解析 {id_value} 响应失败")
failed_ids.append(id_value)
except Exception as e:
print(f"处理 {id_value} 数据出错: {e}")
failed_ids.append(id_value)
else:
failed_ids.append(id_value)
if i < len(ids_list) - 1:
actual_interval = request_interval + random.randint(-10, 20)
actual_interval = max(50, actual_interval)
time.sleep(actual_interval / 1000)
print(f"成功: {list(results.keys())}, 失败: {failed_ids}")
return results, id_to_name, failed_ids
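# Shape of the values returned by crawl_websites (contents are illustrative):
#   results    = {"weibo": {"某条热搜标题": {"ranks": [3], "url": "https://example.com/a",
#                                            "mobileUrl": "https://m.example.com/a"}}}
#   id_to_name = {"weibo": "微博"}
#   failed_ids = ["zhihu"]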
# === 数据处理 ===
def save_titles_to_file(results: Dict, id_to_name: Dict, failed_ids: List) -> str:
"""保存标题到文件"""
file_path = get_output_path("txt", f"{format_time_filename()}.txt")
with open(file_path, "w", encoding="utf-8") as f:
for id_value, title_data in results.items():
# id | name 或 id
name = id_to_name.get(id_value)
if name and name != id_value:
f.write(f"{id_value} | {name}\n")
else:
f.write(f"{id_value}\n")
# 按排名排序标题
sorted_titles = []
for title, info in title_data.items():
cleaned_title = clean_title(title)
if isinstance(info, dict):
ranks = info.get("ranks", [])
url = info.get("url", "")
mobile_url = info.get("mobileUrl", "")
else:
ranks = info if isinstance(info, list) else []
url = ""
mobile_url = ""
rank = ranks[0] if ranks else 1
sorted_titles.append((rank, cleaned_title, url, mobile_url))
sorted_titles.sort(key=lambda x: x[0])
for rank, cleaned_title, url, mobile_url in sorted_titles:
line = f"{rank}. {cleaned_title}"
if url:
line += f" [URL:{url}]"
if mobile_url:
line += f" [MOBILE:{mobile_url}]"
f.write(line + "\n")
f.write("\n")
if failed_ids:
f.write("==== 以下ID请求失败 ====\n")
for id_value in failed_ids:
f.write(f"{id_value}\n")
return file_path
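# Expected layout of config/frequency_words.txt, inferred from the parser below:
# word groups are separated by blank lines; inside a group, a leading "+" marks
# a required word, a leading "!" marks a (globally applied) filter word, and any
# other line is a normal word. Illustrative example:
#
#   +华为
#   鸿蒙
#   Mate
#
#   !广告
#   AI
#
# The first group matches titles containing "华为" AND ("鸿蒙" OR "Mate"); any
# title containing "广告" is filtered out everywhere; the second group otherwise
# matches titles containing "AI".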
def load_frequency_words(
frequency_file: Optional[str] = None,
) -> Tuple[List[Dict], List[str]]:
"""加载频率词配置"""
if frequency_file is None:
frequency_file = os.environ.get(
"FREQUENCY_WORDS_PATH", "config/frequency_words.txt"
)
frequency_path = Path(frequency_file)
if not frequency_path.exists():
raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在")
with open(frequency_path, "r", encoding="utf-8") as f:
content = f.read()
word_groups = [group.strip() for group in content.split("\n\n") if group.strip()]
processed_groups = []
filter_words = []
for group in word_groups:
words = [word.strip() for word in group.split("\n") if word.strip()]
group_required_words = []
group_normal_words = []
group_filter_words = []
for word in words:
if word.startswith("!"):
filter_words.append(word[1:])
group_filter_words.append(word[1:])
elif word.startswith("+"):
group_required_words.append(word[1:])
else:
group_normal_words.append(word)
if group_required_words or group_normal_words:
if group_normal_words:
group_key = " ".join(group_normal_words)
else:
group_key = " ".join(group_required_words)
processed_groups.append(
{
"required": group_required_words,
"normal": group_normal_words,
"group_key": group_key,
}
)
return processed_groups, filter_words
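# Layout of the txt snapshots written by save_titles_to_file and parsed back
# below (content is illustrative):
#
#   weibo | 微博
#   1. 某条热搜标题 [URL:https://example.com/a] [MOBILE:https://m.example.com/a]
#   2. 另一条标题
#
#   zhihu
#   1. 知乎热榜标题
#
#   ==== 以下ID请求失败 ====
#   bilibili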
def parse_file_titles(file_path: Path) -> Tuple[Dict, Dict]:
"""解析单个txt文件的标题数据,返回(titles_by_id, id_to_name)"""
titles_by_id = {}
id_to_name = {}
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
sections = content.split("\n\n")
for section in sections:
if not section.strip() or "==== 以下ID请求失败 ====" in section:
continue
lines = section.strip().split("\n")
if len(lines) < 2:
continue
# id | name 或 id
header_line = lines[0].strip()
if " | " in header_line:
parts = header_line.split(" | ", 1)
source_id = parts[0].strip()
name = parts[1].strip()
id_to_name[source_id] = name
else:
source_id = header_line
id_to_name[source_id] = source_id
titles_by_id[source_id] = {}
for line in lines[1:]:
if line.strip():
try:
title_part = line.strip()
rank = None
# 提取排名
if ". " in title_part and title_part.split(". ")[0].isdigit():
rank_str, title_part = title_part.split(". ", 1)
rank = int(rank_str)
# 提取 MOBILE URL
mobile_url = ""
if " [MOBILE:" in title_part:
title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1)
if mobile_part.endswith("]"):
mobile_url = mobile_part[:-1]
# 提取 URL
url = ""
if " [URL:" in title_part:
title_part, url_part = title_part.rsplit(" [URL:", 1)
if url_part.endswith("]"):
url = url_part[:-1]
title = clean_title(title_part.strip())
ranks = [rank] if rank is not None else [1]
titles_by_id[source_id][title] = {
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
except Exception as e:
print(f"解析标题行出错: {line}, 错误: {e}")
return titles_by_id, id_to_name
def read_all_today_titles(
current_platform_ids: Optional[List[str]] = None,
) -> Tuple[Dict, Dict, Dict]:
"""读取当天所有标题文件,支持按当前监控平台过滤"""
date_folder = format_date_folder()
txt_dir = Path("output") / date_folder / "txt"
if not txt_dir.exists():
return {}, {}, {}
all_results = {}
final_id_to_name = {}
title_info = {}
files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
for file_path in files:
time_info = file_path.stem
titles_by_id, file_id_to_name = parse_file_titles(file_path)
if current_platform_ids is not None:
filtered_titles_by_id = {}
filtered_id_to_name = {}
for source_id, title_data in titles_by_id.items():
if source_id in current_platform_ids:
filtered_titles_by_id[source_id] = title_data
if source_id in file_id_to_name:
filtered_id_to_name[source_id] = file_id_to_name[source_id]
titles_by_id = filtered_titles_by_id
file_id_to_name = filtered_id_to_name
final_id_to_name.update(file_id_to_name)
for source_id, title_data in titles_by_id.items():
process_source_data(
source_id, title_data, time_info, all_results, title_info
)
return all_results, final_id_to_name, title_info
def process_source_data(
source_id: str,
title_data: Dict,
time_info: str,
all_results: Dict,
title_info: Dict,
) -> None:
"""处理来源数据,合并重复标题"""
if source_id not in all_results:
all_results[source_id] = title_data
if source_id not in title_info:
title_info[source_id] = {}
for title, data in title_data.items():
ranks = data.get("ranks", [])
url = data.get("url", "")
mobile_url = data.get("mobileUrl", "")
title_info[source_id][title] = {
"first_time": time_info,
"last_time": time_info,
"count": 1,
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
else:
for title, data in title_data.items():
ranks = data.get("ranks", [])
url = data.get("url", "")
mobile_url = data.get("mobileUrl", "")
if title not in all_results[source_id]:
all_results[source_id][title] = {
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
title_info[source_id][title] = {
"first_time": time_info,
"last_time": time_info,
"count": 1,
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
else:
existing_data = all_results[source_id][title]
existing_ranks = existing_data.get("ranks", [])
existing_url = existing_data.get("url", "")
existing_mobile_url = existing_data.get("mobileUrl", "")
merged_ranks = existing_ranks.copy()
for rank in ranks:
if rank not in merged_ranks:
merged_ranks.append(rank)
all_results[source_id][title] = {
"ranks": merged_ranks,
"url": existing_url or url,
"mobileUrl": existing_mobile_url or mobile_url,
}
title_info[source_id][title]["last_time"] = time_info
title_info[source_id][title]["ranks"] = merged_ranks
title_info[source_id][title]["count"] += 1
if not title_info[source_id][title].get("url"):
title_info[source_id][title]["url"] = url
if not title_info[source_id][title].get("mobileUrl"):
title_info[source_id][title]["mobileUrl"] = mobile_url
def detect_latest_new_titles(current_platform_ids: Optional[List[str]] = None) -> Dict:
"""检测当日最新批次的新增标题,支持按当前监控平台过滤"""
date_folder = format_date_folder()
txt_dir = Path("output") / date_folder / "txt"
if not txt_dir.exists():
return {}
files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
if len(files) < 2:
return {}
# 解析最新文件
latest_file = files[-1]
latest_titles, _ = parse_file_titles(latest_file)
# 如果指定了当前平台列表,过滤最新文件数据
if current_platform_ids is not None:
filtered_latest_titles = {}
for source_id, title_data in latest_titles.items():
if source_id in current_platform_ids:
filtered_latest_titles[source_id] = title_data
latest_titles = filtered_latest_titles
# 汇总历史标题(按平台过滤)
historical_titles = {}
for file_path in files[:-1]:
historical_data, _ = parse_file_titles(file_path)
# 过滤历史数据
if current_platform_ids is not None:
filtered_historical_data = {}
for source_id, title_data in historical_data.items():
if source_id in current_platform_ids:
filtered_historical_data[source_id] = title_data
historical_data = filtered_historical_data
for source_id, titles_data in historical_data.items():
if source_id not in historical_titles:
historical_titles[source_id] = set()
for title in titles_data.keys():
historical_titles[source_id].add(title)
# 找出新增标题
new_titles = {}
for source_id, latest_source_titles in latest_titles.items():
historical_set = historical_titles.get(source_id, set())
source_new_titles = {}
for title, title_data in latest_source_titles.items():
if title not in historical_set:
source_new_titles[title] = title_data
if source_new_titles:
new_titles[source_id] = source_new_titles
return new_titles
# === 统计和分析 ===
def calculate_news_weight(
title_data: Dict, rank_threshold: int = CONFIG["RANK_THRESHOLD"]
) -> float:
"""计算新闻权重,用于排序"""
ranks = title_data.get("ranks", [])
if not ranks:
return 0.0
count = title_data.get("count", len(ranks))
weight_config = CONFIG["WEIGHT_CONFIG"]
# 排名权重:Σ(11 - min(rank, 10)) / 出现次数
rank_scores = []
for rank in ranks:
score = 11 - min(rank, 10)
rank_scores.append(score)
rank_weight = sum(rank_scores) / len(ranks) if ranks else 0
# 频次权重:min(出现次数, 10) × 10
frequency_weight = min(count, 10) * 10
# 热度加成:高排名次数 / 总出现次数 × 100
high_rank_count = sum(1 for rank in ranks if rank <= rank_threshold)
hotness_ratio = high_rank_count / len(ranks) if ranks else 0
hotness_weight = hotness_ratio * 100
total_weight = (
rank_weight * weight_config["RANK_WEIGHT"]
+ frequency_weight * weight_config["FREQUENCY_WEIGHT"]
+ hotness_weight * weight_config["HOTNESS_WEIGHT"]
)
return total_weight
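# Worked example of the weight formula above (rank_threshold = 5 chosen only for
# illustration): a title seen twice with ranks [1, 3] gives
#   rank_weight      = ((11 - 1) + (11 - 3)) / 2 = 9
#   frequency_weight = min(2, 10) * 10           = 20
#   hotness_weight   = (2 / 2) * 100             = 100   (both ranks <= 5)
#   total = 9 * RANK_WEIGHT + 20 * FREQUENCY_WEIGHT + 100 * HOTNESS_WEIGHT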
def matches_word_groups(
title: str, word_groups: List[Dict], filter_words: List[str]
) -> bool:
"""检查标题是否匹配词组规则"""
# 如果没有配置词组,则匹配所有标题(支持显示全部新闻)
if not word_groups:
return True
title_lower = title.lower()
# 过滤词检查
if any(filter_word.lower() in title_lower for filter_word in filter_words):
return False
# 词组匹配检查
for group in word_groups:
required_words = group["required"]
normal_words = group["normal"]
# 必须词检查
if required_words:
all_required_present = all(
req_word.lower() in title_lower for req_word in required_words
)
if not all_required_present:
continue
# 普通词检查
if normal_words:
any_normal_present = any(
normal_word.lower() in title_lower for normal_word in normal_words
)
if not any_normal_present:
continue
return True
return False
def format_time_display(first_time: str, last_time: str) -> str:
"""格式化时间显示"""
if not first_time:
return ""
if first_time == last_time or not last_time:
return first_time
else:
return f"[{first_time} ~ {last_time}]"
def format_rank_display(ranks: List[int], rank_threshold: int, format_type: str) -> str:
"""统一的排名格式化方法"""
if not ranks:
return ""
unique_ranks = sorted(set(ranks))
min_rank = unique_ranks[0]
max_rank = unique_ranks[-1]
    if format_type == "html":
        # <strong> tags assumed for highlighting hot ranks in the HTML report
        highlight_start = "<strong>"
        highlight_end = "</strong>"
elif format_type == "feishu":
highlight_start = "**"
highlight_end = "**"
elif format_type == "dingtalk":
highlight_start = "**"
highlight_end = "**"
elif format_type == "wework":
highlight_start = "**"
highlight_end = "**"
elif format_type == "telegram":
highlight_start = ""
highlight_end = ""
else:
highlight_start = "**"
highlight_end = "**"
if min_rank <= rank_threshold:
if min_rank == max_rank:
return f"{highlight_start}[{min_rank}]{highlight_end}"
else:
return f"{highlight_start}[{min_rank} - {max_rank}]{highlight_end}"
else:
if min_rank == max_rank:
return f"[{min_rank}]"
else:
return f"[{min_rank} - {max_rank}]"
def count_word_frequency(
results: Dict,
word_groups: List[Dict],
filter_words: List[str],
id_to_name: Dict,
title_info: Optional[Dict] = None,
rank_threshold: int = CONFIG["RANK_THRESHOLD"],
new_titles: Optional[Dict] = None,
mode: str = "daily",
) -> Tuple[List[Dict], int]:
"""统计词频,支持必须词、频率词、过滤词,并标记新增标题"""
# 如果没有配置词组,创建一个包含所有新闻的虚拟词组
if not word_groups:
print("频率词配置为空,将显示所有新闻")
word_groups = [{"required": [], "normal": [], "group_key": "全部新闻"}]
filter_words = [] # 清空过滤词,显示所有新闻
is_first_today = is_first_crawl_today()
# 确定处理的数据源和新增标记逻辑
if mode == "incremental":
if is_first_today:
# 增量模式 + 当天第一次:处理所有新闻,都标记为新增
results_to_process = results
all_news_are_new = True
else:
# 增量模式 + 当天非第一次:只处理新增的新闻
results_to_process = new_titles if new_titles else {}
all_news_are_new = True
elif mode == "current":
# current 模式:只处理当前时间批次的新闻,但统计信息来自全部历史
if title_info:
latest_time = None
for source_titles in title_info.values():
for title_data in source_titles.values():
last_time = title_data.get("last_time", "")
if last_time:
if latest_time is None or last_time > latest_time:
latest_time = last_time
# 只处理 last_time 等于最新时间的新闻
if latest_time:
results_to_process = {}
for source_id, source_titles in results.items():
if source_id in title_info:
filtered_titles = {}
for title, title_data in source_titles.items():
if title in title_info[source_id]:
info = title_info[source_id][title]
if info.get("last_time") == latest_time:
filtered_titles[title] = title_data
if filtered_titles:
results_to_process[source_id] = filtered_titles
print(
f"当前榜单模式:最新时间 {latest_time},筛选出 {sum(len(titles) for titles in results_to_process.values())} 条当前榜单新闻"
)
else:
results_to_process = results
else:
results_to_process = results
all_news_are_new = False
else:
# 当日汇总模式:处理所有新闻
results_to_process = results
all_news_are_new = False
total_input_news = sum(len(titles) for titles in results.values())
filter_status = (
"全部显示"
if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
else "频率词过滤"
)
print(f"当日汇总模式:处理 {total_input_news} 条新闻,模式:{filter_status}")
word_stats = {}
total_titles = 0
processed_titles = {}
matched_new_count = 0
if title_info is None:
title_info = {}
if new_titles is None:
new_titles = {}
for group in word_groups:
group_key = group["group_key"]
word_stats[group_key] = {"count": 0, "titles": {}}
for source_id, titles_data in results_to_process.items():
total_titles += len(titles_data)
if source_id not in processed_titles:
processed_titles[source_id] = {}
for title, title_data in titles_data.items():
if title in processed_titles.get(source_id, {}):
continue
# 使用统一的匹配逻辑
matches_frequency_words = matches_word_groups(
title, word_groups, filter_words
)
if not matches_frequency_words:
continue
# 如果是增量模式或 current 模式第一次,统计匹配的新增新闻数量
if (mode == "incremental" and all_news_are_new) or (
mode == "current" and is_first_today
):
matched_new_count += 1
source_ranks = title_data.get("ranks", [])
source_url = title_data.get("url", "")
source_mobile_url = title_data.get("mobileUrl", "")
# 找到匹配的词组
title_lower = title.lower()
for group in word_groups:
required_words = group["required"]
normal_words = group["normal"]
# 如果是"全部新闻"模式,所有标题都匹配第一个(唯一的)词组
if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻":
group_key = group["group_key"]
word_stats[group_key]["count"] += 1
if source_id not in word_stats[group_key]["titles"]:
word_stats[group_key]["titles"][source_id] = []
else:
# 原有的匹配逻辑
if required_words:
all_required_present = all(
req_word.lower() in title_lower
for req_word in required_words
)
if not all_required_present:
continue
if normal_words:
any_normal_present = any(
normal_word.lower() in title_lower
for normal_word in normal_words
)
if not any_normal_present:
continue
group_key = group["group_key"]
word_stats[group_key]["count"] += 1
if source_id not in word_stats[group_key]["titles"]:
word_stats[group_key]["titles"][source_id] = []
first_time = ""
last_time = ""
count_info = 1
ranks = source_ranks if source_ranks else []
url = source_url
mobile_url = source_mobile_url
# 对于 current 模式,从历史统计信息中获取完整数据
if (
mode == "current"
and title_info
and source_id in title_info
and title in title_info[source_id]
):
info = title_info[source_id][title]
first_time = info.get("first_time", "")
last_time = info.get("last_time", "")
count_info = info.get("count", 1)
if "ranks" in info and info["ranks"]:
ranks = info["ranks"]
url = info.get("url", source_url)
mobile_url = info.get("mobileUrl", source_mobile_url)
elif (
title_info
and source_id in title_info
and title in title_info[source_id]
):
info = title_info[source_id][title]
first_time = info.get("first_time", "")
last_time = info.get("last_time", "")
count_info = info.get("count", 1)
if "ranks" in info and info["ranks"]:
ranks = info["ranks"]
url = info.get("url", source_url)
mobile_url = info.get("mobileUrl", source_mobile_url)
if not ranks:
ranks = [99]
time_display = format_time_display(first_time, last_time)
source_name = id_to_name.get(source_id, source_id)
# 判断是否为新增
is_new = False
if all_news_are_new:
# 增量模式下所有处理的新闻都是新增,或者当天第一次的所有新闻都是新增
is_new = True
elif new_titles and source_id in new_titles:
# 检查是否在新增列表中
new_titles_for_source = new_titles[source_id]
is_new = title in new_titles_for_source
word_stats[group_key]["titles"][source_id].append(
{
"title": title,
"source_name": source_name,
"first_time": first_time,
"last_time": last_time,
"time_display": time_display,
"count": count_info,
"ranks": ranks,
"rank_threshold": rank_threshold,
"url": url,
"mobileUrl": mobile_url,
"is_new": is_new,
}
)
if source_id not in processed_titles:
processed_titles[source_id] = {}
processed_titles[source_id][title] = True
break
# 最后统一打印汇总信息
if mode == "incremental":
if is_first_today:
total_input_news = sum(len(titles) for titles in results.values())
filter_status = (
"全部显示"
if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
else "频率词匹配"
)
print(
f"增量模式:当天第一次爬取,{total_input_news} 条新闻中有 {matched_new_count} 条{filter_status}"
)
else:
if new_titles:
total_new_count = sum(len(titles) for titles in new_titles.values())
filter_status = (
"全部显示"
if len(word_groups) == 1
and word_groups[0]["group_key"] == "全部新闻"
else "匹配频率词"
)
print(
f"增量模式:{total_new_count} 条新增新闻中,有 {matched_new_count} 条{filter_status}"
)
if matched_new_count == 0 and len(word_groups) > 1:
print("增量模式:没有新增新闻匹配频率词,将不会发送通知")
else:
print("增量模式:未检测到新增新闻")
elif mode == "current":
total_input_news = sum(len(titles) for titles in results_to_process.values())
if is_first_today:
filter_status = (
"全部显示"
if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
else "频率词匹配"
)
print(
f"当前榜单模式:当天第一次爬取,{total_input_news} 条当前榜单新闻中有 {matched_new_count} 条{filter_status}"
)
else:
matched_count = sum(stat["count"] for stat in word_stats.values())
filter_status = (
"全部显示"
if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
else "频率词匹配"
)
print(
f"当前榜单模式:{total_input_news} 条当前榜单新闻中有 {matched_count} 条{filter_status}"
)
stats = []
for group_key, data in word_stats.items():
all_titles = []
for source_id, title_list in data["titles"].items():
all_titles.extend(title_list)
# 按权重排序
sorted_titles = sorted(
all_titles,
key=lambda x: (
-calculate_news_weight(x, rank_threshold),
min(x["ranks"]) if x["ranks"] else 999,
-x["count"],
),
)
stats.append(
{
"word": group_key,
"count": data["count"],
"titles": sorted_titles,
"percentage": (
round(data["count"] / total_titles * 100, 2)
if total_titles > 0
else 0
),
}
)
stats.sort(key=lambda x: x["count"], reverse=True)
return stats, total_titles
# === 报告生成 ===
def prepare_report_data(
stats: List[Dict],
failed_ids: Optional[List] = None,
new_titles: Optional[Dict] = None,
id_to_name: Optional[Dict] = None,
mode: str = "daily",
) -> Dict:
"""准备报告数据"""
processed_new_titles = []
# 在增量模式下隐藏新增新闻区域
hide_new_section = mode == "incremental"
# 只有在非隐藏模式下才处理新增新闻部分
if not hide_new_section:
filtered_new_titles = {}
if new_titles and id_to_name:
word_groups, filter_words = load_frequency_words()
for source_id, titles_data in new_titles.items():
filtered_titles = {}
for title, title_data in titles_data.items():
if matches_word_groups(title, word_groups, filter_words):
filtered_titles[title] = title_data
if filtered_titles:
filtered_new_titles[source_id] = filtered_titles
if filtered_new_titles and id_to_name:
for source_id, titles_data in filtered_new_titles.items():
source_name = id_to_name.get(source_id, source_id)
source_titles = []
for title, title_data in titles_data.items():
url = title_data.get("url", "")
mobile_url = title_data.get("mobileUrl", "")
ranks = title_data.get("ranks", [])
processed_title = {
"title": title,
"source_name": source_name,
"time_display": "",
"count": 1,
"ranks": ranks,
"rank_threshold": CONFIG["RANK_THRESHOLD"],
"url": url,
"mobile_url": mobile_url,
"is_new": True,
}
source_titles.append(processed_title)
if source_titles:
processed_new_titles.append(
{
"source_id": source_id,
"source_name": source_name,
"titles": source_titles,
}
)
processed_stats = []
for stat in stats:
if stat["count"] <= 0:
continue
processed_titles = []
for title_data in stat["titles"]:
processed_title = {
"title": title_data["title"],
"source_name": title_data["source_name"],
"time_display": title_data["time_display"],
"count": title_data["count"],
"ranks": title_data["ranks"],
"rank_threshold": title_data["rank_threshold"],
"url": title_data.get("url", ""),
"mobile_url": title_data.get("mobileUrl", ""),
"is_new": title_data.get("is_new", False),
}
processed_titles.append(processed_title)
processed_stats.append(
{
"word": stat["word"],
"count": stat["count"],
"percentage": stat.get("percentage", 0),
"titles": processed_titles,
}
)
return {
"stats": processed_stats,
"new_titles": processed_new_titles,
"failed_ids": failed_ids or [],
"total_new_count": sum(
len(source["titles"]) for source in processed_new_titles
),
}
def format_title_for_platform(
platform: str, title_data: Dict, show_source: bool = True
) -> str:
"""统一的标题格式化方法"""
rank_display = format_rank_display(
title_data["ranks"], title_data["rank_threshold"], platform
)
link_url = title_data["mobile_url"] or title_data["url"]
cleaned_title = clean_title(title_data["title"])
if platform == "feishu":
if link_url:
formatted_title = f"[{cleaned_title}]({link_url})"
else:
formatted_title = cleaned_title
title_prefix = "🆕 " if title_data.get("is_new") else ""
if show_source:
result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
else:
result = f"{title_prefix}{formatted_title}"
if rank_display:
result += f" {rank_display}"
if title_data["time_display"]:
result += f" - {title_data['time_display']}"
if title_data["count"] > 1:
result += f" ({title_data['count']}次)"
return result
elif platform == "dingtalk":
if link_url:
formatted_title = f"[{cleaned_title}]({link_url})"
else:
formatted_title = cleaned_title
title_prefix = "🆕 " if title_data.get("is_new") else ""
if show_source:
result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
else:
result = f"{title_prefix}{formatted_title}"
if rank_display:
result += f" {rank_display}"
if title_data["time_display"]:
result += f" - {title_data['time_display']}"
if title_data["count"] > 1:
result += f" ({title_data['count']}次)"
return result
elif platform == "wework":
if link_url:
formatted_title = f"[{cleaned_title}]({link_url})"
else:
formatted_title = cleaned_title
title_prefix = "🆕 " if title_data.get("is_new") else ""
if show_source:
result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
else:
result = f"{title_prefix}{formatted_title}"
if rank_display:
result += f" {rank_display}"
if title_data["time_display"]:
result += f" - {title_data['time_display']}"
if title_data["count"] > 1:
result += f" ({title_data['count']}次)"
return result
elif platform == "telegram":
if link_url:
            # anchor tag assumed (Telegram HTML parse mode)
            formatted_title = f'<a href="{link_url}">{html_escape(cleaned_title)}</a>'
else:
formatted_title = cleaned_title
title_prefix = "🆕 " if title_data.get("is_new") else ""
if show_source:
result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
else:
result = f"{title_prefix}{formatted_title}"
if rank_display:
result += f" {rank_display}"
if title_data["time_display"]:
result += f" - {title_data['time_display']}"
if title_data["count"] > 1:
result += f" ({title_data['count']}次)"
return result
elif platform == "ntfy":
if link_url:
formatted_title = f"[{cleaned_title}]({link_url})"
else:
formatted_title = cleaned_title
title_prefix = "🆕 " if title_data.get("is_new") else ""
if show_source:
result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
else:
result = f"{title_prefix}{formatted_title}"
if rank_display:
result += f" {rank_display}"
if title_data["time_display"]:
result += f" `- {title_data['time_display']}`"
if title_data["count"] > 1:
result += f" `({title_data['count']}次)`"
return result
elif platform == "html":
rank_display = format_rank_display(
title_data["ranks"], title_data["rank_threshold"], "html"
)
link_url = title_data["mobile_url"] or title_data["url"]
escaped_title = html_escape(cleaned_title)
escaped_source_name = html_escape(title_data["source_name"])
        if link_url:
            escaped_url = html_escape(link_url)
            # anchor tag assumed for linked items in the HTML report
            formatted_title = (
                f'<a href="{escaped_url}">[{escaped_source_name}] {escaped_title}</a>'
            )
        else:
            formatted_title = f"[{escaped_source_name}] {escaped_title}"
if rank_display:
formatted_title += f" {rank_display}"
if title_data["time_display"]:
escaped_time = html_escape(title_data["time_display"])
formatted_title += f" - {escaped_time}"
if title_data["count"] > 1:
formatted_title += f" ({title_data['count']}次)"
if title_data.get("is_new"):
formatted_title = f"