# coding=utf-8
import json
import os
import random
import re
import time
import webbrowser
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr, formatdate, make_msgid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union

import pytz
import requests
import yaml

VERSION = "2.4.3"

# === SMTP mail server presets ===
SMTP_CONFIGS = {
    # Gmail
    "gmail.com": {"server": "smtp.gmail.com", "port": 587, "encryption": "TLS"},
    # QQ Mail
    "qq.com": {"server": "smtp.qq.com", "port": 587, "encryption": "TLS"},
    # Outlook
    "outlook.com": {
        "server": "smtp-mail.outlook.com",
        "port": 587,
        "encryption": "TLS",
    },
    "hotmail.com": {
        "server": "smtp-mail.outlook.com",
        "port": 587,
        "encryption": "TLS",
    },
    "live.com": {"server": "smtp-mail.outlook.com", "port": 587, "encryption": "TLS"},
    # NetEase Mail
    "163.com": {"server": "smtp.163.com", "port": 587, "encryption": "TLS"},
    "126.com": {"server": "smtp.126.com", "port": 587, "encryption": "TLS"},
    # Sina Mail
    "sina.com": {"server": "smtp.sina.com", "port": 587, "encryption": "TLS"},
    # Sohu Mail
    "sohu.com": {"server": "smtp.sohu.com", "port": 587, "encryption": "TLS"},
}


# === Configuration management ===
def load_config():
    """Load the YAML configuration file."""
    config_path = os.environ.get("CONFIG_PATH", "config/config.yaml")
    if not Path(config_path).exists():
        raise FileNotFoundError(f"配置文件 {config_path} 不存在")

    with open(config_path, "r", encoding="utf-8") as f:
        config_data = yaml.safe_load(f)

    print(f"配置文件加载成功: {config_path}")

    # Build the runtime configuration
    config = {
        "VERSION_CHECK_URL": config_data["app"]["version_check_url"],
        "SHOW_VERSION_UPDATE": config_data["app"]["show_version_update"],
        "REQUEST_INTERVAL": config_data["crawler"]["request_interval"],
        "REPORT_MODE": config_data["report"]["mode"],
        "RANK_THRESHOLD": config_data["report"]["rank_threshold"],
        "USE_PROXY": config_data["crawler"]["use_proxy"],
        "DEFAULT_PROXY": config_data["crawler"]["default_proxy"],
        "ENABLE_CRAWLER": config_data["crawler"]["enable_crawler"],
        "ENABLE_NOTIFICATION": config_data["notification"]["enable_notification"],
        "MESSAGE_BATCH_SIZE": config_data["notification"]["message_batch_size"],
        "DINGTALK_BATCH_SIZE": config_data["notification"].get(
            "dingtalk_batch_size", 20000
        ),
        "BATCH_SEND_INTERVAL": config_data["notification"]["batch_send_interval"],
        "FEISHU_MESSAGE_SEPARATOR": config_data["notification"][
            "feishu_message_separator"
        ],
        "PUSH_WINDOW": {
            "ENABLED": config_data["notification"]
            .get("push_window", {})
            .get("enabled", False),
            "TIME_RANGE": {
                "START": config_data["notification"]
                .get("push_window", {})
                .get("time_range", {})
                .get("start", "08:00"),
                "END": config_data["notification"]
                .get("push_window", {})
                .get("time_range", {})
                .get("end", "22:00"),
            },
            "ONCE_PER_DAY": config_data["notification"]
            .get("push_window", {})
            .get("once_per_day", True),
            "RECORD_RETENTION_DAYS": config_data["notification"]
            .get("push_window", {})
            .get("push_record_retention_days", 7),
        },
        "WEIGHT_CONFIG": {
            "RANK_WEIGHT": config_data["weight"]["rank_weight"],
            "FREQUENCY_WEIGHT": config_data["weight"]["frequency_weight"],
            "HOTNESS_WEIGHT": config_data["weight"]["hotness_weight"],
        },
        "PLATFORMS": config_data["platforms"],
    }

    # Notification channel settings (environment variables take precedence)
    notification = config_data.get("notification", {})
    webhooks = notification.get("webhooks", {})

    config["FEISHU_WEBHOOK_URL"] = os.environ.get(
        "FEISHU_WEBHOOK_URL", ""
    ).strip() or webhooks.get("feishu_url", "")
    config["DINGTALK_WEBHOOK_URL"] = os.environ.get(
        "DINGTALK_WEBHOOK_URL", ""
    ).strip() or webhooks.get("dingtalk_url", "")
    config["WEWORK_WEBHOOK_URL"] = os.environ.get(
        "WEWORK_WEBHOOK_URL", ""
    ).strip() or webhooks.get("wework_url", "")
    config["TELEGRAM_BOT_TOKEN"] = os.environ.get(
        "TELEGRAM_BOT_TOKEN", ""
    ).strip() or webhooks.get("telegram_bot_token", "")
    config["TELEGRAM_CHAT_ID"] = os.environ.get(
        "TELEGRAM_CHAT_ID", ""
    ).strip() or webhooks.get("telegram_chat_id", "")

    # Email settings
    config["EMAIL_FROM"] = os.environ.get("EMAIL_FROM", "").strip() or webhooks.get(
        "email_from", ""
    )
    config["EMAIL_PASSWORD"] = os.environ.get(
        "EMAIL_PASSWORD", ""
    ).strip() or webhooks.get("email_password", "")
    config["EMAIL_TO"] = os.environ.get("EMAIL_TO", "").strip() or webhooks.get(
        "email_to", ""
    )
    config["EMAIL_SMTP_SERVER"] = os.environ.get(
        "EMAIL_SMTP_SERVER", ""
    ).strip() or webhooks.get("email_smtp_server", "")
    config["EMAIL_SMTP_PORT"] = os.environ.get(
        "EMAIL_SMTP_PORT", ""
    ).strip() or webhooks.get("email_smtp_port", "")

    # ntfy settings
    config["NTFY_SERVER_URL"] = os.environ.get(
        "NTFY_SERVER_URL", "https://ntfy.sh"
    ).strip() or webhooks.get("ntfy_server_url", "https://ntfy.sh")
    config["NTFY_TOPIC"] = os.environ.get("NTFY_TOPIC", "").strip() or webhooks.get(
        "ntfy_topic", ""
    )
    config["NTFY_TOKEN"] = os.environ.get("NTFY_TOKEN", "").strip() or webhooks.get(
        "ntfy_token", ""
    )

    # Report where each configured channel came from
    notification_sources = []
    if config["FEISHU_WEBHOOK_URL"]:
        source = "环境变量" if os.environ.get("FEISHU_WEBHOOK_URL") else "配置文件"
        notification_sources.append(f"飞书({source})")
    if config["DINGTALK_WEBHOOK_URL"]:
        source = "环境变量" if os.environ.get("DINGTALK_WEBHOOK_URL") else "配置文件"
        notification_sources.append(f"钉钉({source})")
    if config["WEWORK_WEBHOOK_URL"]:
        source = "环境变量" if os.environ.get("WEWORK_WEBHOOK_URL") else "配置文件"
        notification_sources.append(f"企业微信({source})")
    if config["TELEGRAM_BOT_TOKEN"] and config["TELEGRAM_CHAT_ID"]:
        token_source = (
            "环境变量" if os.environ.get("TELEGRAM_BOT_TOKEN") else "配置文件"
        )
        chat_source = "环境变量" if os.environ.get("TELEGRAM_CHAT_ID") else "配置文件"
        notification_sources.append(f"Telegram({token_source}/{chat_source})")
    if config["EMAIL_FROM"] and config["EMAIL_PASSWORD"] and config["EMAIL_TO"]:
        from_source = "环境变量" if os.environ.get("EMAIL_FROM") else "配置文件"
        notification_sources.append(f"邮件({from_source})")
    if config["NTFY_SERVER_URL"] and config["NTFY_TOPIC"]:
        server_source = "环境变量" if os.environ.get("NTFY_SERVER_URL") else "配置文件"
        notification_sources.append(f"ntfy({server_source})")

    if notification_sources:
        print(f"通知渠道配置来源: {', '.join(notification_sources)}")
    else:
        print("未配置任何通知渠道")

    return config


print("正在加载配置...")
CONFIG = load_config()
print(f"TrendRadar v{VERSION} 配置加载完成")
print(f"监控平台数量: {len(CONFIG['PLATFORMS'])}")


# === Utility functions ===
def get_beijing_time():
    """Return the current time in the Asia/Shanghai timezone."""
    return datetime.now(pytz.timezone("Asia/Shanghai"))


def format_date_folder():
    """Format the date-based folder name."""
    return get_beijing_time().strftime("%Y年%m月%d日")


def format_time_filename():
    """Format the time-based file name."""
    return get_beijing_time().strftime("%H时%M分")


def clean_title(title: str) -> str:
    """Strip line breaks and collapse whitespace in a title."""
    if not isinstance(title, str):
        title = str(title)
    cleaned_title = title.replace("\n", " ").replace("\r", " ")
    cleaned_title = re.sub(r"\s+", " ", cleaned_title)
    cleaned_title = cleaned_title.strip()
    return cleaned_title


def ensure_directory_exists(directory: str):
    """Create the directory if it does not exist."""
    Path(directory).mkdir(parents=True, exist_ok=True)


def get_output_path(subfolder: str, filename: str) -> str:
    """Build an output path under today's date folder."""
    date_folder = format_date_folder()
    output_dir = Path("output") / date_folder / subfolder
    ensure_directory_exists(str(output_dir))
    return str(output_dir / filename)
def check_version_update(
    current_version: str, version_url: str, proxy_url: Optional[str] = None
) -> Tuple[bool, Optional[str]]:
    """Check whether a newer version is available."""
    try:
        proxies = None
        if proxy_url:
            proxies = {"http": proxy_url, "https": proxy_url}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/plain, */*",
            "Cache-Control": "no-cache",
        }

        response = requests.get(
            version_url, proxies=proxies, headers=headers, timeout=10
        )
        response.raise_for_status()

        remote_version = response.text.strip()
        print(f"当前版本: {current_version}, 远程版本: {remote_version}")

        # Compare semantic version strings as (major, minor, patch) tuples
        def parse_version(version_str):
            try:
                parts = version_str.strip().split(".")
                if len(parts) != 3:
                    raise ValueError("版本号格式不正确")
                return int(parts[0]), int(parts[1]), int(parts[2])
            except Exception:
                return 0, 0, 0

        current_tuple = parse_version(current_version)
        remote_tuple = parse_version(remote_version)

        need_update = current_tuple < remote_tuple
        return need_update, remote_version if need_update else None

    except Exception as e:
        print(f"版本检查失败: {e}")
        return False, None


def is_first_crawl_today() -> bool:
    """Detect whether this is the first crawl of the day."""
    date_folder = format_date_folder()
    txt_dir = Path("output") / date_folder / "txt"

    if not txt_dir.exists():
        return True

    files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
    return len(files) <= 1


def html_escape(text: str) -> str:
    """Escape HTML special characters."""
    if not isinstance(text, str):
        text = str(text)
    return (
        text.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("'", "&#x27;")
    )


# === Push record management ===
class PushRecordManager:
    """Manages daily push records for the push-window feature."""

    def __init__(self):
        self.record_dir = Path("output") / ".push_records"
        self.ensure_record_dir()
        self.cleanup_old_records()

    def ensure_record_dir(self):
        """Make sure the record directory exists."""
        self.record_dir.mkdir(parents=True, exist_ok=True)

    def get_today_record_file(self) -> Path:
        """Return the path of today's record file."""
        today = get_beijing_time().strftime("%Y%m%d")
        return self.record_dir / f"push_record_{today}.json"

    def cleanup_old_records(self):
        """Delete push records older than the retention window."""
        retention_days = CONFIG["PUSH_WINDOW"]["RECORD_RETENTION_DAYS"]
        current_time = get_beijing_time()

        for record_file in self.record_dir.glob("push_record_*.json"):
            try:
                date_str = record_file.stem.replace("push_record_", "")
                file_date = datetime.strptime(date_str, "%Y%m%d")
                file_date = pytz.timezone("Asia/Shanghai").localize(file_date)

                if (current_time - file_date).days > retention_days:
                    record_file.unlink()
                    print(f"清理过期推送记录: {record_file.name}")
            except Exception as e:
                print(f"清理记录文件失败 {record_file}: {e}")

    def has_pushed_today(self) -> bool:
        """Check whether a push already happened today."""
        record_file = self.get_today_record_file()
        if not record_file.exists():
            return False
        try:
            with open(record_file, "r", encoding="utf-8") as f:
                record = json.load(f)
            return record.get("pushed", False)
        except Exception as e:
            print(f"读取推送记录失败: {e}")
            return False

    def record_push(self, report_type: str):
        """Persist a push record for today."""
        record_file = self.get_today_record_file()
        now = get_beijing_time()
        record = {
            "pushed": True,
            "push_time": now.strftime("%Y-%m-%d %H:%M:%S"),
            "report_type": report_type,
        }
        try:
            with open(record_file, "w", encoding="utf-8") as f:
                json.dump(record, f, ensure_ascii=False, indent=2)
            print(f"推送记录已保存: {report_type} at {now.strftime('%H:%M:%S')}")
        except Exception as e:
            print(f"保存推送记录失败: {e}")

    def is_in_time_range(self, start_time: str, end_time: str) -> bool:
        """Check whether the current time falls inside the given window."""
        now = get_beijing_time()
        current_time = now.strftime("%H:%M")
        return start_time <= current_time <= end_time
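# A minimal sketch of how the push-window gate composes (it mirrors the checks
# that send_to_notifications() performs further below; the window times are
# illustrative and normally come from CONFIG["PUSH_WINDOW"]):
#
#   manager = PushRecordManager()
#   if manager.is_in_time_range("08:00", "22:00"):
#       if not manager.has_pushed_today():
#           ...  # send notifications, then:
#           manager.record_push("当日汇总")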
# === Data fetching ===
class DataFetcher:
    """Fetches hot-list data from the NewsNow API."""

    def __init__(self, proxy_url: Optional[str] = None):
        self.proxy_url = proxy_url

    def fetch_data(
        self,
        id_info: Union[str, Tuple[str, str]],
        max_retries: int = 2,
        min_retry_wait: int = 3,
        max_retry_wait: int = 5,
    ) -> Tuple[Optional[str], str, str]:
        """Fetch data for the given platform ID, with retries."""
        if isinstance(id_info, tuple):
            id_value, alias = id_info
        else:
            id_value = id_info
            alias = id_value

        url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"

        proxies = None
        if self.proxy_url:
            proxies = {"http": self.proxy_url, "https": self.proxy_url}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Connection": "keep-alive",
            "Cache-Control": "no-cache",
        }

        retries = 0
        while retries <= max_retries:
            try:
                response = requests.get(
                    url, proxies=proxies, headers=headers, timeout=10
                )
                response.raise_for_status()

                data_text = response.text
                data_json = json.loads(data_text)

                status = data_json.get("status", "未知")
                if status not in ["success", "cache"]:
                    raise ValueError(f"响应状态异常: {status}")

                status_info = "最新数据" if status == "success" else "缓存数据"
                print(f"获取 {id_value} 成功({status_info})")
                return data_text, id_value, alias

            except Exception as e:
                retries += 1
                if retries <= max_retries:
                    # Back off with jitter that grows per retry
                    base_wait = random.uniform(min_retry_wait, max_retry_wait)
                    additional_wait = (retries - 1) * random.uniform(1, 2)
                    wait_time = base_wait + additional_wait
                    print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
                    time.sleep(wait_time)
                else:
                    print(f"请求 {id_value} 失败: {e}")
                    return None, id_value, alias

        return None, id_value, alias

    def crawl_websites(
        self,
        ids_list: List[Union[str, Tuple[str, str]]],
        request_interval: int = CONFIG["REQUEST_INTERVAL"],
    ) -> Tuple[Dict, Dict, List]:
        """Crawl multiple platforms and merge the results."""
        results = {}
        id_to_name = {}
        failed_ids = []

        for i, id_info in enumerate(ids_list):
            if isinstance(id_info, tuple):
                id_value, name = id_info
            else:
                id_value = id_info
                name = id_value

            id_to_name[id_value] = name
            response, _, _ = self.fetch_data(id_info)

            if response:
                try:
                    data = json.loads(response)
                    results[id_value] = {}
                    for index, item in enumerate(data.get("items", []), 1):
                        title = item["title"]
                        url = item.get("url", "")
                        mobile_url = item.get("mobileUrl", "")

                        if title in results[id_value]:
                            results[id_value][title]["ranks"].append(index)
                        else:
                            results[id_value][title] = {
                                "ranks": [index],
                                "url": url,
                                "mobileUrl": mobile_url,
                            }
                except json.JSONDecodeError:
                    print(f"解析 {id_value} 响应失败")
                    failed_ids.append(id_value)
                except Exception as e:
                    print(f"处理 {id_value} 数据出错: {e}")
                    failed_ids.append(id_value)
            else:
                failed_ids.append(id_value)

            if i < len(ids_list) - 1:
                # Randomize the interval slightly, with a 50ms floor
                actual_interval = request_interval + random.randint(-10, 20)
                actual_interval = max(50, actual_interval)
                time.sleep(actual_interval / 1000)

        print(f"成功: {list(results.keys())}, 失败: {failed_ids}")
        return results, id_to_name, failed_ids
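# The response body consumed by crawl_websites() is expected to look roughly
# like this. Only the fields actually read above are shown; anything beyond
# them is an assumption:
#
#   {
#     "status": "success",            # or "cache"
#     "items": [
#       {"title": "...", "url": "https://...", "mobileUrl": "https://..."},
#       ...
#     ]
#   }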
info.get("mobileUrl", "") else: ranks = info if isinstance(info, list) else [] url = "" mobile_url = "" rank = ranks[0] if ranks else 1 sorted_titles.append((rank, cleaned_title, url, mobile_url)) sorted_titles.sort(key=lambda x: x[0]) for rank, cleaned_title, url, mobile_url in sorted_titles: line = f"{rank}. {cleaned_title}" if url: line += f" [URL:{url}]" if mobile_url: line += f" [MOBILE:{mobile_url}]" f.write(line + "\n") f.write("\n") if failed_ids: f.write("==== 以下ID请求失败 ====\n") for id_value in failed_ids: f.write(f"{id_value}\n") return file_path def load_frequency_words( frequency_file: Optional[str] = None, ) -> Tuple[List[Dict], List[str]]: """加载频率词配置""" if frequency_file is None: frequency_file = os.environ.get( "FREQUENCY_WORDS_PATH", "config/frequency_words.txt" ) frequency_path = Path(frequency_file) if not frequency_path.exists(): raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在") with open(frequency_path, "r", encoding="utf-8") as f: content = f.read() word_groups = [group.strip() for group in content.split("\n\n") if group.strip()] processed_groups = [] filter_words = [] for group in word_groups: words = [word.strip() for word in group.split("\n") if word.strip()] group_required_words = [] group_normal_words = [] group_filter_words = [] for word in words: if word.startswith("!"): filter_words.append(word[1:]) group_filter_words.append(word[1:]) elif word.startswith("+"): group_required_words.append(word[1:]) else: group_normal_words.append(word) if group_required_words or group_normal_words: if group_normal_words: group_key = " ".join(group_normal_words) else: group_key = " ".join(group_required_words) processed_groups.append( { "required": group_required_words, "normal": group_normal_words, "group_key": group_key, } ) return processed_groups, filter_words def parse_file_titles(file_path: Path) -> Tuple[Dict, Dict]: """解析单个txt文件的标题数据,返回(titles_by_id, id_to_name)""" titles_by_id = {} id_to_name = {} with open(file_path, "r", encoding="utf-8") as f: content = f.read() sections = content.split("\n\n") for section in sections: if not section.strip() or "==== 以下ID请求失败 ====" in section: continue lines = section.strip().split("\n") if len(lines) < 2: continue # id | name 或 id header_line = lines[0].strip() if " | " in header_line: parts = header_line.split(" | ", 1) source_id = parts[0].strip() name = parts[1].strip() id_to_name[source_id] = name else: source_id = header_line id_to_name[source_id] = source_id titles_by_id[source_id] = {} for line in lines[1:]: if line.strip(): try: title_part = line.strip() rank = None # 提取排名 if ". " in title_part and title_part.split(". ")[0].isdigit(): rank_str, title_part = title_part.split(". 
", 1) rank = int(rank_str) # 提取 MOBILE URL mobile_url = "" if " [MOBILE:" in title_part: title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1) if mobile_part.endswith("]"): mobile_url = mobile_part[:-1] # 提取 URL url = "" if " [URL:" in title_part: title_part, url_part = title_part.rsplit(" [URL:", 1) if url_part.endswith("]"): url = url_part[:-1] title = clean_title(title_part.strip()) ranks = [rank] if rank is not None else [1] titles_by_id[source_id][title] = { "ranks": ranks, "url": url, "mobileUrl": mobile_url, } except Exception as e: print(f"解析标题行出错: {line}, 错误: {e}") return titles_by_id, id_to_name def read_all_today_titles( current_platform_ids: Optional[List[str]] = None, ) -> Tuple[Dict, Dict, Dict]: """读取当天所有标题文件,支持按当前监控平台过滤""" date_folder = format_date_folder() txt_dir = Path("output") / date_folder / "txt" if not txt_dir.exists(): return {}, {}, {} all_results = {} final_id_to_name = {} title_info = {} files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"]) for file_path in files: time_info = file_path.stem titles_by_id, file_id_to_name = parse_file_titles(file_path) if current_platform_ids is not None: filtered_titles_by_id = {} filtered_id_to_name = {} for source_id, title_data in titles_by_id.items(): if source_id in current_platform_ids: filtered_titles_by_id[source_id] = title_data if source_id in file_id_to_name: filtered_id_to_name[source_id] = file_id_to_name[source_id] titles_by_id = filtered_titles_by_id file_id_to_name = filtered_id_to_name final_id_to_name.update(file_id_to_name) for source_id, title_data in titles_by_id.items(): process_source_data( source_id, title_data, time_info, all_results, title_info ) return all_results, final_id_to_name, title_info def process_source_data( source_id: str, title_data: Dict, time_info: str, all_results: Dict, title_info: Dict, ) -> None: """处理来源数据,合并重复标题""" if source_id not in all_results: all_results[source_id] = title_data if source_id not in title_info: title_info[source_id] = {} for title, data in title_data.items(): ranks = data.get("ranks", []) url = data.get("url", "") mobile_url = data.get("mobileUrl", "") title_info[source_id][title] = { "first_time": time_info, "last_time": time_info, "count": 1, "ranks": ranks, "url": url, "mobileUrl": mobile_url, } else: for title, data in title_data.items(): ranks = data.get("ranks", []) url = data.get("url", "") mobile_url = data.get("mobileUrl", "") if title not in all_results[source_id]: all_results[source_id][title] = { "ranks": ranks, "url": url, "mobileUrl": mobile_url, } title_info[source_id][title] = { "first_time": time_info, "last_time": time_info, "count": 1, "ranks": ranks, "url": url, "mobileUrl": mobile_url, } else: existing_data = all_results[source_id][title] existing_ranks = existing_data.get("ranks", []) existing_url = existing_data.get("url", "") existing_mobile_url = existing_data.get("mobileUrl", "") merged_ranks = existing_ranks.copy() for rank in ranks: if rank not in merged_ranks: merged_ranks.append(rank) all_results[source_id][title] = { "ranks": merged_ranks, "url": existing_url or url, "mobileUrl": existing_mobile_url or mobile_url, } title_info[source_id][title]["last_time"] = time_info title_info[source_id][title]["ranks"] = merged_ranks title_info[source_id][title]["count"] += 1 if not title_info[source_id][title].get("url"): title_info[source_id][title]["url"] = url if not title_info[source_id][title].get("mobileUrl"): title_info[source_id][title]["mobileUrl"] = mobile_url def detect_latest_new_titles(current_platform_ids: 
def detect_latest_new_titles(current_platform_ids: Optional[List[str]] = None) -> Dict:
    """Detect titles that first appeared in today's latest crawl batch."""
    date_folder = format_date_folder()
    txt_dir = Path("output") / date_folder / "txt"

    if not txt_dir.exists():
        return {}

    files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
    if len(files) < 2:
        return {}

    # Parse the latest snapshot
    latest_file = files[-1]
    latest_titles, _ = parse_file_titles(latest_file)

    # Filter the latest snapshot to the current platforms, if requested
    if current_platform_ids is not None:
        filtered_latest_titles = {}
        for source_id, title_data in latest_titles.items():
            if source_id in current_platform_ids:
                filtered_latest_titles[source_id] = title_data
        latest_titles = filtered_latest_titles

    # Aggregate historical titles (filtered the same way)
    historical_titles = {}
    for file_path in files[:-1]:
        historical_data, _ = parse_file_titles(file_path)

        if current_platform_ids is not None:
            filtered_historical_data = {}
            for source_id, title_data in historical_data.items():
                if source_id in current_platform_ids:
                    filtered_historical_data[source_id] = title_data
            historical_data = filtered_historical_data

        for source_id, titles_data in historical_data.items():
            if source_id not in historical_titles:
                historical_titles[source_id] = set()
            for title in titles_data.keys():
                historical_titles[source_id].add(title)

    # Keep only titles absent from history
    new_titles = {}
    for source_id, latest_source_titles in latest_titles.items():
        historical_set = historical_titles.get(source_id, set())
        source_new_titles = {}
        for title, title_data in latest_source_titles.items():
            if title not in historical_set:
                source_new_titles[title] = title_data
        if source_new_titles:
            new_titles[source_id] = source_new_titles

    return new_titles


# === Statistics and analysis ===
def calculate_news_weight(
    title_data: Dict, rank_threshold: int = CONFIG["RANK_THRESHOLD"]
) -> float:
    """Compute a news item's weight for sorting."""
    ranks = title_data.get("ranks", [])
    if not ranks:
        return 0.0

    count = title_data.get("count", len(ranks))
    weight_config = CONFIG["WEIGHT_CONFIG"]

    # Rank weight: Σ(11 - min(rank, 10)) / number of appearances
    rank_scores = []
    for rank in ranks:
        score = 11 - min(rank, 10)
        rank_scores.append(score)
    rank_weight = sum(rank_scores) / len(ranks) if ranks else 0

    # Frequency weight: min(count, 10) × 10
    frequency_weight = min(count, 10) * 10

    # Hotness bonus: high-rank appearances / total appearances × 100
    high_rank_count = sum(1 for rank in ranks if rank <= rank_threshold)
    hotness_ratio = high_rank_count / len(ranks) if ranks else 0
    hotness_weight = hotness_ratio * 100

    total_weight = (
        rank_weight * weight_config["RANK_WEIGHT"]
        + frequency_weight * weight_config["FREQUENCY_WEIGHT"]
        + hotness_weight * weight_config["HOTNESS_WEIGHT"]
    )

    return total_weight


def matches_word_groups(
    title: str, word_groups: List[Dict], filter_words: List[str]
) -> bool:
    """Check whether a title matches the word-group rules."""
    # With no word groups configured, match every title (show-all mode)
    if not word_groups:
        return True

    title_lower = title.lower()

    # Filter words take precedence
    if any(filter_word.lower() in title_lower for filter_word in filter_words):
        return False

    # Word-group matching
    for group in word_groups:
        required_words = group["required"]
        normal_words = group["normal"]

        # All required words must be present
        if required_words:
            all_required_present = all(
                req_word.lower() in title_lower for req_word in required_words
            )
            if not all_required_present:
                continue

        # At least one normal word must be present
        if normal_words:
            any_normal_present = any(
                normal_word.lower() in title_lower for normal_word in normal_words
            )
            if not any_normal_present:
                continue

        return True

    return False


def format_time_display(first_time: str, last_time: str) -> str:
    """Format the first/last seen time range for display."""
    if not first_time:
        return ""
    if first_time == last_time or not last_time:
        return first_time
    else:
        return f"[{first_time} ~ {last_time}]"
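# Worked example for calculate_news_weight(), assuming hypothetical weights
# rank_weight=0.6, frequency_weight=0.3, hotness_weight=0.1 and rank_threshold=10:
# for ranks=[1, 3] and count=5,
#   rank_weight      = ((11-1) + (11-3)) / 2 = 9.0
#   frequency_weight = min(5, 10) * 10       = 50
#   hotness_weight   = (2/2) * 100           = 100
#   total            = 9.0*0.6 + 50*0.3 + 100*0.1 = 30.4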
"""统一的排名格式化方法""" if not ranks: return "" unique_ranks = sorted(set(ranks)) min_rank = unique_ranks[0] max_rank = unique_ranks[-1] if format_type == "html": highlight_start = "" highlight_end = "" elif format_type == "feishu": highlight_start = "**" highlight_end = "**" elif format_type == "dingtalk": highlight_start = "**" highlight_end = "**" elif format_type == "wework": highlight_start = "**" highlight_end = "**" elif format_type == "telegram": highlight_start = "" highlight_end = "" else: highlight_start = "**" highlight_end = "**" if min_rank <= rank_threshold: if min_rank == max_rank: return f"{highlight_start}[{min_rank}]{highlight_end}" else: return f"{highlight_start}[{min_rank} - {max_rank}]{highlight_end}" else: if min_rank == max_rank: return f"[{min_rank}]" else: return f"[{min_rank} - {max_rank}]" def count_word_frequency( results: Dict, word_groups: List[Dict], filter_words: List[str], id_to_name: Dict, title_info: Optional[Dict] = None, rank_threshold: int = CONFIG["RANK_THRESHOLD"], new_titles: Optional[Dict] = None, mode: str = "daily", ) -> Tuple[List[Dict], int]: """统计词频,支持必须词、频率词、过滤词,并标记新增标题""" # 如果没有配置词组,创建一个包含所有新闻的虚拟词组 if not word_groups: print("频率词配置为空,将显示所有新闻") word_groups = [{"required": [], "normal": [], "group_key": "全部新闻"}] filter_words = [] # 清空过滤词,显示所有新闻 is_first_today = is_first_crawl_today() # 确定处理的数据源和新增标记逻辑 if mode == "incremental": if is_first_today: # 增量模式 + 当天第一次:处理所有新闻,都标记为新增 results_to_process = results all_news_are_new = True else: # 增量模式 + 当天非第一次:只处理新增的新闻 results_to_process = new_titles if new_titles else {} all_news_are_new = True elif mode == "current": # current 模式:只处理当前时间批次的新闻,但统计信息来自全部历史 if title_info: latest_time = None for source_titles in title_info.values(): for title_data in source_titles.values(): last_time = title_data.get("last_time", "") if last_time: if latest_time is None or last_time > latest_time: latest_time = last_time # 只处理 last_time 等于最新时间的新闻 if latest_time: results_to_process = {} for source_id, source_titles in results.items(): if source_id in title_info: filtered_titles = {} for title, title_data in source_titles.items(): if title in title_info[source_id]: info = title_info[source_id][title] if info.get("last_time") == latest_time: filtered_titles[title] = title_data if filtered_titles: results_to_process[source_id] = filtered_titles print( f"当前榜单模式:最新时间 {latest_time},筛选出 {sum(len(titles) for titles in results_to_process.values())} 条当前榜单新闻" ) else: results_to_process = results else: results_to_process = results all_news_are_new = False else: # 当日汇总模式:处理所有新闻 results_to_process = results all_news_are_new = False total_input_news = sum(len(titles) for titles in results.values()) filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词过滤" ) print(f"当日汇总模式:处理 {total_input_news} 条新闻,模式:{filter_status}") word_stats = {} total_titles = 0 processed_titles = {} matched_new_count = 0 if title_info is None: title_info = {} if new_titles is None: new_titles = {} for group in word_groups: group_key = group["group_key"] word_stats[group_key] = {"count": 0, "titles": {}} for source_id, titles_data in results_to_process.items(): total_titles += len(titles_data) if source_id not in processed_titles: processed_titles[source_id] = {} for title, title_data in titles_data.items(): if title in processed_titles.get(source_id, {}): continue # 使用统一的匹配逻辑 matches_frequency_words = matches_word_groups( title, word_groups, filter_words ) if not matches_frequency_words: continue # 如果是增量模式或 current 
def count_word_frequency(
    results: Dict,
    word_groups: List[Dict],
    filter_words: List[str],
    id_to_name: Dict,
    title_info: Optional[Dict] = None,
    rank_threshold: int = CONFIG["RANK_THRESHOLD"],
    new_titles: Optional[Dict] = None,
    mode: str = "daily",
) -> Tuple[List[Dict], int]:
    """Count word-group frequencies, honoring required/normal/filter words and
    marking newly appeared titles."""
    # With no word groups configured, create a catch-all group for every title
    if not word_groups:
        print("频率词配置为空,将显示所有新闻")
        word_groups = [{"required": [], "normal": [], "group_key": "全部新闻"}]
        filter_words = []  # drop filter words so everything is shown

    is_first_today = is_first_crawl_today()

    # Pick the data source and new-title marking policy per mode
    if mode == "incremental":
        if is_first_today:
            # Incremental + first crawl of the day: process everything, all marked new
            results_to_process = results
            all_news_are_new = True
        else:
            # Incremental + later crawls: only process newly appeared titles
            results_to_process = new_titles if new_titles else {}
            all_news_are_new = True
    elif mode == "current":
        # Current mode: only the latest batch, but stats come from full history
        if title_info:
            latest_time = None
            for source_titles in title_info.values():
                for title_data in source_titles.values():
                    last_time = title_data.get("last_time", "")
                    if last_time:
                        if latest_time is None or last_time > latest_time:
                            latest_time = last_time

            # Keep only titles whose last_time equals the latest batch time
            if latest_time:
                results_to_process = {}
                for source_id, source_titles in results.items():
                    if source_id in title_info:
                        filtered_titles = {}
                        for title, title_data in source_titles.items():
                            if title in title_info[source_id]:
                                info = title_info[source_id][title]
                                if info.get("last_time") == latest_time:
                                    filtered_titles[title] = title_data
                        if filtered_titles:
                            results_to_process[source_id] = filtered_titles
                print(
                    f"当前榜单模式:最新时间 {latest_time},筛选出 {sum(len(titles) for titles in results_to_process.values())} 条当前榜单新闻"
                )
            else:
                results_to_process = results
        else:
            results_to_process = results
        all_news_are_new = False
    else:
        # Daily summary mode: process everything
        results_to_process = results
        all_news_are_new = False
        total_input_news = sum(len(titles) for titles in results.values())
        filter_status = (
            "全部显示"
            if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
            else "频率词过滤"
        )
        print(f"当日汇总模式:处理 {total_input_news} 条新闻,模式:{filter_status}")

    word_stats = {}
    total_titles = 0
    processed_titles = {}
    matched_new_count = 0

    if title_info is None:
        title_info = {}
    if new_titles is None:
        new_titles = {}

    for group in word_groups:
        group_key = group["group_key"]
        word_stats[group_key] = {"count": 0, "titles": {}}

    for source_id, titles_data in results_to_process.items():
        total_titles += len(titles_data)

        if source_id not in processed_titles:
            processed_titles[source_id] = {}

        for title, title_data in titles_data.items():
            if title in processed_titles.get(source_id, {}):
                continue

            # Shared matching logic
            matches_frequency_words = matches_word_groups(
                title, word_groups, filter_words
            )
            if not matches_frequency_words:
                continue

            # Count matched new titles in incremental mode, or on the first crawl
            # of the day in current mode
            if (mode == "incremental" and all_news_are_new) or (
                mode == "current" and is_first_today
            ):
                matched_new_count += 1

            source_ranks = title_data.get("ranks", [])
            source_url = title_data.get("url", "")
            source_mobile_url = title_data.get("mobileUrl", "")

            # Find the matching word group
            title_lower = title.lower()
            for group in word_groups:
                required_words = group["required"]
                normal_words = group["normal"]

                # In show-all mode, every title matches the single catch-all group
                if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻":
                    group_key = group["group_key"]
                    word_stats[group_key]["count"] += 1
                    if source_id not in word_stats[group_key]["titles"]:
                        word_stats[group_key]["titles"][source_id] = []
                else:
                    # Regular matching logic
                    if required_words:
                        all_required_present = all(
                            req_word.lower() in title_lower
                            for req_word in required_words
                        )
                        if not all_required_present:
                            continue

                    if normal_words:
                        any_normal_present = any(
                            normal_word.lower() in title_lower
                            for normal_word in normal_words
                        )
                        if not any_normal_present:
                            continue

                    group_key = group["group_key"]
                    word_stats[group_key]["count"] += 1
                    if source_id not in word_stats[group_key]["titles"]:
                        word_stats[group_key]["titles"][source_id] = []

                first_time = ""
                last_time = ""
                count_info = 1
                ranks = source_ranks if source_ranks else []
                url = source_url
                mobile_url = source_mobile_url

                # In current mode, pull complete stats from the historical record
                if (
                    mode == "current"
                    and title_info
                    and source_id in title_info
                    and title in title_info[source_id]
                ):
                    info = title_info[source_id][title]
                    first_time = info.get("first_time", "")
                    last_time = info.get("last_time", "")
                    count_info = info.get("count", 1)
                    if "ranks" in info and info["ranks"]:
                        ranks = info["ranks"]
                    url = info.get("url", source_url)
                    mobile_url = info.get("mobileUrl", source_mobile_url)
                elif (
                    title_info
                    and source_id in title_info
                    and title in title_info[source_id]
                ):
                    info = title_info[source_id][title]
                    first_time = info.get("first_time", "")
                    last_time = info.get("last_time", "")
                    count_info = info.get("count", 1)
                    if "ranks" in info and info["ranks"]:
                        ranks = info["ranks"]
                    url = info.get("url", source_url)
                    mobile_url = info.get("mobileUrl", source_mobile_url)

                if not ranks:
                    ranks = [99]

                time_display = format_time_display(first_time, last_time)
                source_name = id_to_name.get(source_id, source_id)

                # Decide whether the title counts as new
                is_new = False
                if all_news_are_new:
                    # Incremental mode (or the day's first crawl) marks everything new
                    is_new = True
                elif new_titles and source_id in new_titles:
                    # Otherwise check the new-title list
                    new_titles_for_source = new_titles[source_id]
                    is_new = title in new_titles_for_source

                word_stats[group_key]["titles"][source_id].append(
                    {
                        "title": title,
                        "source_name": source_name,
                        "first_time": first_time,
                        "last_time": last_time,
                        "time_display": time_display,
                        "count": count_info,
                        "ranks": ranks,
                        "rank_threshold": rank_threshold,
                        "url": url,
                        "mobileUrl": mobile_url,
                        "is_new": is_new,
                    }
                )

                if source_id not in processed_titles:
                    processed_titles[source_id] = {}
                processed_titles[source_id][title] = True
                break

    # Print the summary for the chosen mode
    if mode == "incremental":
        if is_first_today:
            total_input_news = sum(len(titles) for titles in results.values())
            filter_status = (
                "全部显示"
                if len(word_groups) == 1
                and word_groups[0]["group_key"] == "全部新闻"
                else "频率词匹配"
            )
            print(
                f"增量模式:当天第一次爬取,{total_input_news} 条新闻中有 {matched_new_count} 条{filter_status}"
            )
        else:
            if new_titles:
                total_new_count = sum(len(titles) for titles in new_titles.values())
                filter_status = (
                    "全部显示"
                    if len(word_groups) == 1
                    and word_groups[0]["group_key"] == "全部新闻"
                    else "匹配频率词"
                )
                print(
                    f"增量模式:{total_new_count} 条新增新闻中,有 {matched_new_count} 条{filter_status}"
                )
print("增量模式:没有新增新闻匹配频率词,将不会发送通知") else: print("增量模式:未检测到新增新闻") elif mode == "current": total_input_news = sum(len(titles) for titles in results_to_process.values()) if is_first_today: filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词匹配" ) print( f"当前榜单模式:当天第一次爬取,{total_input_news} 条当前榜单新闻中有 {matched_new_count} 条{filter_status}" ) else: matched_count = sum(stat["count"] for stat in word_stats.values()) filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词匹配" ) print( f"当前榜单模式:{total_input_news} 条当前榜单新闻中有 {matched_count} 条{filter_status}" ) stats = [] for group_key, data in word_stats.items(): all_titles = [] for source_id, title_list in data["titles"].items(): all_titles.extend(title_list) # 按权重排序 sorted_titles = sorted( all_titles, key=lambda x: ( -calculate_news_weight(x, rank_threshold), min(x["ranks"]) if x["ranks"] else 999, -x["count"], ), ) stats.append( { "word": group_key, "count": data["count"], "titles": sorted_titles, "percentage": ( round(data["count"] / total_titles * 100, 2) if total_titles > 0 else 0 ), } ) stats.sort(key=lambda x: x["count"], reverse=True) return stats, total_titles # === 报告生成 === def prepare_report_data( stats: List[Dict], failed_ids: Optional[List] = None, new_titles: Optional[Dict] = None, id_to_name: Optional[Dict] = None, mode: str = "daily", ) -> Dict: """准备报告数据""" processed_new_titles = [] # 在增量模式下隐藏新增新闻区域 hide_new_section = mode == "incremental" # 只有在非隐藏模式下才处理新增新闻部分 if not hide_new_section: filtered_new_titles = {} if new_titles and id_to_name: word_groups, filter_words = load_frequency_words() for source_id, titles_data in new_titles.items(): filtered_titles = {} for title, title_data in titles_data.items(): if matches_word_groups(title, word_groups, filter_words): filtered_titles[title] = title_data if filtered_titles: filtered_new_titles[source_id] = filtered_titles if filtered_new_titles and id_to_name: for source_id, titles_data in filtered_new_titles.items(): source_name = id_to_name.get(source_id, source_id) source_titles = [] for title, title_data in titles_data.items(): url = title_data.get("url", "") mobile_url = title_data.get("mobileUrl", "") ranks = title_data.get("ranks", []) processed_title = { "title": title, "source_name": source_name, "time_display": "", "count": 1, "ranks": ranks, "rank_threshold": CONFIG["RANK_THRESHOLD"], "url": url, "mobile_url": mobile_url, "is_new": True, } source_titles.append(processed_title) if source_titles: processed_new_titles.append( { "source_id": source_id, "source_name": source_name, "titles": source_titles, } ) processed_stats = [] for stat in stats: if stat["count"] <= 0: continue processed_titles = [] for title_data in stat["titles"]: processed_title = { "title": title_data["title"], "source_name": title_data["source_name"], "time_display": title_data["time_display"], "count": title_data["count"], "ranks": title_data["ranks"], "rank_threshold": title_data["rank_threshold"], "url": title_data.get("url", ""), "mobile_url": title_data.get("mobileUrl", ""), "is_new": title_data.get("is_new", False), } processed_titles.append(processed_title) processed_stats.append( { "word": stat["word"], "count": stat["count"], "percentage": stat.get("percentage", 0), "titles": processed_titles, } ) return { "stats": processed_stats, "new_titles": processed_new_titles, "failed_ids": failed_ids or [], "total_new_count": sum( len(source["titles"]) for source in processed_new_titles ), } def format_title_for_platform( 
def format_title_for_platform(
    platform: str, title_data: Dict, show_source: bool = True
) -> str:
    """Shared title formatting for every output platform."""
    rank_display = format_rank_display(
        title_data["ranks"], title_data["rank_threshold"], platform
    )
    link_url = title_data["mobile_url"] or title_data["url"]
    cleaned_title = clean_title(title_data["title"])

    if platform in ("feishu", "dingtalk", "wework"):
        # Feishu, DingTalk, and WeCom share the same markdown link syntax
        if link_url:
            formatted_title = f"[{cleaned_title}]({link_url})"
        else:
            formatted_title = cleaned_title

        title_prefix = "🆕 " if title_data.get("is_new") else ""
        if show_source:
            result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
        else:
            result = f"{title_prefix}{formatted_title}"

        if rank_display:
            result += f" {rank_display}"
        if title_data["time_display"]:
            result += f" - {title_data['time_display']}"
        if title_data["count"] > 1:
            result += f" ({title_data['count']}次)"
        return result

    elif platform == "telegram":
        # Telegram HTML parse mode; the anchor tag is a reconstruction, since the
        # original markup was lost in extraction
        if link_url:
            formatted_title = f'<a href="{link_url}">{html_escape(cleaned_title)}</a>'
        else:
            formatted_title = cleaned_title

        title_prefix = "🆕 " if title_data.get("is_new") else ""
        if show_source:
            result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
        else:
            result = f"{title_prefix}{formatted_title}"

        if rank_display:
            result += f" {rank_display}"
        if title_data["time_display"]:
            result += f" - {title_data['time_display']}"
        if title_data["count"] > 1:
            result += f" ({title_data['count']}次)"
        return result

    elif platform == "ntfy":
        if link_url:
            formatted_title = f"[{cleaned_title}]({link_url})"
        else:
            formatted_title = cleaned_title

        title_prefix = "🆕 " if title_data.get("is_new") else ""
        if show_source:
            result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
        else:
            result = f"{title_prefix}{formatted_title}"

        if rank_display:
            result += f" {rank_display}"
        if title_data["time_display"]:
            result += f" `- {title_data['time_display']}`"
        if title_data["count"] > 1:
            result += f" `({title_data['count']}次)`"
        return result

    elif platform == "html":
        rank_display = format_rank_display(
            title_data["ranks"], title_data["rank_threshold"], "html"
        )
        link_url = title_data["mobile_url"] or title_data["url"]
        escaped_title = html_escape(cleaned_title)
        escaped_source_name = html_escape(title_data["source_name"])

        # The anchor/span markup below is a minimal reconstruction; the original
        # template tags were lost in extraction
        if link_url:
            escaped_url = html_escape(link_url)
            formatted_title = (
                f'<a href="{escaped_url}" target="_blank">'
                f"[{escaped_source_name}] {escaped_title}</a>"
            )
        else:
            formatted_title = f"[{escaped_source_name}] {escaped_title}"

        if rank_display:
            formatted_title += f" {rank_display}"
        if title_data["time_display"]:
            escaped_time = html_escape(title_data["time_display"])
            formatted_title += f" - {escaped_time}"
        if title_data["count"] > 1:
            formatted_title += f" ({title_data['count']}次)"
        if title_data.get("is_new"):
            formatted_title = f'<span class="new">🆕 {formatted_title}</span>'
        return formatted_title
" return formatted_title else: return cleaned_title def generate_html_report( stats: List[Dict], total_titles: int, failed_ids: Optional[List] = None, new_titles: Optional[Dict] = None, id_to_name: Optional[Dict] = None, mode: str = "daily", is_daily_summary: bool = False, update_info: Optional[Dict] = None, ) -> str: """生成HTML报告""" if is_daily_summary: if mode == "current": filename = "当前榜单汇总.html" elif mode == "incremental": filename = "当日增量.html" else: filename = "当日汇总.html" else: filename = f"{format_time_filename()}.html" file_path = get_output_path("html", filename) report_data = prepare_report_data(stats, failed_ids, new_titles, id_to_name, mode) html_content = render_html_content( report_data, total_titles, is_daily_summary, mode, update_info ) with open(file_path, "w", encoding="utf-8") as f: f.write(html_content) if is_daily_summary: root_file_path = Path("index.html") with open(root_file_path, "w", encoding="utf-8") as f: f.write(html_content) return file_path def render_html_content( report_data: Dict, total_titles: int, is_daily_summary: bool = False, mode: str = "daily", update_info: Optional[Dict] = None, ) -> str: """渲染HTML内容""" html = """ 热点新闻分析
热点新闻分析
报告类型 """ # 处理报告类型显示 if is_daily_summary: if mode == "current": html += "当前榜单" elif mode == "incremental": html += "增量模式" else: html += "当日汇总" else: html += "实时分析" html += """
新闻总数 """ html += f"{total_titles} 条" # 计算筛选后的热点新闻数量 hot_news_count = sum(len(stat["titles"]) for stat in report_data["stats"]) html += """
热点新闻 """ html += f"{hot_news_count} 条" html += """
生成时间 """ now = get_beijing_time() html += now.strftime("%m-%d %H:%M") html += """
""" # 处理失败ID错误信息 if report_data["failed_ids"]: html += """
⚠️ 请求失败的平台
    """ for id_value in report_data["failed_ids"]: html += f'
  • {html_escape(id_value)}
  • ' html += """
""" # 处理主要统计数据 if report_data["stats"]: total_count = len(report_data["stats"]) for i, stat in enumerate(report_data["stats"], 1): count = stat["count"] # 确定热度等级 if count >= 10: count_class = "hot" elif count >= 5: count_class = "warm" else: count_class = "" escaped_word = html_escape(stat["word"]) html += f"""
{escaped_word}
{count} 条
{i}/{total_count}
""" # 处理每个词组下的新闻标题,给每条新闻标上序号 for j, title_data in enumerate(stat["titles"], 1): is_new = title_data.get("is_new", False) new_class = "new" if is_new else "" html += f"""
{j}
{html_escape(title_data["source_name"])}""" # 处理排名显示 ranks = title_data.get("ranks", []) if ranks: min_rank = min(ranks) max_rank = max(ranks) rank_threshold = title_data.get("rank_threshold", 10) # 确定排名等级 if min_rank <= 3: rank_class = "top" elif min_rank <= rank_threshold: rank_class = "high" else: rank_class = "" if min_rank == max_rank: rank_text = str(min_rank) else: rank_text = f"{min_rank}-{max_rank}" html += f'{rank_text}' # 处理时间显示 time_display = title_data.get("time_display", "") if time_display: # 简化时间显示格式,将波浪线替换为~ simplified_time = ( time_display.replace(" ~ ", "~") .replace("[", "") .replace("]", "") ) html += ( f'{html_escape(simplified_time)}' ) # 处理出现次数 count_info = title_data.get("count", 1) if count_info > 1: html += f'{count_info}次' html += """
""" # 处理标题和链接 escaped_title = html_escape(title_data["title"]) link_url = title_data.get("mobile_url") or title_data.get("url", "") if link_url: escaped_url = html_escape(link_url) html += f'{escaped_title}' else: html += escaped_title html += """
""" html += """
""" # 处理新增新闻区域 if report_data["new_titles"]: html += f"""
本次新增热点 (共 {report_data['total_new_count']} 条)
""" for source_data in report_data["new_titles"]: escaped_source = html_escape(source_data["source_name"]) titles_count = len(source_data["titles"]) html += f"""
{escaped_source} · {titles_count}条
""" # 为新增新闻也添加序号 for idx, title_data in enumerate(source_data["titles"], 1): ranks = title_data.get("ranks", []) # 处理新增新闻的排名显示 rank_class = "" if ranks: min_rank = min(ranks) if min_rank <= 3: rank_class = "top" elif min_rank <= title_data.get("rank_threshold", 10): rank_class = "high" if len(ranks) == 1: rank_text = str(ranks[0]) else: rank_text = f"{min(ranks)}-{max(ranks)}" else: rank_text = "?" html += f"""
{idx}
{rank_text}
""" # 处理新增新闻的链接 escaped_title = html_escape(title_data["title"]) link_url = title_data.get("mobile_url") or title_data.get("url", "") if link_url: escaped_url = html_escape(link_url) html += f'{escaped_title}' else: html += escaped_title html += """
""" html += """
""" html += """
""" html += """
""" return html def render_feishu_content( report_data: Dict, update_info: Optional[Dict] = None, mode: str = "daily" ) -> str: """渲染飞书内容""" text_content = "" if report_data["stats"]: text_content += f"📊 **热点词汇统计**\n\n" total_count = len(report_data["stats"]) for i, stat in enumerate(report_data["stats"]): word = stat["word"] count = stat["count"] sequence_display = f"[{i + 1}/{total_count}]" if count >= 10: text_content += f"🔥 {sequence_display} **{word}** : {count} 条\n\n" elif count >= 5: text_content += f"📈 {sequence_display} **{word}** : {count} 条\n\n" else: text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n" for j, title_data in enumerate(stat["titles"], 1): formatted_title = format_title_for_platform( "feishu", title_data, show_source=True ) text_content += f" {j}. {formatted_title}\n" if j < len(stat["titles"]): text_content += "\n" if i < len(report_data["stats"]) - 1: text_content += f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n" if not text_content: if mode == "incremental": mode_text = "增量模式下暂无新增匹配的热点词汇" elif mode == "current": mode_text = "当前榜单模式下暂无匹配的热点词汇" else: mode_text = "暂无匹配的热点词汇" text_content = f"📭 {mode_text}\n\n" if report_data["new_titles"]: if text_content and "暂无匹配" not in text_content: text_content += f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n" text_content += ( f"🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n" ) for source_data in report_data["new_titles"]: text_content += ( f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n" ) for j, title_data in enumerate(source_data["titles"], 1): title_data_copy = title_data.copy() title_data_copy["is_new"] = False formatted_title = format_title_for_platform( "feishu", title_data_copy, show_source=False ) text_content += f" {j}. {formatted_title}\n" text_content += "\n" if report_data["failed_ids"]: if text_content and "暂无匹配" not in text_content: text_content += f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n" text_content += "⚠️ **数据获取失败的平台:**\n\n" for i, id_value in enumerate(report_data["failed_ids"], 1): text_content += f" • {id_value}\n" now = get_beijing_time() text_content += ( f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" ) if update_info: text_content += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}" return text_content def render_dingtalk_content( report_data: Dict, update_info: Optional[Dict] = None, mode: str = "daily" ) -> str: """渲染钉钉内容""" text_content = "" total_titles = sum( len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0 ) now = get_beijing_time() text_content += f"**总新闻数:** {total_titles}\n\n" text_content += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n" text_content += f"**类型:** 热点分析报告\n\n" text_content += "---\n\n" if report_data["stats"]: text_content += f"📊 **热点词汇统计**\n\n" total_count = len(report_data["stats"]) for i, stat in enumerate(report_data["stats"]): word = stat["word"] count = stat["count"] sequence_display = f"[{i + 1}/{total_count}]" if count >= 10: text_content += f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n" elif count >= 5: text_content += f"📈 {sequence_display} **{word}** : **{count}** 条\n\n" else: text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n" for j, title_data in enumerate(stat["titles"], 1): formatted_title = format_title_for_platform( "dingtalk", title_data, show_source=True ) text_content += f" {j}. 
{formatted_title}\n" if j < len(stat["titles"]): text_content += "\n" if i < len(report_data["stats"]) - 1: text_content += f"\n---\n\n" if not report_data["stats"]: if mode == "incremental": mode_text = "增量模式下暂无新增匹配的热点词汇" elif mode == "current": mode_text = "当前榜单模式下暂无匹配的热点词汇" else: mode_text = "暂无匹配的热点词汇" text_content += f"📭 {mode_text}\n\n" if report_data["new_titles"]: if text_content and "暂无匹配" not in text_content: text_content += f"\n---\n\n" text_content += ( f"🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n" ) for source_data in report_data["new_titles"]: text_content += f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n" for j, title_data in enumerate(source_data["titles"], 1): title_data_copy = title_data.copy() title_data_copy["is_new"] = False formatted_title = format_title_for_platform( "dingtalk", title_data_copy, show_source=False ) text_content += f" {j}. {formatted_title}\n" text_content += "\n" if report_data["failed_ids"]: if text_content and "暂无匹配" not in text_content: text_content += f"\n---\n\n" text_content += "⚠️ **数据获取失败的平台:**\n\n" for i, id_value in enumerate(report_data["failed_ids"], 1): text_content += f" • **{id_value}**\n" text_content += f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" if update_info: text_content += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**" return text_content def split_content_into_batches( report_data: Dict, format_type: str, update_info: Optional[Dict] = None, max_bytes: int = None, mode: str = "daily", ) -> List[str]: """分批处理消息内容,确保词组标题+至少第一条新闻的完整性""" if max_bytes is None: if format_type == "dingtalk": max_bytes = CONFIG.get("DINGTALK_BATCH_SIZE", 20000) elif format_type == "ntfy": max_bytes = 3800 else: max_bytes = CONFIG.get("MESSAGE_BATCH_SIZE", 4000) batches = [] total_titles = sum( len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0 ) now = get_beijing_time() base_header = "" if format_type == "wework": base_header = f"**总新闻数:** {total_titles}\n\n\n\n" elif format_type == "telegram": base_header = f"总新闻数: {total_titles}\n\n" elif format_type == "ntfy": base_header = f"**总新闻数:** {total_titles}\n\n" elif format_type == "dingtalk": base_header = f"**总新闻数:** {total_titles}\n\n" base_header += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n" base_header += f"**类型:** 热点分析报告\n\n" base_header += "---\n\n" base_footer = "" if format_type == "wework": base_footer = f"\n\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" if update_info: base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**" elif format_type == "telegram": base_footer = f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" if update_info: base_footer += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}" elif format_type == "ntfy": base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" if update_info: base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**" elif format_type == "dingtalk": base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}" if update_info: base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**" stats_header = "" if report_data["stats"]: if format_type == "wework": stats_header = f"📊 **热点词汇统计**\n\n" elif format_type == "telegram": stats_header = f"📊 热点词汇统计\n\n" elif format_type == "ntfy": stats_header = f"📊 **热点词汇统计**\n\n" elif 
format_type == "dingtalk": stats_header = f"📊 **热点词汇统计**\n\n" current_batch = base_header current_batch_has_content = False if ( not report_data["stats"] and not report_data["new_titles"] and not report_data["failed_ids"] ): if mode == "incremental": mode_text = "增量模式下暂无新增匹配的热点词汇" elif mode == "current": mode_text = "当前榜单模式下暂无匹配的热点词汇" else: mode_text = "暂无匹配的热点词汇" simple_content = f"📭 {mode_text}\n\n" final_content = base_header + simple_content + base_footer batches.append(final_content) return batches # 处理热点词汇统计 if report_data["stats"]: total_count = len(report_data["stats"]) # 添加统计标题 test_content = current_batch + stats_header if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) < max_bytes ): current_batch = test_content current_batch_has_content = True else: if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + stats_header current_batch_has_content = True # 逐个处理词组(确保词组标题+第一条新闻的原子性) for i, stat in enumerate(report_data["stats"]): word = stat["word"] count = stat["count"] sequence_display = f"[{i + 1}/{total_count}]" # 构建词组标题 word_header = "" if format_type == "wework": if count >= 10: word_header = ( f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n" ) elif count >= 5: word_header = ( f"📈 {sequence_display} **{word}** : **{count}** 条\n\n" ) else: word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n" elif format_type == "telegram": if count >= 10: word_header = f"🔥 {sequence_display} {word} : {count} 条\n\n" elif count >= 5: word_header = f"📈 {sequence_display} {word} : {count} 条\n\n" else: word_header = f"📌 {sequence_display} {word} : {count} 条\n\n" elif format_type == "ntfy": if count >= 10: word_header = ( f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n" ) elif count >= 5: word_header = ( f"📈 {sequence_display} **{word}** : **{count}** 条\n\n" ) else: word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n" elif format_type == "dingtalk": if count >= 10: word_header = ( f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n" ) elif count >= 5: word_header = ( f"📈 {sequence_display} **{word}** : **{count}** 条\n\n" ) else: word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n" # 构建第一条新闻 first_news_line = "" if stat["titles"]: first_title_data = stat["titles"][0] if format_type == "wework": formatted_title = format_title_for_platform( "wework", first_title_data, show_source=True ) elif format_type == "telegram": formatted_title = format_title_for_platform( "telegram", first_title_data, show_source=True ) elif format_type == "ntfy": formatted_title = format_title_for_platform( "ntfy", first_title_data, show_source=True ) elif format_type == "dingtalk": formatted_title = format_title_for_platform( "dingtalk", first_title_data, show_source=True ) else: formatted_title = f"{first_title_data['title']}" first_news_line = f" 1. 
{formatted_title}\n" if len(stat["titles"]) > 1: first_news_line += "\n" # 原子性检查:词组标题+第一条新闻必须一起处理 word_with_first_news = word_header + first_news_line test_content = current_batch + word_with_first_news if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): # 当前批次容纳不下,开启新批次 if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + stats_header + word_with_first_news current_batch_has_content = True start_index = 1 else: current_batch = test_content current_batch_has_content = True start_index = 1 # 处理剩余新闻条目 for j in range(start_index, len(stat["titles"])): title_data = stat["titles"][j] if format_type == "wework": formatted_title = format_title_for_platform( "wework", title_data, show_source=True ) elif format_type == "telegram": formatted_title = format_title_for_platform( "telegram", title_data, show_source=True ) elif format_type == "ntfy": formatted_title = format_title_for_platform( "ntfy", title_data, show_source=True ) elif format_type == "dingtalk": formatted_title = format_title_for_platform( "dingtalk", title_data, show_source=True ) else: formatted_title = f"{title_data['title']}" news_line = f" {j + 1}. {formatted_title}\n" if j < len(stat["titles"]) - 1: news_line += "\n" test_content = current_batch + news_line if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + stats_header + word_header + news_line current_batch_has_content = True else: current_batch = test_content current_batch_has_content = True # 词组间分隔符 if i < len(report_data["stats"]) - 1: separator = "" if format_type == "wework": separator = f"\n\n\n\n" elif format_type == "telegram": separator = f"\n\n" elif format_type == "ntfy": separator = f"\n\n" elif format_type == "dingtalk": separator = f"\n---\n\n" test_content = current_batch + separator if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) < max_bytes ): current_batch = test_content # 处理新增新闻(同样确保来源标题+第一条新闻的原子性) if report_data["new_titles"]: new_header = "" if format_type == "wework": new_header = f"\n\n\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n" elif format_type == "telegram": new_header = ( f"\n\n🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)\n\n" ) elif format_type == "ntfy": new_header = f"\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n" elif format_type == "dingtalk": new_header = f"\n---\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n" test_content = current_batch + new_header if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + new_header current_batch_has_content = True else: current_batch = test_content current_batch_has_content = True # 逐个处理新增新闻来源 for source_data in report_data["new_titles"]: source_header = "" if format_type == "wework": source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n" elif format_type == "telegram": source_header = f"{source_data['source_name']} ({len(source_data['titles'])} 条):\n\n" elif format_type == "ntfy": source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n" elif format_type == "dingtalk": source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n" # 构建第一条新增新闻 first_news_line = "" if 
source_data["titles"]: first_title_data = source_data["titles"][0] title_data_copy = first_title_data.copy() title_data_copy["is_new"] = False if format_type == "wework": formatted_title = format_title_for_platform( "wework", title_data_copy, show_source=False ) elif format_type == "telegram": formatted_title = format_title_for_platform( "telegram", title_data_copy, show_source=False ) elif format_type == "dingtalk": formatted_title = format_title_for_platform( "dingtalk", title_data_copy, show_source=False ) else: formatted_title = f"{title_data_copy['title']}" first_news_line = f" 1. {formatted_title}\n" # 原子性检查:来源标题+第一条新闻 source_with_first_news = source_header + first_news_line test_content = current_batch + source_with_first_news if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + new_header + source_with_first_news current_batch_has_content = True start_index = 1 else: current_batch = test_content current_batch_has_content = True start_index = 1 # 处理剩余新增新闻 for j in range(start_index, len(source_data["titles"])): title_data = source_data["titles"][j] title_data_copy = title_data.copy() title_data_copy["is_new"] = False if format_type == "wework": formatted_title = format_title_for_platform( "wework", title_data_copy, show_source=False ) elif format_type == "telegram": formatted_title = format_title_for_platform( "telegram", title_data_copy, show_source=False ) elif format_type == "dingtalk": formatted_title = format_title_for_platform( "dingtalk", title_data_copy, show_source=False ) else: formatted_title = f"{title_data_copy['title']}" news_line = f" {j + 1}. {formatted_title}\n" test_content = current_batch + news_line if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + new_header + source_header + news_line current_batch_has_content = True else: current_batch = test_content current_batch_has_content = True current_batch += "\n" if report_data["failed_ids"]: failed_header = "" if format_type == "wework": failed_header = f"\n\n\n\n⚠️ **数据获取失败的平台:**\n\n" elif format_type == "telegram": failed_header = f"\n\n⚠️ 数据获取失败的平台:\n\n" elif format_type == "ntfy": failed_header = f"\n\n⚠️ **数据获取失败的平台:**\n\n" elif format_type == "dingtalk": failed_header = f"\n---\n\n⚠️ **数据获取失败的平台:**\n\n" test_content = current_batch + failed_header if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + failed_header current_batch_has_content = True else: current_batch = test_content current_batch_has_content = True for i, id_value in enumerate(report_data["failed_ids"], 1): if format_type == "dingtalk": failed_line = f" • **{id_value}**\n" else: failed_line = f" • {id_value}\n" test_content = current_batch + failed_line if ( len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes ): if current_batch_has_content: batches.append(current_batch + base_footer) current_batch = base_header + failed_header + failed_line current_batch_has_content = True else: current_batch = test_content current_batch_has_content = True # 完成最后批次 if current_batch_has_content: batches.append(current_batch + base_footer) return batches def send_to_notifications( stats: List[Dict], failed_ids: 


def send_to_notifications(
    stats: List[Dict],
    failed_ids: Optional[List] = None,
    report_type: str = "当日汇总",
    new_titles: Optional[Dict] = None,
    id_to_name: Optional[Dict] = None,
    update_info: Optional[Dict] = None,
    proxy_url: Optional[str] = None,
    mode: str = "daily",
    html_file_path: Optional[str] = None,
) -> Dict[str, bool]:
    """Send the report to every configured notification platform."""
    results = {}

    if CONFIG["PUSH_WINDOW"]["ENABLED"]:
        push_manager = PushRecordManager()
        time_range_start = CONFIG["PUSH_WINDOW"]["TIME_RANGE"]["START"]
        time_range_end = CONFIG["PUSH_WINDOW"]["TIME_RANGE"]["END"]

        if not push_manager.is_in_time_range(time_range_start, time_range_end):
            now = get_beijing_time()
            print(
                f"推送窗口控制:当前时间 {now.strftime('%H:%M')} 不在推送时间窗口 {time_range_start}-{time_range_end} 内,跳过推送"
            )
            return results

        if CONFIG["PUSH_WINDOW"]["ONCE_PER_DAY"]:
            if push_manager.has_pushed_today():
                print("推送窗口控制:今天已推送过,跳过本次推送")
                return results
            else:
                print("推送窗口控制:今天首次推送")

    report_data = prepare_report_data(stats, failed_ids, new_titles, id_to_name, mode)

    feishu_url = CONFIG["FEISHU_WEBHOOK_URL"]
    dingtalk_url = CONFIG["DINGTALK_WEBHOOK_URL"]
    wework_url = CONFIG["WEWORK_WEBHOOK_URL"]
    telegram_token = CONFIG["TELEGRAM_BOT_TOKEN"]
    telegram_chat_id = CONFIG["TELEGRAM_CHAT_ID"]
    email_from = CONFIG["EMAIL_FROM"]
    email_password = CONFIG["EMAIL_PASSWORD"]
    email_to = CONFIG["EMAIL_TO"]
    email_smtp_server = CONFIG.get("EMAIL_SMTP_SERVER", "")
    email_smtp_port = CONFIG.get("EMAIL_SMTP_PORT", "")
    ntfy_server_url = CONFIG["NTFY_SERVER_URL"]
    ntfy_topic = CONFIG["NTFY_TOPIC"]
    ntfy_token = CONFIG.get("NTFY_TOKEN", "")

    update_info_to_send = update_info if CONFIG["SHOW_VERSION_UPDATE"] else None

    # Feishu
    if feishu_url:
        results["feishu"] = send_to_feishu(
            feishu_url, report_data, report_type, update_info_to_send, proxy_url, mode
        )

    # DingTalk
    if dingtalk_url:
        results["dingtalk"] = send_to_dingtalk(
            dingtalk_url, report_data, report_type, update_info_to_send, proxy_url, mode
        )

    # WeCom (WeChat Work)
    if wework_url:
        results["wework"] = send_to_wework(
            wework_url, report_data, report_type, update_info_to_send, proxy_url, mode
        )

    # Telegram
    if telegram_token and telegram_chat_id:
        results["telegram"] = send_to_telegram(
            telegram_token,
            telegram_chat_id,
            report_data,
            report_type,
            update_info_to_send,
            proxy_url,
            mode,
        )

    # ntfy
    if ntfy_server_url and ntfy_topic:
        results["ntfy"] = send_to_ntfy(
            ntfy_server_url,
            ntfy_topic,
            ntfy_token,
            report_data,
            report_type,
            update_info_to_send,
            proxy_url,
            mode,
        )

    # Email
    if email_from and email_password and email_to:
        results["email"] = send_to_email(
            email_from,
            email_password,
            email_to,
            report_type,
            html_file_path,
            email_smtp_server,
            email_smtp_port,
        )

    if not results:
        print("未配置任何通知渠道,跳过通知发送")

    # If any notification succeeded and once-per-day is on, record the push
    if (
        CONFIG["PUSH_WINDOW"]["ENABLED"]
        and CONFIG["PUSH_WINDOW"]["ONCE_PER_DAY"]
        and any(results.values())
    ):
        push_manager = PushRecordManager()
        push_manager.record_push(report_type)

    return results


def send_to_feishu(
    webhook_url: str,
    report_data: Dict,
    report_type: str,
    update_info: Optional[Dict] = None,
    proxy_url: Optional[str] = None,
    mode: str = "daily",
) -> bool:
    """Send the report to Feishu."""
    headers = {"Content-Type": "application/json"}

    text_content = render_feishu_content(report_data, update_info, mode)
    total_titles = sum(
        len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
    )
    now = get_beijing_time()

    payload = {
        "msg_type": "text",
        "content": {
            "total_titles": total_titles,
            "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
            "report_type": report_type,
            "text": text_content,
        },
    }

    proxies = None
    if proxy_url:
        proxies = {"http": proxy_url, "https": proxy_url}

    try:
        response = requests.post(
            webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            print(f"飞书通知发送成功 [{report_type}]")
            return True
        else:
            print(f"飞书通知发送失败 [{report_type}],状态码:{response.status_code}")
            return False
    except Exception as e:
        print(f"飞书通知发送出错 [{report_type}]:{e}")
        return False
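

# Shape of the mapping returned by send_to_notifications (illustrative):
#
#   {"feishu": True, "dingtalk": True, "email": False}
#
# Keys exist only for channels that are actually configured, so an empty
# dict means nothing was attempted (or the push window suppressed the run).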


def send_to_dingtalk(
    webhook_url: str,
    report_data: Dict,
    report_type: str,
    update_info: Optional[Dict] = None,
    proxy_url: Optional[str] = None,
    mode: str = "daily",
) -> bool:
    """Send the report to DingTalk (supports batched sending)."""
    headers = {"Content-Type": "application/json"}

    proxies = None
    if proxy_url:
        proxies = {"http": proxy_url, "https": proxy_url}

    # Split into batches using the DingTalk-specific batch size
    batches = split_content_into_batches(
        report_data,
        "dingtalk",
        update_info,
        max_bytes=CONFIG.get("DINGTALK_BATCH_SIZE", 20000),
        mode=mode,
    )

    print(f"钉钉消息分为 {len(batches)} 批次发送 [{report_type}]")

    # Send batch by batch
    for i, batch_content in enumerate(batches, 1):
        batch_size = len(batch_content.encode("utf-8"))
        print(
            f"发送钉钉第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
        )

        # Add a batch marker
        if len(batches) > 1:
            batch_header = f"**[第 {i}/{len(batches)} 批次]**\n\n"
            # Insert the marker right after the statistics heading if present
            if "📊 **热点词汇统计**" in batch_content:
                batch_content = batch_content.replace(
                    "📊 **热点词汇统计**\n\n", f"📊 **热点词汇统计** {batch_header}\n\n"
                )
            else:
                # No statistics heading; prepend the marker instead
                batch_content = batch_header + batch_content

        payload = {
            "msgtype": "markdown",
            "markdown": {
                "title": f"TrendRadar 热点分析报告 - {report_type}",
                "text": batch_content,
            },
        }

        try:
            response = requests.post(
                webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                if result.get("errcode") == 0:
                    print(f"钉钉第 {i}/{len(batches)} 批次发送成功 [{report_type}]")
                    # Pause between batches
                    if i < len(batches):
                        time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
                else:
                    print(
                        f"钉钉第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('errmsg')}"
                    )
                    return False
            else:
                print(
                    f"钉钉第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
                )
                return False
        except Exception as e:
            print(f"钉钉第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}")
            return False

    print(f"钉钉所有 {len(batches)} 批次发送完成 [{report_type}]")
    return True


def send_to_wework(
    webhook_url: str,
    report_data: Dict,
    report_type: str,
    update_info: Optional[Dict] = None,
    proxy_url: Optional[str] = None,
    mode: str = "daily",
) -> bool:
    """Send the report to WeCom (supports batched sending)."""
    headers = {"Content-Type": "application/json"}

    proxies = None
    if proxy_url:
        proxies = {"http": proxy_url, "https": proxy_url}

    # Split into batches
    batches = split_content_into_batches(report_data, "wework", update_info, mode=mode)

    print(f"企业微信消息分为 {len(batches)} 批次发送 [{report_type}]")

    # Send batch by batch
    for i, batch_content in enumerate(batches, 1):
        batch_size = len(batch_content.encode("utf-8"))
        print(
            f"发送企业微信第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
        )

        # Add a batch marker
        if len(batches) > 1:
            batch_header = f"**[第 {i}/{len(batches)} 批次]**\n\n"
            batch_content = batch_header + batch_content

        payload = {"msgtype": "markdown", "markdown": {"content": batch_content}}

        try:
            response = requests.post(
                webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                if result.get("errcode") == 0:
                    print(f"企业微信第 {i}/{len(batches)} 批次发送成功 [{report_type}]")
                    # Pause between batches
                    if i < len(batches):
                        time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
                else:
                    print(
                        f"企业微信第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('errmsg')}"
                    )
                    return False
            else:
                print(
                    f"企业微信第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
                )
                return False
        except Exception as e:
            print(f"企业微信第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}")
            return False

    print(f"企业微信所有 {len(batches)} 批次发送完成 [{report_type}]")
    return True
print(f"企业微信第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}") return False print(f"企业微信所有 {len(batches)} 批次发送完成 [{report_type}]") return True def send_to_telegram( bot_token: str, chat_id: str, report_data: Dict, report_type: str, update_info: Optional[Dict] = None, proxy_url: Optional[str] = None, mode: str = "daily", ) -> bool: """发送到Telegram(支持分批发送)""" headers = {"Content-Type": "application/json"} url = f"https://api.telegram.org/bot{bot_token}/sendMessage" proxies = None if proxy_url: proxies = {"http": proxy_url, "https": proxy_url} # 获取分批内容 batches = split_content_into_batches( report_data, "telegram", update_info, mode=mode ) print(f"Telegram消息分为 {len(batches)} 批次发送 [{report_type}]") # 逐批发送 for i, batch_content in enumerate(batches, 1): batch_size = len(batch_content.encode("utf-8")) print( f"发送Telegram第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]" ) # 添加批次标识 if len(batches) > 1: batch_header = f"[第 {i}/{len(batches)} 批次]\n\n" batch_content = batch_header + batch_content payload = { "chat_id": chat_id, "text": batch_content, "parse_mode": "HTML", "disable_web_page_preview": True, } try: response = requests.post( url, headers=headers, json=payload, proxies=proxies, timeout=30 ) if response.status_code == 200: result = response.json() if result.get("ok"): print(f"Telegram第 {i}/{len(batches)} 批次发送成功 [{report_type}]") # 批次间间隔 if i < len(batches): time.sleep(CONFIG["BATCH_SEND_INTERVAL"]) else: print( f"Telegram第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('description')}" ) return False else: print( f"Telegram第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}" ) return False except Exception as e: print(f"Telegram第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}") return False print(f"Telegram所有 {len(batches)} 批次发送完成 [{report_type}]") return True def send_to_email( from_email: str, password: str, to_email: str, report_type: str, html_file_path: str, custom_smtp_server: Optional[str] = None, custom_smtp_port: Optional[int] = None, ) -> bool: """发送邮件通知""" try: if not html_file_path or not Path(html_file_path).exists(): print(f"错误:HTML文件不存在或未提供: {html_file_path}") return False print(f"使用HTML文件: {html_file_path}") with open(html_file_path, "r", encoding="utf-8") as f: html_content = f.read() domain = from_email.split("@")[-1].lower() if custom_smtp_server and custom_smtp_port: # 使用自定义 SMTP 配置 smtp_server = custom_smtp_server smtp_port = int(custom_smtp_port) use_tls = smtp_port == 587 elif domain in SMTP_CONFIGS: # 使用预设配置 config = SMTP_CONFIGS[domain] smtp_server = config["server"] smtp_port = config["port"] use_tls = config["encryption"] == "TLS" else: print(f"未识别的邮箱服务商: {domain},使用通用 SMTP 配置") smtp_server = f"smtp.{domain}" smtp_port = 587 use_tls = True msg = MIMEMultipart("alternative") # 严格按照 RFC 标准设置 From header sender_name = "TrendRadar" msg["From"] = formataddr((sender_name, from_email)) # 设置收件人 recipients = [addr.strip() for addr in to_email.split(",")] if len(recipients) == 1: msg["To"] = recipients[0] else: msg["To"] = ", ".join(recipients) # 设置邮件主题 now = get_beijing_time() subject = f"TrendRadar 热点分析报告 - {report_type} - {now.strftime('%m月%d日 %H:%M')}" msg["Subject"] = Header(subject, "utf-8") # 设置其他标准 header msg["MIME-Version"] = "1.0" msg["Date"] = formatdate(localtime=True) msg["Message-ID"] = make_msgid() # 添加纯文本部分(作为备选) text_content = f""" TrendRadar 热点分析报告 ======================== 报告类型:{report_type} 生成时间:{now.strftime('%Y-%m-%d %H:%M:%S')} 请使用支持HTML的邮件客户端查看完整报告内容。 """ text_part = MIMEText(text_content, "plain", "utf-8") 


def send_to_email(
    from_email: str,
    password: str,
    to_email: str,
    report_type: str,
    html_file_path: str,
    custom_smtp_server: Optional[str] = None,
    custom_smtp_port: Optional[int] = None,
) -> bool:
    """Send the report by email."""
    try:
        if not html_file_path or not Path(html_file_path).exists():
            print(f"错误:HTML文件不存在或未提供: {html_file_path}")
            return False

        print(f"使用HTML文件: {html_file_path}")
        with open(html_file_path, "r", encoding="utf-8") as f:
            html_content = f.read()

        domain = from_email.split("@")[-1].lower()

        if custom_smtp_server and custom_smtp_port:
            # Custom SMTP settings
            smtp_server = custom_smtp_server
            smtp_port = int(custom_smtp_port)
            use_tls = smtp_port == 587
        elif domain in SMTP_CONFIGS:
            # Preset settings for known providers
            config = SMTP_CONFIGS[domain]
            smtp_server = config["server"]
            smtp_port = config["port"]
            use_tls = config["encryption"] == "TLS"
        else:
            print(f"未识别的邮箱服务商: {domain},使用通用 SMTP 配置")
            smtp_server = f"smtp.{domain}"
            smtp_port = 587
            use_tls = True

        msg = MIMEMultipart("alternative")

        # Set the From header strictly per the RFC
        sender_name = "TrendRadar"
        msg["From"] = formataddr((sender_name, from_email))

        # Recipients
        recipients = [addr.strip() for addr in to_email.split(",")]
        if len(recipients) == 1:
            msg["To"] = recipients[0]
        else:
            msg["To"] = ", ".join(recipients)

        # Subject
        now = get_beijing_time()
        subject = f"TrendRadar 热点分析报告 - {report_type} - {now.strftime('%m月%d日 %H:%M')}"
        msg["Subject"] = Header(subject, "utf-8")

        # Other standard headers
        msg["MIME-Version"] = "1.0"
        msg["Date"] = formatdate(localtime=True)
        msg["Message-ID"] = make_msgid()

        # Plain-text part (fallback)
        text_content = f"""
TrendRadar 热点分析报告
========================

报告类型:{report_type}
生成时间:{now.strftime('%Y-%m-%d %H:%M:%S')}

请使用支持HTML的邮件客户端查看完整报告内容。
"""
        text_part = MIMEText(text_content, "plain", "utf-8")
        msg.attach(text_part)

        html_part = MIMEText(html_content, "html", "utf-8")
        msg.attach(html_part)

        print(f"正在发送邮件到 {to_email}...")
        print(f"SMTP 服务器: {smtp_server}:{smtp_port}")
        print(f"发件人: {from_email}")

        try:
            if use_tls:
                # TLS (STARTTLS) mode
                server = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
                server.set_debuglevel(0)  # set to 1 for verbose debug output
                server.ehlo()
                server.starttls()
                server.ehlo()
            else:
                # Implicit SSL mode
                server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=30)
                server.set_debuglevel(0)
                server.ehlo()

            # Log in
            server.login(from_email, password)

            # Send the message
            server.send_message(msg)
            server.quit()

            print(f"邮件发送成功 [{report_type}] -> {to_email}")
            return True

        except smtplib.SMTPServerDisconnected:
            print("邮件发送失败:服务器意外断开连接,请检查网络或稍后重试")
            return False
        except smtplib.SMTPAuthenticationError as e:
            print("邮件发送失败:认证错误,请检查邮箱和密码/授权码")
            print(f"详细错误: {str(e)}")
            return False
        except smtplib.SMTPRecipientsRefused as e:
            print(f"邮件发送失败:收件人地址被拒绝 {e}")
            return False
        except smtplib.SMTPSenderRefused as e:
            print(f"邮件发送失败:发件人地址被拒绝 {e}")
            return False
        except smtplib.SMTPDataError as e:
            print(f"邮件发送失败:邮件数据错误 {e}")
            return False
        except smtplib.SMTPConnectError as e:
            print(f"邮件发送失败:无法连接到 SMTP 服务器 {smtp_server}:{smtp_port}")
            print(f"详细错误: {str(e)}")
            return False

    except Exception as e:
        print(f"邮件发送失败 [{report_type}]:{e}")
        import traceback

        traceback.print_exc()
        return False
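

# SMTP resolution sketch (server values come from SMTP_CONFIGS; the
# addresses are made up):
#
#   bot@163.com      -> smtp.163.com:587, STARTTLS (preset)
#   bot@example.org  -> smtp.example.org:587, STARTTLS (generic guess)
#
# With a custom server, use_tls is inferred from the port alone
# (587 means STARTTLS, anything else implicit SSL), a heuristic rather
# than a guarantee.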


def send_to_ntfy(
    server_url: str,
    topic: str,
    token: Optional[str],
    report_data: Dict,
    report_type: str,
    update_info: Optional[Dict] = None,
    proxy_url: Optional[str] = None,
    mode: str = "daily",
) -> bool:
    """Send the report to ntfy (batched, strictly respecting the 4KB limit)."""
    # Map report types to English to avoid HTTP header encoding issues
    report_type_en_map = {
        "当日汇总": "Daily Summary",
        "当前榜单汇总": "Current Ranking",
        "增量更新": "Incremental Update",
    }
    report_type_en = report_type_en_map.get(report_type, report_type)

    headers = {
        "Content-Type": "text/plain; charset=utf-8",
        "Markdown": "yes",
        "Title": report_type_en,
        "Priority": "default",
        "Tags": "news",
    }

    if token:
        headers["Authorization"] = f"Bearer {token}"

    # Build the full URL, making sure it is well formed
    base_url = server_url.rstrip("/")
    if not base_url.startswith(("http://", "https://")):
        base_url = f"https://{base_url}"
    url = f"{base_url}/{topic}"

    proxies = None
    if proxy_url:
        proxies = {"http": proxy_url, "https": proxy_url}

    # Split into batches using the ntfy-specific 4KB budget
    batches = split_content_into_batches(
        report_data, "ntfy", update_info, max_bytes=3800, mode=mode
    )

    total_batches = len(batches)
    print(f"ntfy消息分为 {total_batches} 批次发送 [{report_type}]")

    # Reverse the batch order so the batches read top-to-bottom in the client:
    # ntfy shows the newest message first, so the last batch is pushed first
    reversed_batches = list(reversed(batches))
    print("ntfy将按反向顺序推送(最后批次先推送),确保客户端显示顺序正确")

    # Send batch by batch (reverse order)
    success_count = 0
    for idx, batch_content in enumerate(reversed_batches, 1):
        # Batch number from the reader's point of view
        actual_batch_num = total_batches - idx + 1
        batch_size = len(batch_content.encode("utf-8"))
        print(
            f"发送ntfy第 {actual_batch_num}/{total_batches} 批次(推送顺序: {idx}/{total_batches}),大小:{batch_size} 字节 [{report_type}]"
        )

        # Warn when a message exceeds 4KB and may be rejected
        if batch_size > 4096:
            print(
                f"警告:ntfy第 {actual_batch_num} 批次消息过大({batch_size} 字节),可能被拒绝"
            )

        # Add a batch marker (reader-facing batch number)
        current_headers = headers.copy()
        if total_batches > 1:
            batch_header = f"**[第 {actual_batch_num}/{total_batches} 批次]**\n\n"
            batch_content = batch_header + batch_content
            current_headers["Title"] = (
                f"{report_type_en} ({actual_batch_num}/{total_batches})"
            )

        try:
            response = requests.post(
                url,
                headers=current_headers,
                data=batch_content.encode("utf-8"),
                proxies=proxies,
                timeout=30,
            )

            if response.status_code == 200:
                print(
                    f"ntfy第 {actual_batch_num}/{total_batches} 批次发送成功 [{report_type}]"
                )
                success_count += 1
                if idx < total_batches:
                    # The public server suggests 2-3 seconds between pushes;
                    # self-hosted instances can use a shorter interval
                    interval = 2 if "ntfy.sh" in server_url else 1
                    time.sleep(interval)
            elif response.status_code == 429:
                print(
                    f"ntfy第 {actual_batch_num}/{total_batches} 批次速率限制 [{report_type}],等待后重试"
                )
                time.sleep(10)  # wait 10 seconds, then retry once
                retry_response = requests.post(
                    url,
                    headers=current_headers,
                    data=batch_content.encode("utf-8"),
                    proxies=proxies,
                    timeout=30,
                )
                if retry_response.status_code == 200:
                    print(
                        f"ntfy第 {actual_batch_num}/{total_batches} 批次重试成功 [{report_type}]"
                    )
                    success_count += 1
                else:
                    print(
                        f"ntfy第 {actual_batch_num}/{total_batches} 批次重试失败,状态码:{retry_response.status_code}"
                    )
            elif response.status_code == 413:
                print(
                    f"ntfy第 {actual_batch_num}/{total_batches} 批次消息过大被拒绝 [{report_type}],消息大小:{batch_size} 字节"
                )
            else:
                print(
                    f"ntfy第 {actual_batch_num}/{total_batches} 批次发送失败 [{report_type}],状态码:{response.status_code}"
                )
                try:
                    error_detail = response.text[:200]  # first 200 chars only
                    print(f"错误详情:{error_detail}")
                except Exception:
                    pass

        except requests.exceptions.ConnectTimeout:
            print(
                f"ntfy第 {actual_batch_num}/{total_batches} 批次连接超时 [{report_type}]"
            )
        except requests.exceptions.ReadTimeout:
            print(
                f"ntfy第 {actual_batch_num}/{total_batches} 批次读取超时 [{report_type}]"
            )
        except requests.exceptions.ConnectionError as e:
            print(
                f"ntfy第 {actual_batch_num}/{total_batches} 批次连接错误 [{report_type}]:{e}"
            )
        except Exception as e:
            print(
                f"ntfy第 {actual_batch_num}/{total_batches} 批次发送异常 [{report_type}]:{e}"
            )

    # Judge the overall outcome
    if success_count == total_batches:
        print(f"ntfy所有 {total_batches} 批次发送完成 [{report_type}]")
        return True
    elif success_count > 0:
        print(f"ntfy部分发送成功:{success_count}/{total_batches} 批次 [{report_type}]")
        return True  # partial success still counts as success
    else:
        print(f"ntfy发送完全失败 [{report_type}]")
        return False
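

# Reverse-push example: with batches [B1, B2, B3] the wire order is
# B3, B2, B1; because the ntfy client lists the newest message first,
# the reader sees B1 on top, then B2, then B3, the natural reading order.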
Actions环境,不使用代理") def _check_version_update(self) -> None: """检查版本更新""" try: need_update, remote_version = check_version_update( VERSION, CONFIG["VERSION_CHECK_URL"], self.proxy_url ) if need_update and remote_version: self.update_info = { "current_version": VERSION, "remote_version": remote_version, } print(f"发现新版本: {remote_version} (当前: {VERSION})") else: print("版本检查完成,当前为最新版本") except Exception as e: print(f"版本检查出错: {e}") def _get_mode_strategy(self) -> Dict: """获取当前模式的策略配置""" return self.MODE_STRATEGIES.get(self.report_mode, self.MODE_STRATEGIES["daily"]) def _has_notification_configured(self) -> bool: """检查是否配置了任何通知渠道""" return any( [ CONFIG["FEISHU_WEBHOOK_URL"], CONFIG["DINGTALK_WEBHOOK_URL"], CONFIG["WEWORK_WEBHOOK_URL"], (CONFIG["TELEGRAM_BOT_TOKEN"] and CONFIG["TELEGRAM_CHAT_ID"]), ( CONFIG["EMAIL_FROM"] and CONFIG["EMAIL_PASSWORD"] and CONFIG["EMAIL_TO"] ), (CONFIG["NTFY_SERVER_URL"] and CONFIG["NTFY_TOPIC"]), ] ) def _has_valid_content( self, stats: List[Dict], new_titles: Optional[Dict] = None ) -> bool: """检查是否有有效的新闻内容""" if self.report_mode in ["incremental", "current"]: # 增量模式和current模式下,只要stats有内容就说明有匹配的新闻 return any(stat["count"] > 0 for stat in stats) else: # 当日汇总模式下,检查是否有匹配的频率词新闻或新增新闻 has_matched_news = any(stat["count"] > 0 for stat in stats) has_new_news = bool( new_titles and any(len(titles) > 0 for titles in new_titles.values()) ) return has_matched_news or has_new_news def _load_analysis_data( self, ) -> Optional[Tuple[Dict, Dict, Dict, Dict, List, List]]: """统一的数据加载和预处理,使用当前监控平台列表过滤历史数据""" try: # 获取当前配置的监控平台ID列表 current_platform_ids = [] for platform in CONFIG["PLATFORMS"]: current_platform_ids.append(platform["id"]) print(f"当前监控平台: {current_platform_ids}") all_results, id_to_name, title_info = read_all_today_titles( current_platform_ids ) if not all_results: print("没有找到当天的数据") return None total_titles = sum(len(titles) for titles in all_results.values()) print(f"读取到 {total_titles} 个标题(已按当前监控平台过滤)") new_titles = detect_latest_new_titles(current_platform_ids) word_groups, filter_words = load_frequency_words() return ( all_results, id_to_name, title_info, new_titles, word_groups, filter_words, ) except Exception as e: print(f"数据加载失败: {e}") return None def _prepare_current_title_info(self, results: Dict, time_info: str) -> Dict: """从当前抓取结果构建标题信息""" title_info = {} for source_id, titles_data in results.items(): title_info[source_id] = {} for title, title_data in titles_data.items(): ranks = title_data.get("ranks", []) url = title_data.get("url", "") mobile_url = title_data.get("mobileUrl", "") title_info[source_id][title] = { "first_time": time_info, "last_time": time_info, "count": 1, "ranks": ranks, "url": url, "mobileUrl": mobile_url, } return title_info def _run_analysis_pipeline( self, data_source: Dict, mode: str, title_info: Dict, new_titles: Dict, word_groups: List[Dict], filter_words: List[str], id_to_name: Dict, failed_ids: Optional[List] = None, is_daily_summary: bool = False, ) -> Tuple[List[Dict], str]: """统一的分析流水线:数据处理 → 统计计算 → HTML生成""" # 统计计算 stats, total_titles = count_word_frequency( data_source, word_groups, filter_words, id_to_name, title_info, self.rank_threshold, new_titles, mode=mode, ) # HTML生成 html_file = generate_html_report( stats, total_titles, failed_ids=failed_ids, new_titles=new_titles, id_to_name=id_to_name, mode=mode, is_daily_summary=is_daily_summary, update_info=self.update_info if CONFIG["SHOW_VERSION_UPDATE"] else None, ) return stats, html_file def _send_notification_if_needed( self, stats: List[Dict], report_type: str, mode: str, 

    def _send_notification_if_needed(
        self,
        stats: List[Dict],
        report_type: str,
        mode: str,
        failed_ids: Optional[List] = None,
        new_titles: Optional[Dict] = None,
        id_to_name: Optional[Dict] = None,
        html_file_path: Optional[str] = None,
    ) -> bool:
        """Unified notification dispatch, including all gating conditions."""
        has_notification = self._has_notification_configured()

        if (
            CONFIG["ENABLE_NOTIFICATION"]
            and has_notification
            and self._has_valid_content(stats, new_titles)
        ):
            send_to_notifications(
                stats,
                failed_ids or [],
                report_type,
                new_titles,
                id_to_name,
                self.update_info,
                self.proxy_url,
                mode=mode,
                html_file_path=html_file_path,
            )
            return True
        elif CONFIG["ENABLE_NOTIFICATION"] and not has_notification:
            print("⚠️ 警告:通知功能已启用但未配置任何通知渠道,将跳过通知发送")
        elif not CONFIG["ENABLE_NOTIFICATION"]:
            print(f"跳过{report_type}通知:通知功能已禁用")
        elif (
            CONFIG["ENABLE_NOTIFICATION"]
            and has_notification
            and not self._has_valid_content(stats, new_titles)
        ):
            mode_strategy = self._get_mode_strategy()
            if "实时" in report_type:
                print(
                    f"跳过实时推送通知:{mode_strategy['mode_name']}下未检测到匹配的新闻"
                )
            else:
                print(
                    f"跳过{mode_strategy['summary_report_type']}通知:未匹配到有效的新闻内容"
                )
        return False

    def _generate_summary_report(self, mode_strategy: Dict) -> Optional[str]:
        """Generate the summary report (with notification)."""
        summary_type = (
            "当前榜单汇总" if mode_strategy["summary_mode"] == "current" else "当日汇总"
        )
        print(f"生成{summary_type}报告...")

        # Load the analysis data
        analysis_data = self._load_analysis_data()
        if not analysis_data:
            return None

        all_results, id_to_name, title_info, new_titles, word_groups, filter_words = (
            analysis_data
        )

        # Run the analysis pipeline
        stats, html_file = self._run_analysis_pipeline(
            all_results,
            mode_strategy["summary_mode"],
            title_info,
            new_titles,
            word_groups,
            filter_words,
            id_to_name,
            is_daily_summary=True,
        )

        print(f"{summary_type}报告已生成: {html_file}")

        # Send the notification
        self._send_notification_if_needed(
            stats,
            mode_strategy["summary_report_type"],
            mode_strategy["summary_mode"],
            failed_ids=[],
            new_titles=new_titles,
            id_to_name=id_to_name,
            html_file_path=html_file,
        )

        return html_file

    def _generate_summary_html(self, mode: str = "daily") -> Optional[str]:
        """Generate the summary HTML only."""
        summary_type = "当前榜单汇总" if mode == "current" else "当日汇总"
        print(f"生成{summary_type}HTML...")

        # Load the analysis data
        analysis_data = self._load_analysis_data()
        if not analysis_data:
            return None

        all_results, id_to_name, title_info, new_titles, word_groups, filter_words = (
            analysis_data
        )

        # Run the analysis pipeline
        _, html_file = self._run_analysis_pipeline(
            all_results,
            mode,
            title_info,
            new_titles,
            word_groups,
            filter_words,
            id_to_name,
            is_daily_summary=True,
        )

        print(f"{summary_type}HTML已生成: {html_file}")
        return html_file

    def _initialize_and_check_config(self) -> None:
        """Common initialization and configuration checks."""
        now = get_beijing_time()
        print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")

        if not CONFIG["ENABLE_CRAWLER"]:
            print("爬虫功能已禁用(ENABLE_CRAWLER=False),程序退出")
            return

        has_notification = self._has_notification_configured()
        if not CONFIG["ENABLE_NOTIFICATION"]:
            print("通知功能已禁用(ENABLE_NOTIFICATION=False),将只进行数据抓取")
        elif not has_notification:
            print("未配置任何通知渠道,将只进行数据抓取,不发送通知")
        else:
            print("通知功能已启用,将发送通知")

        mode_strategy = self._get_mode_strategy()
        print(f"报告模式: {self.report_mode}")
        print(f"运行模式: {mode_strategy['description']}")

    def _crawl_data(self) -> Tuple[Dict, Dict, List]:
        """Crawl the configured platforms."""
        ids = []
        for platform in CONFIG["PLATFORMS"]:
            if "name" in platform:
                ids.append((platform["id"], platform["name"]))
            else:
                ids.append(platform["id"])

        print(
            f"配置的监控平台: {[p.get('name', p['id']) for p in CONFIG['PLATFORMS']]}"
        )
        print(f"开始爬取数据,请求间隔 {self.request_interval} 毫秒")

        ensure_directory_exists("output")

        results, id_to_name, failed_ids = self.data_fetcher.crawl_websites(
            ids, self.request_interval
        )

        title_file = save_titles_to_file(results, id_to_name, failed_ids)
        print(f"标题已保存到: {title_file}")

        return results, id_to_name, failed_ids
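
    # Shape of the ids list handed to crawl_websites (hypothetical entries):
    #
    #   [("toutiao", "今日头条"), "weibo"]
    #
    # an (id, name) tuple when the platform config carries a display name,
    # otherwise the bare platform id.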
    def _execute_mode_strategy(
        self, mode_strategy: Dict, results: Dict, id_to_name: Dict, failed_ids: List
    ) -> Optional[str]:
        """Execute the mode-specific logic."""
        # IDs of the currently monitored platforms
        current_platform_ids = [platform["id"] for platform in CONFIG["PLATFORMS"]]

        new_titles = detect_latest_new_titles(current_platform_ids)
        # Persist the current snapshot; its filename stem serves as the
        # time marker for this crawl
        time_info = Path(save_titles_to_file(results, id_to_name, failed_ids)).stem

        word_groups, filter_words = load_frequency_words()

        # In current mode, the realtime push uses the full day's history
        # so that the statistics stay complete
        if self.report_mode == "current":
            # Load the full historical data (already filtered by platform)
            analysis_data = self._load_analysis_data()
            if analysis_data:
                (
                    all_results,
                    historical_id_to_name,
                    historical_title_info,
                    historical_new_titles,
                    _,
                    _,
                ) = analysis_data

                print(
                    f"current模式:使用过滤后的历史数据,包含平台:{list(all_results.keys())}"
                )

                stats, html_file = self._run_analysis_pipeline(
                    all_results,
                    self.report_mode,
                    historical_title_info,
                    historical_new_titles,
                    word_groups,
                    filter_words,
                    historical_id_to_name,
                    failed_ids=failed_ids,
                )

                combined_id_to_name = {**historical_id_to_name, **id_to_name}

                print(f"HTML报告已生成: {html_file}")

                # Realtime notification, based on the full-history statistics
                if mode_strategy["should_send_realtime"]:
                    self._send_notification_if_needed(
                        stats,
                        mode_strategy["realtime_report_type"],
                        self.report_mode,
                        failed_ids=failed_ids,
                        new_titles=historical_new_titles,
                        id_to_name=combined_id_to_name,
                        html_file_path=html_file,
                    )
            else:
                print("❌ 严重错误:无法读取刚保存的数据文件")
                raise RuntimeError("数据一致性检查失败:保存后立即读取失败")
        else:
            title_info = self._prepare_current_title_info(results, time_info)

            stats, html_file = self._run_analysis_pipeline(
                results,
                self.report_mode,
                title_info,
                new_titles,
                word_groups,
                filter_words,
                id_to_name,
                failed_ids=failed_ids,
            )

            print(f"HTML报告已生成: {html_file}")

            # Realtime notification (if the mode calls for it)
            if mode_strategy["should_send_realtime"]:
                self._send_notification_if_needed(
                    stats,
                    mode_strategy["realtime_report_type"],
                    self.report_mode,
                    failed_ids=failed_ids,
                    new_titles=new_titles,
                    id_to_name=id_to_name,
                    html_file_path=html_file,
                )

        # Generate the summary report (if the mode calls for it)
        summary_html = None
        if mode_strategy["should_generate_summary"]:
            if mode_strategy["should_send_realtime"]:
                # The realtime notification was already sent;
                # generate the summary HTML without notifying again
                summary_html = self._generate_summary_html(
                    mode_strategy["summary_mode"]
                )
            else:
                # daily mode: generate the summary report and notify
                summary_html = self._generate_summary_report(mode_strategy)

        # Open a browser (only outside containers and CI)
        if self._should_open_browser() and html_file:
            if summary_html:
                summary_url = "file://" + str(Path(summary_html).resolve())
                print(f"正在打开汇总报告: {summary_url}")
                webbrowser.open(summary_url)
            else:
                file_url = "file://" + str(Path(html_file).resolve())
                print(f"正在打开HTML报告: {file_url}")
                webbrowser.open(file_url)
        elif self.is_docker_container and html_file:
            if summary_html:
                print(f"汇总报告已生成(Docker环境): {summary_html}")
            else:
                print(f"HTML报告已生成(Docker环境): {html_file}")

        return summary_html

    def run(self) -> None:
        """Run the full analysis flow."""
        try:
            self._initialize_and_check_config()
            if not CONFIG["ENABLE_CRAWLER"]:
                # Honour the crawler switch reported above
                return
            mode_strategy = self._get_mode_strategy()
            results, id_to_name, failed_ids = self._crawl_data()
            self._execute_mode_strategy(mode_strategy, results, id_to_name, failed_ids)
        except Exception as e:
            print(f"分析流程执行出错: {e}")
            raise


def main():
    try:
        analyzer = NewsAnalyzer()
        analyzer.run()
    except FileNotFoundError as e:
        print(f"❌ 配置文件错误: {e}")
        print("\n请确保以下文件存在:")
        print("  • config/config.yaml")
        print("  • config/frequency_words.txt")
        print("\n参考项目文档进行正确配置")
    except Exception as e:
        print(f"❌ 程序运行错误: {e}")
        raise


if __name__ == "__main__":
    main()
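

# Usage sketch (the script filename is an assumption; CONFIG_PATH is the
# environment variable read by load_config):
#
#   python main.py
#   CONFIG_PATH=/path/to/config.yaml python main.py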