feat:feishu推送有跳转链接

This commit is contained in:
sansan 2025-06-02 16:51:37 +08:00 committed by GitHub
parent 1d8876575e
commit a19ad13be4

264
main.py
View File

@ -189,14 +189,24 @@ class DataFetcher:
if response:
try:
data = json.loads(response)
# 获取标题列表,同时记录排名
# 获取标题列表,同时记录排名、url和mobileUrl
results[id_value] = {}
for index, item in enumerate(data.get("items", []), 1):
title = item["title"]
# 获取url和mobileUrl提供容错处理
url = item.get("url", "")
mobile_url = item.get("mobileUrl", "")
if title in results[id_value]:
results[id_value][title].append(index)
# 如果标题已存在更新排名列表保持第一个URL
results[id_value][title]["ranks"].append(index)
else:
results[id_value][title] = [index]
# 新标题,存储完整信息
results[id_value][title] = {
"ranks": [index],
"url": url,
"mobileUrl": mobile_url
}
except json.JSONDecodeError:
print(f"解析 {id_value} 的响应失败不是有效的JSON")
failed_ids.append(id_value)
@ -226,7 +236,7 @@ class DataProcessor:
@staticmethod
def save_titles_to_file(results: Dict, id_to_alias: Dict, failed_ids: List) -> str:
"""将标题保存到文件,包括失败的请求信息"""
"""将标题保存到文件,包括失败的请求信息、url和mobileUrl"""
file_path = FileHelper.get_output_path(
"txt", f"{TimeHelper.format_time_filename()}.txt"
)
@ -236,9 +246,24 @@ class DataProcessor:
for id_value, title_data in results.items():
display_name = id_to_alias.get(id_value, id_value)
f.write(f"{display_name}\n")
for i, (title, ranks) in enumerate(title_data.items(), 1):
rank_str = ",".join(map(str, ranks))
f.write(f"{i}. {title} (排名:{rank_str})\n")
for i, (title, info) in enumerate(title_data.items(), 1):
# 处理新格式数据包含ranks、url和mobileUrl
if isinstance(info, dict):
ranks = info.get("ranks", [])
url = info.get("url", "")
mobile_url = info.get("mobileUrl", "")
rank_str = ",".join(map(str, ranks))
# 格式:序号. 标题 (排名:1,2,3) [URL:url] [MOBILE:mobile_url]
line = f"{i}. {title} (排名:{rank_str})"
if url:
line += f" [URL:{url}]"
if mobile_url:
line += f" [MOBILE:{mobile_url}]"
f.write(line + "\n")
else:
# 兼容旧格式数据只有ranks列表
rank_str = ",".join(map(str, info))
f.write(f"{i}. {title} (排名:{rank_str})\n")
f.write("\n")
# 如果有失败的请求,写入失败信息
@ -300,6 +325,7 @@ class DataProcessor:
def read_all_today_titles() -> Tuple[Dict, Dict, Dict]:
"""
读取当天所有txt文件的标题并按来源合并去除重复记录时间和出现次数
兼容新格式包含url和mobileUrl和旧格式数据
Returns:
(all_results, id_to_alias, title_info)元组
@ -311,11 +337,9 @@ class DataProcessor:
print(f"今日文件夹 {txt_dir} 不存在")
return {}, {}, {}
all_results = {} # 所有源的所有标题 {source_id: {title: [ranks]}}
all_results = {} # 所有源的所有标题 {source_id: {title: {"ranks": [排名列表], "url": "链接", "mobileUrl": "移动链接"}}}
id_to_alias = {} # ID到别名的映射
title_info = (
{}
) # 标题信息 {source_id: {title: {"first_time": 首次时间, "last_time": 最后时间, "count": 出现次数, "ranks": [排名列表]}}}
title_info = {} # 标题信息
# 读取所有txt文件按时间排序确保早的时间优先处理
files = sorted([f for f in os.listdir(txt_dir) if f.endswith(".txt")])
@ -341,8 +365,8 @@ class DataProcessor:
# 第一行是来源名
source_name = lines[0].strip()
# 提取标题和排名
title_ranks = {}
# 提取标题和排名,兼容新旧格式
title_data = {}
for line in lines[1:]:
if line.strip():
try:
@ -351,14 +375,25 @@ class DataProcessor:
title_part = line.strip()
# 处理格式 "数字. 标题"
if (
". " in title_part
and title_part.split(". ")[0].isdigit()
):
if ". " in title_part and title_part.split(". ")[0].isdigit():
parts = title_part.split(". ", 1)
match_num = int(parts[0]) # 序号可能是排名
title_part = parts[1]
# 提取mobileUrl信息 "[MOBILE:mobile_url]"
mobile_url = ""
if " [MOBILE:" in title_part:
title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1)
if mobile_part.endswith("]"):
mobile_url = mobile_part[:-1]
# 提取url信息 "[URL:url]"
url = ""
if " [URL:" in title_part:
title_part, url_part = title_part.rsplit(" [URL:", 1)
if url_part.endswith("]"):
url = url_part[:-1]
# 提取排名信息 "标题 (排名:1,2,3)"
ranks = []
if " (排名:" in title_part:
@ -380,7 +415,11 @@ class DataProcessor:
if not ranks:
ranks = [99] # 默认排名
title_ranks[title] = ranks
title_data[title] = {
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url
}
except Exception as e:
print(f"解析标题行出错: {line}, 错误: {e}")
@ -388,14 +427,14 @@ class DataProcessor:
# 处理来源数据
DataProcessor._process_source_data(
source_name,
title_ranks,
title_data,
time_info,
all_results,
title_info,
id_to_alias,
)
# 将结果从 {source_name: {title: [ranks]}} 转换为 {source_id: {title: [ranks]}}
# 将结果从 {source_name: {title: data}} 转换为 {source_id: {title: data}}
id_results = {}
id_title_info = {}
for name, titles in all_results.items():
@ -410,28 +449,41 @@ class DataProcessor:
@staticmethod
def _process_source_data(
source_name: str,
title_ranks: Dict,
title_data: Dict,
time_info: str,
all_results: Dict,
title_info: Dict,
id_to_alias: Dict,
) -> None:
"""处理来源数据,更新结果和标题信息"""
"""处理来源数据,更新结果和标题信息,兼容新旧数据格式"""
if source_name not in all_results:
# 首次遇到此来源
all_results[source_name] = title_ranks
all_results[source_name] = title_data
# 初始化标题信息
if source_name not in title_info:
title_info[source_name] = {}
# 记录每个标题的时间、次数和排名
for title, ranks in title_ranks.items():
# 记录每个标题的时间、次数、排名、url和mobileUrl
for title, data in title_data.items():
# 兼容新旧格式
if isinstance(data, dict):
ranks = data.get("ranks", [])
url = data.get("url", "")
mobile_url = data.get("mobileUrl", "")
else:
# 旧格式兼容
ranks = data if isinstance(data, list) else []
url = ""
mobile_url = ""
title_info[source_name][title] = {
"first_time": time_info, # 记录首次时间
"last_time": time_info, # 最后时间初始同首次时间
"count": 1,
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
# 尝试反向生成ID
@ -439,28 +491,59 @@ class DataProcessor:
id_to_alias[reversed_id] = source_name
else:
# 已有此来源,更新标题
for title, ranks in title_ranks.items():
for title, data in title_data.items():
# 兼容新旧格式
if isinstance(data, dict):
ranks = data.get("ranks", [])
url = data.get("url", "")
mobile_url = data.get("mobileUrl", "")
else:
# 旧格式兼容
ranks = data if isinstance(data, list) else []
url = ""
mobile_url = ""
if title not in all_results[source_name]:
all_results[source_name][title] = ranks
all_results[source_name][title] = {
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url
}
title_info[source_name][title] = {
"first_time": time_info, # 新标题的首次和最后时间都设为当前
"last_time": time_info,
"count": 1,
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
else:
# 已存在的标题,更新最后时间,合并排名信息并增加计数
existing_ranks = title_info[source_name][title]["ranks"]
existing_data = all_results[source_name][title]
existing_ranks = existing_data.get("ranks", [])
existing_url = existing_data.get("url", "")
existing_mobile_url = existing_data.get("mobileUrl", "")
merged_ranks = existing_ranks.copy()
for rank in ranks:
if rank not in merged_ranks:
merged_ranks.append(rank)
title_info[source_name][title][
"last_time"
] = time_info # 更新最后时间
# 更新数据保持第一个有效的URL
all_results[source_name][title] = {
"ranks": merged_ranks,
"url": existing_url or url,
"mobileUrl": existing_mobile_url or mobile_url
}
title_info[source_name][title]["last_time"] = time_info # 更新最后时间
title_info[source_name][title]["ranks"] = merged_ranks
title_info[source_name][title]["count"] += 1
# 保持第一个有效的URL
if not title_info[source_name][title].get("url"):
title_info[source_name][title]["url"] = url
if not title_info[source_name][title].get("mobileUrl"):
title_info[source_name][title]["mobileUrl"] = mobile_url
class StatisticsCalculator:
@ -477,6 +560,7 @@ class StatisticsCalculator:
) -> Tuple[List[Dict], int]:
"""
统计词频处理关联词和大小写不敏感每个标题只计入首个匹配词组并应用过滤词
支持新格式数据包含url和mobileUrl
Returns:
(stats, total_titles)元组
@ -502,7 +586,7 @@ class StatisticsCalculator:
if source_id not in processed_titles:
processed_titles[source_id] = {}
for title, source_ranks in titles_data.items():
for title, title_data in titles_data.items():
# 跳过已处理的标题
if title in processed_titles.get(source_id, {}):
continue
@ -518,6 +602,17 @@ class StatisticsCalculator:
if contains_filter_word:
continue
# 兼容新旧数据格式
if isinstance(title_data, dict):
source_ranks = title_data.get("ranks", [])
source_url = title_data.get("url", "")
source_mobile_url = title_data.get("mobileUrl", "")
else:
# 旧格式兼容
source_ranks = title_data if isinstance(title_data, list) else []
source_url = ""
source_mobile_url = ""
# 按顺序检查每个词组
for group in word_groups:
group_key = " ".join(group)
@ -536,6 +631,8 @@ class StatisticsCalculator:
last_time = ""
count_info = 1
ranks = source_ranks if source_ranks else []
url = source_url
mobile_url = source_mobile_url
if (
title_info
@ -548,6 +645,8 @@ class StatisticsCalculator:
count_info = info.get("count", 1)
if "ranks" in info and info["ranks"]:
ranks = info["ranks"]
url = info.get("url", source_url)
mobile_url = info.get("mobileUrl", source_mobile_url)
# 确保排名是有效的
if not ranks:
@ -570,6 +669,8 @@ class StatisticsCalculator:
"count": count_info,
"ranks": ranks,
"rank_threshold": rank_threshold,
"url": url, # 新增url字段
"mobileUrl": mobile_url, # 新增mobileUrl字段
}
)
@ -719,7 +820,7 @@ class ReportGenerator:
failed_ids: Optional[List] = None,
is_daily: bool = False,
) -> str:
"""创建HTML内容"""
"""创建HTML内容,支持可点击的新闻链接"""
# HTML头部
html = """
<!DOCTYPE html>
@ -740,6 +841,21 @@ class ReportGenerator:
.titles { max-width: 500px; }
.source { color: #666; font-style: italic; }
.error { color: #d9534f; }
.news-link {
color: #007bff;
text-decoration: none;
border-bottom: 1px dotted #007bff;
}
.news-link:hover {
color: #0056b3;
text-decoration: underline;
}
.news-link:visited {
color: #6f42c1;
}
.no-link {
color: #333;
}
</style>
</head>
<body>
@ -763,7 +879,7 @@ class ReportGenerator:
<ul>
"""
for id_value in failed_ids:
html += f"<li>{id_value}</li>"
html += f"<li>{ReportGenerator._html_escape(id_value)}</li>"
html += """
</ul>
</div>
@ -792,27 +908,45 @@ class ReportGenerator:
count_info = title_data["count"]
ranks = title_data["ranks"]
rank_threshold = title_data["rank_threshold"]
url = title_data.get("url", "")
mobile_url = title_data.get("mobileUrl", "")
# 使用HTML格式化排名
rank_display = StatisticsCalculator._format_rank_for_html(
ranks, rank_threshold
)
# 格式化标题信息
formatted_title = f"[{source_alias}] {title}"
# 优先使用mobileUrl然后是url最后无链接
link_url = mobile_url or url
# 格式化标题信息,添加链接支持
escaped_title = ReportGenerator._html_escape(title)
escaped_source_alias = ReportGenerator._html_escape(source_alias)
if link_url:
# 转义URL
escaped_url = ReportGenerator._html_escape(link_url)
# 有链接时使用a标签包装标题
formatted_title = f"[{escaped_source_alias}] <a href=\"{escaped_url}\" target=\"_blank\" class=\"news-link\">{escaped_title}</a>"
else:
# 没有链接时,使用普通文本
formatted_title = f"[{escaped_source_alias}] <span class=\"no-link\">{escaped_title}</span>"
if rank_display:
formatted_title += f" {rank_display}"
if time_display:
formatted_title += f" <font color='grey'>- {time_display}</font>"
escaped_time_display = ReportGenerator._html_escape(time_display)
formatted_title += f" <font color='grey'>- {escaped_time_display}</font>"
if count_info > 1:
formatted_title += f" <font color='green'>({count_info}次)</font>"
formatted_titles.append(formatted_title)
escaped_word = ReportGenerator._html_escape(stat['word'])
html += f"""
<tr>
<td>{i}</td>
<td class="word">{stat['word']}</td>
<td class="word">{escaped_word}</td>
<td class="count">{stat['count']}</td>
<td class="percentage">{stat['percentage']}%</td>
<td class="titles">{"<br>".join(formatted_titles)}</td>
@ -828,6 +962,18 @@ class ReportGenerator:
return html
@staticmethod
def _html_escape(text: str) -> str:
"""HTML转义函数"""
if not isinstance(text, str):
text = str(text)
return (text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&#x27;"))
@staticmethod
def send_to_feishu(
stats: List[Dict],
@ -887,7 +1033,7 @@ class ReportGenerator:
def _build_feishu_content(
stats: List[Dict], failed_ids: Optional[List] = None
) -> str:
"""构建飞书消息内容,使用富文本格式"""
"""构建飞书消息内容,使用富文本格式和markdown链接优先使用mobileUrl"""
text_content = ""
# 添加频率词统计信息
@ -926,25 +1072,26 @@ class ReportGenerator:
count_info = title_data["count"]
ranks = title_data["ranks"]
rank_threshold = title_data["rank_threshold"]
url = title_data.get("url", "")
mobile_url = title_data.get("mobileUrl", "")
# 使用飞书格式化排名
rank_display = StatisticsCalculator._format_rank_for_feishu(
ranks, rank_threshold
)
# 格式化标题信息
formatted_title = f"[{source_alias}] {title}"
if rank_display:
formatted_title += f" {rank_display}"
if time_display:
formatted_title += f" <font color='grey'>- {time_display}</font>"
if count_info > 1:
formatted_title += f" <font color='green'>({count_info}次)</font>"
# 格式化标题信息优先使用mobileUrl然后是url
link_url = mobile_url or url # 优先使用mobileUrl没有则使用url
if link_url:
# 如果有链接使用markdown链接格式
formatted_title = f"[{title}]({link_url})"
else:
# 如果都没有链接,只显示标题
formatted_title = title
# 使用灰色显示来源
text_content += (
f" {j}. <font color='grey'>[{source_alias}]</font> {title}"
)
# 构建完整的标题行
text_content += f" {j}. <font color='grey'>[{source_alias}]</font> {formatted_title}"
if rank_display:
text_content += f" {rank_display}"
if time_display:
@ -1121,12 +1268,25 @@ class NewsAnalyzer:
title_info = {}
for source_id, titles_data in results.items():
title_info[source_id] = {}
for title, ranks in titles_data.items():
for title, title_data in titles_data.items():
# 兼容新格式数据
if isinstance(title_data, dict):
ranks = title_data.get("ranks", [])
url = title_data.get("url", "")
mobile_url = title_data.get("mobileUrl", "")
else:
# 兼容旧格式数据
ranks = title_data if isinstance(title_data, list) else []
url = ""
mobile_url = ""
title_info[source_id][title] = {
"first_time": time_info,
"last_time": time_info,
"count": 1,
"ranks": ranks,
"url": url,
"mobileUrl": mobile_url,
}
# 加载频率词和过滤词