diff --git a/main.py b/main.py index 83e1c56..f18d70d 100644 --- a/main.py +++ b/main.py @@ -189,14 +189,24 @@ class DataFetcher: if response: try: data = json.loads(response) - # 获取标题列表,同时记录排名 + # 获取标题列表,同时记录排名、url和mobileUrl results[id_value] = {} for index, item in enumerate(data.get("items", []), 1): title = item["title"] + # 获取url和mobileUrl,提供容错处理 + url = item.get("url", "") + mobile_url = item.get("mobileUrl", "") + if title in results[id_value]: - results[id_value][title].append(index) + # 如果标题已存在,更新排名列表,保持第一个URL + results[id_value][title]["ranks"].append(index) else: - results[id_value][title] = [index] + # 新标题,存储完整信息 + results[id_value][title] = { + "ranks": [index], + "url": url, + "mobileUrl": mobile_url + } except json.JSONDecodeError: print(f"解析 {id_value} 的响应失败,不是有效的JSON") failed_ids.append(id_value) @@ -226,7 +236,7 @@ class DataProcessor: @staticmethod def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str: - """将标题保存到文件,包括失败的请求信息""" + """将标题保存到文件,包括失败的请求信息、url和mobileUrl""" file_path = FileHelper.get_output_path( "txt", f"{TimeHelper.format_time_filename()}.txt" ) @@ -236,9 +246,24 @@ class DataProcessor: for id_value, title_data in results.items(): display_name = id_to_alias.get(id_value, id_value) f.write(f"{display_name}\n") - for i, (title, ranks) in enumerate(title_data.items(), 1): - rank_str = ",".join(map(str, ranks)) - f.write(f"{i}. {title} (排名:{rank_str})\n") + for i, (title, info) in enumerate(title_data.items(), 1): + # 处理新格式数据(包含ranks、url和mobileUrl) + if isinstance(info, dict): + ranks = info.get("ranks", []) + url = info.get("url", "") + mobile_url = info.get("mobileUrl", "") + rank_str = ",".join(map(str, ranks)) + # 格式:序号. 标题 (排名:1,2,3) [URL:url] [MOBILE:mobile_url] + line = f"{i}. {title} (排名:{rank_str})" + if url: + line += f" [URL:{url}]" + if mobile_url: + line += f" [MOBILE:{mobile_url}]" + f.write(line + "\n") + else: + # 兼容旧格式数据(只有ranks列表) + rank_str = ",".join(map(str, info)) + f.write(f"{i}. {title} (排名:{rank_str})\n") f.write("\n") # 如果有失败的请求,写入失败信息 @@ -300,6 +325,7 @@ class DataProcessor: def read_all_today_titles() -> Tuple[Dict, Dict, Dict]: """ 读取当天所有txt文件的标题,并按来源合并,去除重复,记录时间和出现次数 + 兼容新格式(包含url和mobileUrl)和旧格式数据 Returns: (all_results, id_to_alias, title_info)元组 @@ -311,11 +337,9 @@ class DataProcessor: print(f"今日文件夹 {txt_dir} 不存在") return {}, {}, {} - all_results = {} # 所有源的所有标题 {source_id: {title: [ranks]}} + all_results = {} # 所有源的所有标题 {source_id: {title: {"ranks": [排名列表], "url": "链接", "mobileUrl": "移动链接"}}} id_to_alias = {} # ID到别名的映射 - title_info = ( - {} - ) # 标题信息 {source_id: {title: {"first_time": 首次时间, "last_time": 最后时间, "count": 出现次数, "ranks": [排名列表]}}} + title_info = {} # 标题信息 # 读取所有txt文件,按时间排序确保早的时间优先处理 files = sorted([f for f in os.listdir(txt_dir) if f.endswith(".txt")]) @@ -341,8 +365,8 @@ class DataProcessor: # 第一行是来源名 source_name = lines[0].strip() - # 提取标题和排名 - title_ranks = {} + # 提取标题和排名,兼容新旧格式 + title_data = {} for line in lines[1:]: if line.strip(): try: @@ -351,14 +375,25 @@ class DataProcessor: title_part = line.strip() # 处理格式 "数字. 标题" - if ( - ". " in title_part - and title_part.split(". ")[0].isdigit() - ): + if ". " in title_part and title_part.split(". ")[0].isdigit(): parts = title_part.split(". ", 1) match_num = int(parts[0]) # 序号可能是排名 title_part = parts[1] + # 提取mobileUrl信息 "[MOBILE:mobile_url]" + mobile_url = "" + if " [MOBILE:" in title_part: + title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1) + if mobile_part.endswith("]"): + mobile_url = mobile_part[:-1] + + # 提取url信息 "[URL:url]" + url = "" + if " [URL:" in title_part: + title_part, url_part = title_part.rsplit(" [URL:", 1) + if url_part.endswith("]"): + url = url_part[:-1] + # 提取排名信息 "标题 (排名:1,2,3)" ranks = [] if " (排名:" in title_part: @@ -380,7 +415,11 @@ class DataProcessor: if not ranks: ranks = [99] # 默认排名 - title_ranks[title] = ranks + title_data[title] = { + "ranks": ranks, + "url": url, + "mobileUrl": mobile_url + } except Exception as e: print(f"解析标题行出错: {line}, 错误: {e}") @@ -388,14 +427,14 @@ class DataProcessor: # 处理来源数据 DataProcessor._process_source_data( source_name, - title_ranks, + title_data, time_info, all_results, title_info, id_to_alias, ) - # 将结果从 {source_name: {title: [ranks]}} 转换为 {source_id: {title: [ranks]}} + # 将结果从 {source_name: {title: data}} 转换为 {source_id: {title: data}} id_results = {} id_title_info = {} for name, titles in all_results.items(): @@ -410,28 +449,41 @@ class DataProcessor: @staticmethod def _process_source_data( source_name: str, - title_ranks: Dict, + title_data: Dict, time_info: str, all_results: Dict, title_info: Dict, id_to_alias: Dict, ) -> None: - """处理来源数据,更新结果和标题信息""" + """处理来源数据,更新结果和标题信息,兼容新旧数据格式""" if source_name not in all_results: # 首次遇到此来源 - all_results[source_name] = title_ranks + all_results[source_name] = title_data # 初始化标题信息 if source_name not in title_info: title_info[source_name] = {} - # 记录每个标题的时间、次数和排名 - for title, ranks in title_ranks.items(): + # 记录每个标题的时间、次数、排名、url和mobileUrl + for title, data in title_data.items(): + # 兼容新旧格式 + if isinstance(data, dict): + ranks = data.get("ranks", []) + url = data.get("url", "") + mobile_url = data.get("mobileUrl", "") + else: + # 旧格式兼容 + ranks = data if isinstance(data, list) else [] + url = "" + mobile_url = "" + title_info[source_name][title] = { "first_time": time_info, # 记录首次时间 "last_time": time_info, # 最后时间初始同首次时间 "count": 1, "ranks": ranks, + "url": url, + "mobileUrl": mobile_url, } # 尝试反向生成ID @@ -439,28 +491,59 @@ class DataProcessor: id_to_alias[reversed_id] = source_name else: # 已有此来源,更新标题 - for title, ranks in title_ranks.items(): + for title, data in title_data.items(): + # 兼容新旧格式 + if isinstance(data, dict): + ranks = data.get("ranks", []) + url = data.get("url", "") + mobile_url = data.get("mobileUrl", "") + else: + # 旧格式兼容 + ranks = data if isinstance(data, list) else [] + url = "" + mobile_url = "" + if title not in all_results[source_name]: - all_results[source_name][title] = ranks + all_results[source_name][title] = { + "ranks": ranks, + "url": url, + "mobileUrl": mobile_url + } title_info[source_name][title] = { "first_time": time_info, # 新标题的首次和最后时间都设为当前 "last_time": time_info, "count": 1, "ranks": ranks, + "url": url, + "mobileUrl": mobile_url, } else: # 已存在的标题,更新最后时间,合并排名信息并增加计数 - existing_ranks = title_info[source_name][title]["ranks"] + existing_data = all_results[source_name][title] + existing_ranks = existing_data.get("ranks", []) + existing_url = existing_data.get("url", "") + existing_mobile_url = existing_data.get("mobileUrl", "") + merged_ranks = existing_ranks.copy() for rank in ranks: if rank not in merged_ranks: merged_ranks.append(rank) - title_info[source_name][title][ - "last_time" - ] = time_info # 更新最后时间 + # 更新数据,保持第一个有效的URL + all_results[source_name][title] = { + "ranks": merged_ranks, + "url": existing_url or url, + "mobileUrl": existing_mobile_url or mobile_url + } + + title_info[source_name][title]["last_time"] = time_info # 更新最后时间 title_info[source_name][title]["ranks"] = merged_ranks title_info[source_name][title]["count"] += 1 + # 保持第一个有效的URL + if not title_info[source_name][title].get("url"): + title_info[source_name][title]["url"] = url + if not title_info[source_name][title].get("mobileUrl"): + title_info[source_name][title]["mobileUrl"] = mobile_url class StatisticsCalculator: @@ -477,6 +560,7 @@ class StatisticsCalculator: ) -> Tuple[List[Dict], int]: """ 统计词频,处理关联词和大小写不敏感,每个标题只计入首个匹配词组,并应用过滤词 + 支持新格式数据(包含url和mobileUrl) Returns: (stats, total_titles)元组 @@ -502,7 +586,7 @@ class StatisticsCalculator: if source_id not in processed_titles: processed_titles[source_id] = {} - for title, source_ranks in titles_data.items(): + for title, title_data in titles_data.items(): # 跳过已处理的标题 if title in processed_titles.get(source_id, {}): continue @@ -518,6 +602,17 @@ class StatisticsCalculator: if contains_filter_word: continue + # 兼容新旧数据格式 + if isinstance(title_data, dict): + source_ranks = title_data.get("ranks", []) + source_url = title_data.get("url", "") + source_mobile_url = title_data.get("mobileUrl", "") + else: + # 旧格式兼容 + source_ranks = title_data if isinstance(title_data, list) else [] + source_url = "" + source_mobile_url = "" + # 按顺序检查每个词组 for group in word_groups: group_key = " ".join(group) @@ -536,6 +631,8 @@ class StatisticsCalculator: last_time = "" count_info = 1 ranks = source_ranks if source_ranks else [] + url = source_url + mobile_url = source_mobile_url if ( title_info @@ -548,6 +645,8 @@ class StatisticsCalculator: count_info = info.get("count", 1) if "ranks" in info and info["ranks"]: ranks = info["ranks"] + url = info.get("url", source_url) + mobile_url = info.get("mobileUrl", source_mobile_url) # 确保排名是有效的 if not ranks: @@ -570,6 +669,8 @@ class StatisticsCalculator: "count": count_info, "ranks": ranks, "rank_threshold": rank_threshold, + "url": url, # 新增url字段 + "mobileUrl": mobile_url, # 新增mobileUrl字段 } ) @@ -719,7 +820,7 @@ class ReportGenerator: failed_ids: Optional[List] = None, is_daily: bool = False, ) -> str: - """创建HTML内容""" + """创建HTML内容,支持可点击的新闻链接""" # HTML头部 html = """ @@ -740,6 +841,21 @@ class ReportGenerator: .titles { max-width: 500px; } .source { color: #666; font-style: italic; } .error { color: #d9534f; } + .news-link { + color: #007bff; + text-decoration: none; + border-bottom: 1px dotted #007bff; + } + .news-link:hover { + color: #0056b3; + text-decoration: underline; + } + .news-link:visited { + color: #6f42c1; + } + .no-link { + color: #333; + } @@ -763,7 +879,7 @@ class ReportGenerator: @@ -792,27 +908,45 @@ class ReportGenerator: count_info = title_data["count"] ranks = title_data["ranks"] rank_threshold = title_data["rank_threshold"] + url = title_data.get("url", "") + mobile_url = title_data.get("mobileUrl", "") # 使用HTML格式化排名 rank_display = StatisticsCalculator._format_rank_for_html( ranks, rank_threshold ) - # 格式化标题信息 - formatted_title = f"[{source_alias}] {title}" + # 优先使用mobileUrl,然后是url,最后无链接 + link_url = mobile_url or url + + # 格式化标题信息,添加链接支持 + escaped_title = ReportGenerator._html_escape(title) + escaped_source_alias = ReportGenerator._html_escape(source_alias) + + if link_url: + # 转义URL + escaped_url = ReportGenerator._html_escape(link_url) + # 有链接时,使用a标签包装标题 + formatted_title = f"[{escaped_source_alias}] {escaped_title}" + else: + # 没有链接时,使用普通文本 + formatted_title = f"[{escaped_source_alias}] {escaped_title}" + if rank_display: formatted_title += f" {rank_display}" if time_display: - formatted_title += f" - {time_display}" + escaped_time_display = ReportGenerator._html_escape(time_display) + formatted_title += f" - {escaped_time_display}" if count_info > 1: formatted_title += f" ({count_info}次)" formatted_titles.append(formatted_title) + escaped_word = ReportGenerator._html_escape(stat['word']) html += f""" {i} - {stat['word']} + {escaped_word} {stat['count']} {stat['percentage']}% {"
".join(formatted_titles)} @@ -828,6 +962,18 @@ class ReportGenerator: return html + @staticmethod + def _html_escape(text: str) -> str: + """HTML转义函数""" + if not isinstance(text, str): + text = str(text) + + return (text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'")) + @staticmethod def send_to_feishu( stats: List[Dict], @@ -887,7 +1033,7 @@ class ReportGenerator: def _build_feishu_content( stats: List[Dict], failed_ids: Optional[List] = None ) -> str: - """构建飞书消息内容,使用富文本格式""" + """构建飞书消息内容,使用富文本格式和markdown链接,优先使用mobileUrl""" text_content = "" # 添加频率词统计信息 @@ -926,25 +1072,26 @@ class ReportGenerator: count_info = title_data["count"] ranks = title_data["ranks"] rank_threshold = title_data["rank_threshold"] + url = title_data.get("url", "") + mobile_url = title_data.get("mobileUrl", "") # 使用飞书格式化排名 rank_display = StatisticsCalculator._format_rank_for_feishu( ranks, rank_threshold ) - # 格式化标题信息 - formatted_title = f"[{source_alias}] {title}" - if rank_display: - formatted_title += f" {rank_display}" - if time_display: - formatted_title += f" - {time_display}" - if count_info > 1: - formatted_title += f" ({count_info}次)" + # 格式化标题信息,优先使用mobileUrl,然后是url + link_url = mobile_url or url # 优先使用mobileUrl,没有则使用url + if link_url: + # 如果有链接,使用markdown链接格式 + formatted_title = f"[{title}]({link_url})" + else: + # 如果都没有链接,只显示标题 + formatted_title = title - # 使用灰色显示来源 - text_content += ( - f" {j}. [{source_alias}] {title}" - ) + # 构建完整的标题行 + text_content += f" {j}. [{source_alias}] {formatted_title}" + if rank_display: text_content += f" {rank_display}" if time_display: @@ -1121,12 +1268,25 @@ class NewsAnalyzer: title_info = {} for source_id, titles_data in results.items(): title_info[source_id] = {} - for title, ranks in titles_data.items(): + for title, title_data in titles_data.items(): + # 兼容新格式数据 + if isinstance(title_data, dict): + ranks = title_data.get("ranks", []) + url = title_data.get("url", "") + mobile_url = title_data.get("mobileUrl", "") + else: + # 兼容旧格式数据 + ranks = title_data if isinstance(title_data, list) else [] + url = "" + mobile_url = "" + title_info[source_id][title] = { "first_time": time_info, "last_time": time_info, "count": 1, "ranks": ranks, + "url": url, + "mobileUrl": mobile_url, } # 加载频率词和过滤词