v4.0.0 大大大更新

This commit is contained in:
sansan
2025-12-13 13:44:35 +08:00
parent 97c05aa33c
commit c7bacdfff7
61 changed files with 12407 additions and 5889 deletions
+40
View File
@@ -0,0 +1,40 @@
# coding=utf-8
"""
报告生成模块
提供报告生成和格式化功能,包括:
- HTML 报告生成
- 标题格式化工具
模块结构:
- helpers: 报告辅助函数(清理、转义、格式化)
- formatter: 平台标题格式化
- html: HTML 报告渲染
- generator: 报告生成器
"""
from trendradar.report.helpers import (
clean_title,
html_escape,
format_rank_display,
)
from trendradar.report.formatter import format_title_for_platform
from trendradar.report.html import render_html_content
from trendradar.report.generator import (
prepare_report_data,
generate_html_report,
)
__all__ = [
    # Helper functions
    "clean_title",
    "html_escape",
    "format_rank_display",
    # Title formatting
    "format_title_for_platform",
    # HTML rendering
    "render_html_content",
    # Report generator
    "prepare_report_data",
    "generate_html_report",
]
+223
View File
@@ -0,0 +1,223 @@
# coding=utf-8
"""
平台标题格式化模块
提供多平台标题格式化功能
"""
from typing import Dict
from trendradar.report.helpers import clean_title, html_escape, format_rank_display
def format_title_for_platform(
    platform: str, title_data: Dict, show_source: bool = True
) -> str:
    """Format one news title for the given delivery platform.

    Args:
        platform: Target platform. Recognised values are "feishu",
            "dingtalk", "wework", "bark", "telegram", "ntfy", "slack"
            and "html". Any other value yields the bare cleaned title.
        title_data: Title record with the following keys:
            - title: title text
            - source_name: name of the source
            - time_display: time text (may be empty)
            - count: number of occurrences
            - ranks: list of ranks
            - rank_threshold: ranks at or below this value are highlighted
            - url: desktop link
            - mobile_url: mobile link (preferred over ``url`` when non-empty)
            - is_new: optional flag marking a newly seen title
        show_source: Whether to prefix the source name. The "html" platform
            does not consult this flag and always renders the source.

    Returns:
        The formatted title string.

    Raises:
        KeyError: If a required key is missing from ``title_data``.
    """
    rank_display = format_rank_display(
        title_data["ranks"], title_data["rank_threshold"], platform
    )
    link_url = title_data["mobile_url"] or title_data["url"]
    cleaned_title = clean_title(title_data["title"])

    if platform == "html":
        return _format_html_title(title_data, cleaned_title, link_url, rank_display)

    suffix_templates = _SUFFIX_TEMPLATES.get(platform)
    if suffix_templates is None:
        # Unknown platform: fall back to the bare cleaned title.
        return cleaned_title
    time_template, count_template = suffix_templates

    if platform == "telegram":
        # Telegram messages here are HTML-formatted (<a>, <b>, <code>), so
        # the title must be escaped whether or not it is wrapped in a link;
        # otherwise a "<" or "&" in the title breaks the markup.
        escaped_title = html_escape(cleaned_title)
        if link_url:
            formatted_title = f'<a href="{link_url}">{escaped_title}</a>'
        else:
            formatted_title = escaped_title
    elif platform == "slack":
        # Slack mrkdwn link syntax: <url|text>
        formatted_title = (
            f"<{link_url}|{cleaned_title}>" if link_url else cleaned_title
        )
    else:
        # feishu / dingtalk / wework / bark / ntfy use markdown links.
        formatted_title = (
            f"[{cleaned_title}]({link_url})" if link_url else cleaned_title
        )

    title_prefix = "🆕 " if title_data.get("is_new") else ""
    result = f"{title_prefix}{formatted_title}"

    if show_source:
        source_tag = f"[{title_data['source_name']}]"
        if platform == "feishu":
            source_tag = f"<font color='grey'>{source_tag}</font>"
        result = f"{source_tag} {result}"

    if rank_display:
        result += f" {rank_display}"
    if title_data["time_display"]:
        result += " " + time_template.format(title_data["time_display"])
    if title_data["count"] > 1:
        result += " " + count_template.format(title_data["count"])
    return result


# Per-platform wrappers for the trailing fragments: (time, count).
# Each template receives exactly one positional value.
_SUFFIX_TEMPLATES = {
    "feishu": (
        "<font color='grey'>- {}</font>",
        "<font color='green'>({}次)</font>",
    ),
    "dingtalk": ("- {}", "({}次)"),
    "wework": ("- {}", "({}次)"),
    "bark": ("- {}", "({}次)"),
    "telegram": ("<code>- {}</code>", "<code>({}次)</code>"),
    "ntfy": ("`- {}`", "`({}次)`"),
    "slack": ("`- {}`", "`({}次)`"),
}


def _format_html_title(
    title_data: Dict, cleaned_title: str, link_url: str, rank_display: str
) -> str:
    """Render a title as an HTML fragment for the HTML report."""
    escaped_title = html_escape(cleaned_title)
    escaped_source_name = html_escape(title_data["source_name"])

    if link_url:
        escaped_url = html_escape(link_url)
        formatted_title = (
            f'[{escaped_source_name}] <a href="{escaped_url}" '
            f'target="_blank" class="news-link">{escaped_title}</a>'
        )
    else:
        formatted_title = (
            f'[{escaped_source_name}] <span class="no-link">{escaped_title}</span>'
        )

    if rank_display:
        formatted_title += f" {rank_display}"
    if title_data["time_display"]:
        escaped_time = html_escape(title_data["time_display"])
        formatted_title += f" <font color='grey'>- {escaped_time}</font>"
    if title_data["count"] > 1:
        formatted_title += f" <font color='green'>({title_data['count']}次)</font>"
    if title_data.get("is_new"):
        formatted_title = f"<div class='new-title'>🆕 {formatted_title}</div>"
    return formatted_title
+235
View File
@@ -0,0 +1,235 @@
# coding=utf-8
"""
报告生成模块
提供报告数据准备和 HTML 生成功能:
- prepare_report_data: 准备报告数据
- generate_html_report: 生成 HTML 报告
"""
from pathlib import Path
from typing import Dict, List, Optional, Callable
def prepare_report_data(
    stats: List[Dict],
    failed_ids: Optional[List] = None,
    new_titles: Optional[Dict] = None,
    id_to_name: Optional[Dict] = None,
    mode: str = "daily",
    rank_threshold: int = 3,
    matches_word_groups_func: Optional[Callable] = None,
    load_frequency_words_func: Optional[Callable] = None,
) -> Dict:
    """Normalise statistics and new titles into the report data structure.

    Args:
        stats: List of per-keyword statistics; entries whose ``count`` is
            not positive are dropped.
        failed_ids: IDs that failed; ``None`` becomes an empty list.
        new_titles: Mapping ``source_id -> {title -> title_data}`` of newly
            seen titles.
        id_to_name: Mapping from source ID to display name. A missing ID
            falls back to the ID itself.
        mode: Report mode ("daily" / "incremental" / "current"). In
            "incremental" mode the new-titles section is left empty.
        rank_threshold: Rank threshold stored on every new-title entry.
        matches_word_groups_func: Optional predicate called as
            ``func(title, word_groups, filter_words, global_filters)``.
        load_frequency_words_func: Optional zero-argument loader returning
            ``(word_groups, filter_words, global_filters)``. Filtering only
            happens when both callables are supplied.

    Returns:
        Dict with keys ``stats``, ``new_titles``, ``failed_ids`` and
        ``total_new_count``.
    """
    new_sections = []

    # Incremental reports never show the "new titles" section.
    if mode != "incremental":
        candidates = {}
        if new_titles and id_to_name:
            if matches_word_groups_func and load_frequency_words_func:
                word_groups, filter_words, global_filters = load_frequency_words_func()
                for source_id, titles_data in new_titles.items():
                    kept = {
                        title: title_data
                        for title, title_data in titles_data.items()
                        if matches_word_groups_func(
                            title, word_groups, filter_words, global_filters
                        )
                    }
                    if kept:
                        candidates[source_id] = kept
            else:
                # Without a matcher every new title is kept.
                candidates = new_titles

            # Report how many new titles survived filtering (same number the
            # push notifications show).
            original_new_count = sum(map(len, new_titles.values()))
            filtered_new_count = sum(map(len, candidates.values()))
            if original_new_count > 0:
                print(f"频率词过滤后:{filtered_new_count} 条新增热点匹配(原始 {original_new_count} 条)")

        if candidates and id_to_name:
            for source_id, titles_data in candidates.items():
                source_name = id_to_name.get(source_id, source_id)
                entries = [
                    {
                        "title": title,
                        "source_name": source_name,
                        "time_display": "",
                        "count": 1,
                        "ranks": title_data.get("ranks", []),
                        "rank_threshold": rank_threshold,
                        "url": title_data.get("url", ""),
                        "mobile_url": title_data.get("mobileUrl", ""),
                        "is_new": True,
                    }
                    for title, title_data in titles_data.items()
                ]
                if entries:
                    new_sections.append(
                        {
                            "source_id": source_id,
                            "source_name": source_name,
                            "titles": entries,
                        }
                    )

    stat_sections = []
    for stat in stats:
        if not stat["count"] <= 0:
            # Input records carry the mobile link under "mobileUrl"; the
            # report structure exposes it as "mobile_url".
            titles = [
                {
                    "title": item["title"],
                    "source_name": item["source_name"],
                    "time_display": item["time_display"],
                    "count": item["count"],
                    "ranks": item["ranks"],
                    "rank_threshold": item["rank_threshold"],
                    "url": item.get("url", ""),
                    "mobile_url": item.get("mobileUrl", ""),
                    "is_new": item.get("is_new", False),
                }
                for item in stat["titles"]
            ]
            stat_sections.append(
                {
                    "word": stat["word"],
                    "count": stat["count"],
                    "percentage": stat.get("percentage", 0),
                    "titles": titles,
                }
            )

    total_new = sum(len(section["titles"]) for section in new_sections)
    return {
        "stats": stat_sections,
        "new_titles": new_sections,
        "failed_ids": failed_ids or [],
        "total_new_count": total_new,
    }
def generate_html_report(
    stats: List[Dict],
    total_titles: int,
    failed_ids: Optional[List] = None,
    new_titles: Optional[Dict] = None,
    id_to_name: Optional[Dict] = None,
    mode: str = "daily",
    is_daily_summary: bool = False,
    update_info: Optional[Dict] = None,
    rank_threshold: int = 3,
    output_dir: str = "output",
    date_folder: str = "",
    time_filename: str = "",
    render_html_func: Optional[Callable] = None,
    matches_word_groups_func: Optional[Callable] = None,
    load_frequency_words_func: Optional[Callable] = None,
    enable_index_copy: bool = True,
) -> str:
    """Render the report to HTML and write it to disk.

    The report is written to ``<output_dir>/<date_folder>/html/<filename>``.
    For a daily summary with ``enable_index_copy`` set, the same content is
    also written to ``index.html`` in the current working directory and to
    ``<output_dir>/index.html``.

    Args:
        stats: List of per-keyword statistics.
        total_titles: Total number of titles, forwarded to the renderer.
        failed_ids: IDs that failed.
        new_titles: Newly seen titles.
        id_to_name: Mapping from source ID to display name.
        mode: Report mode ("daily" / "incremental" / "current"); selects
            the file name of a daily summary.
        is_daily_summary: Whether this is the daily summary report.
        update_info: Update information, forwarded to the renderer.
        rank_threshold: Rank threshold.
        output_dir: Root output directory.
        date_folder: Name of the date sub-folder.
        time_filename: File stem used when this is not a daily summary.
        render_html_func: Renderer called as ``func(report_data,
            total_titles, is_daily_summary, mode, update_info)``. When
            omitted, a minimal placeholder page is produced.
        matches_word_groups_func: Word-group matching predicate.
        load_frequency_words_func: Frequency-word loader.
        enable_index_copy: Whether to also write the ``index.html`` copies.

    Returns:
        Path of the generated HTML file, as a string.
    """
    if is_daily_summary:
        summary_names = {
            "current": "当前榜单汇总.html",
            "incremental": "当日增量.html",
        }
        filename = summary_names.get(mode, "当日汇总.html")
    else:
        filename = f"{time_filename}.html"

    # Make sure the target directory exists before doing any other work.
    html_dir = Path(output_dir) / date_folder / "html"
    html_dir.mkdir(parents=True, exist_ok=True)
    file_path = str(html_dir / filename)

    report_data = prepare_report_data(
        stats,
        failed_ids,
        new_titles,
        id_to_name,
        mode,
        rank_threshold,
        matches_word_groups_func,
        load_frequency_words_func,
    )

    if render_html_func:
        html_content = render_html_func(
            report_data, total_titles, is_daily_summary, mode, update_info
        )
    else:
        # No renderer supplied: emit a minimal placeholder page.
        html_content = f"<html><body><h1>Report</h1><pre>{report_data}</pre></body></html>"

    with Path(file_path).open("w", encoding="utf-8") as handle:
        handle.write(html_content)

    if is_daily_summary and enable_index_copy:
        # Copy in the working directory (served by GitHub Pages).
        with Path("index.html").open("w", encoding="utf-8") as handle:
            handle.write(html_content)

        # Copy in the output directory (reachable through a Docker volume).
        output_root = Path(output_dir)
        output_index_path = output_root / "index.html"
        output_root.mkdir(parents=True, exist_ok=True)
        with output_index_path.open("w", encoding="utf-8") as handle:
            handle.write(html_content)

    return file_path
+125
View File
@@ -0,0 +1,125 @@
# coding=utf-8
"""
报告辅助函数模块
提供报告生成相关的通用辅助函数
"""
import re
from typing import List
def clean_title(title: str) -> str:
    """Normalise whitespace in a title.

    Every run of whitespace (including newlines and carriage returns) is
    collapsed into a single space, and leading/trailing whitespace is
    removed. Non-string input is converted with ``str()`` first.

    Args:
        title: Raw title.

    Returns:
        The cleaned title.
    """
    text = title if isinstance(title, str) else str(title)
    # "\s" already matches "\n" and "\r", so a single substitution handles
    # line breaks and repeated blanks alike.
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def html_escape(text: str) -> str:
    """Escape HTML special characters.

    Replacements: ``&`` -> ``&amp;``, ``<`` -> ``&lt;``, ``>`` -> ``&gt;``,
    ``"`` -> ``&quot;``, ``'`` -> ``&#x27;``. Non-string input is converted
    with ``str()`` first.

    Args:
        text: Raw text.

    Returns:
        The escaped text.
    """
    raw = text if isinstance(text, str) else str(text)
    # Single pass over the input, so an "&" produced by one replacement can
    # never be escaped a second time.
    replacements = str.maketrans(
        {
            "&": "&amp;",
            "<": "&lt;",
            ">": "&gt;",
            '"': "&quot;",
            "'": "&#x27;",
        }
    )
    return raw.translate(replacements)
def format_rank_display(ranks: List[int], rank_threshold: int, format_type: str) -> str:
    """Format a list of ranks as a bracketed rank or rank range.

    The result is wrapped in platform-specific highlight markup when the
    best (smallest) rank is less than or equal to ``rank_threshold``.

    Args:
        ranks: List of ranks (may contain duplicates).
        rank_threshold: Highlight threshold.
        format_type: Platform type. "html", "feishu", "dingtalk", "wework",
            "telegram" and "slack" have dedicated markup; anything else uses
            markdown bold.

    Returns:
        A string such as "[1]" or "[1 - 5]" (possibly highlighted), or an
        empty string when ``ranks`` is empty.
    """
    if not ranks:
        return ""

    ordered = sorted(set(ranks))
    lowest, highest = ordered[0], ordered[-1]

    # (opening, closing) highlight markers per platform.
    markers = {
        "html": ("<font color='red'><strong>", "</strong></font>"),
        "feishu": ("<font color='red'>**", "**</font>"),
        "dingtalk": ("**", "**"),
        "wework": ("**", "**"),
        "telegram": ("<b>", "</b>"),
        "slack": ("*", "*"),
    }
    # Unlisted platforms fall back to markdown bold.
    opening, closing = markers.get(format_type, ("**", "**"))

    label = f"[{lowest}]" if lowest == highest else f"[{lowest} - {highest}]"
    if lowest <= rank_threshold:
        return f"{opening}{label}{closing}"
    return label
File diff suppressed because it is too large Load Diff