# TrendRadar/mcp_server/tools/analytics.py

"""
高级数据分析工具
提供热度趋势分析、平台对比、关键词共现、情感分析等高级分析功能。
"""
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from difflib import SequenceMatcher
from ..services.data_service import DataService
from ..utils.validators import (
validate_platforms,
validate_limit,
validate_keyword,
validate_top_n,
validate_date_range
)
from ..utils.errors import MCPError, InvalidParameterError, DataNotFoundError
def calculate_news_weight(news_data: Dict, rank_threshold: int = 5) -> float:
"""
计算新闻权重(用于排序)
基于 main.py 的权重算法实现,综合考虑:
- 排名权重 (60%):新闻在榜单中的排名
- 频次权重 (30%):新闻出现的次数
- 热度权重 (10%):高排名出现的比例
Args:
news_data: 新闻数据字典,包含 ranks 和 count 字段
rank_threshold: 高排名阈值默认5
Returns:
权重分数0-100之间的浮点数
"""
ranks = news_data.get("ranks", [])
if not ranks:
return 0.0
count = news_data.get("count", len(ranks))
# Weight configuration (kept consistent with config.yaml)
RANK_WEIGHT = 0.6
FREQUENCY_WEIGHT = 0.3
HOTNESS_WEIGHT = 0.1
# 1. Rank weight: sum(11 - min(rank, 10)) / number of appearances
rank_scores = []
for rank in ranks:
score = 11 - min(rank, 10)
rank_scores.append(score)
rank_weight = sum(rank_scores) / len(ranks) if ranks else 0
# 2. Frequency weight: min(count, 10) * 10
frequency_weight = min(count, 10) * 10
# 3. Hotness bonus: high-rank appearances / total appearances * 100
high_rank_count = sum(1 for rank in ranks if rank <= rank_threshold)
hotness_ratio = high_rank_count / len(ranks) if ranks else 0
hotness_weight = hotness_ratio * 100
# Combined weight
total_weight = (
rank_weight * RANK_WEIGHT
+ frequency_weight * FREQUENCY_WEIGHT
+ hotness_weight * HOTNESS_WEIGHT
)
return total_weight
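# Worked example for calculate_news_weight (illustrative numbers, not real data):
# with ranks=[2, 4, 12], count=3 and rank_threshold=5:
#   rank_weight      = ((11-2) + (11-4) + (11-10)) / 3 ≈ 5.67
#   frequency_weight = min(3, 10) * 10 = 30
#   hotness_weight   = (2/3) * 100 ≈ 66.67  (ranks 2 and 4 are <= 5)
#   total            ≈ 5.67 * 0.6 + 30 * 0.3 + 66.67 * 0.1 ≈ 19.07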
class AnalyticsTools:
"""高级数据分析工具类"""
def __init__(self, project_root: str = None):
"""
初始化分析工具
Args:
project_root: 项目根目录
"""
self.data_service = DataService(project_root)
def analyze_data_insights_unified(
self,
insight_type: str = "platform_compare",
topic: Optional[str] = None,
date_range: Optional[Dict[str, str]] = None,
min_frequency: int = 3,
top_n: int = 20
) -> Dict:
"""
统一数据洞察分析工具 - 整合多种数据分析模式
Args:
insight_type: 洞察类型,可选值:
- "platform_compare": 平台对比分析(对比不同平台对话题的关注度)
- "platform_activity": 平台活跃度统计(统计各平台发布频率和活跃时间)
- "keyword_cooccur": 关键词共现分析(分析关键词同时出现的模式)
topic: 话题关键词可选platform_compare模式适用
date_range: 日期范围,格式: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
min_frequency: 最小共现频次keyword_cooccur模式默认3
top_n: 返回TOP N结果keyword_cooccur模式默认20
Returns:
数据洞察分析结果字典
Examples:
- analyze_data_insights_unified(insight_type="platform_compare", topic="人工智能")
- analyze_data_insights_unified(insight_type="platform_activity", date_range={...})
- analyze_data_insights_unified(insight_type="keyword_cooccur", min_frequency=5)
"""
try:
# Parameter validation
if insight_type not in ["platform_compare", "platform_activity", "keyword_cooccur"]:
raise InvalidParameterError(
f"无效的洞察类型: {insight_type}",
suggestion="支持的类型: platform_compare, platform_activity, keyword_cooccur"
)
# Dispatch to the matching method based on insight type
if insight_type == "platform_compare":
return self.compare_platforms(
topic=topic,
date_range=date_range
)
elif insight_type == "platform_activity":
return self.get_platform_activity_stats(
date_range=date_range
)
else: # keyword_cooccur
return self.analyze_keyword_cooccurrence(
min_frequency=min_frequency,
top_n=top_n
)
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
def analyze_topic_trend_unified(
self,
topic: str,
analysis_type: str = "trend",
date_range: Optional[Dict[str, str]] = None,
granularity: str = "day",
threshold: float = 3.0,
time_window: int = 24,
lookahead_hours: int = 6,
confidence_threshold: float = 0.7
) -> Dict:
"""
统一话题趋势分析工具 - 整合多种趋势分析模式
Args:
topic: 话题关键词(必需)
analysis_type: 分析类型,可选值:
- "trend": 热度趋势分析(追踪话题的热度变化)
- "lifecycle": 生命周期分析(从出现到消失的完整周期)
- "viral": 异常热度检测(识别突然爆火的话题)
- "predict": 话题预测(预测未来可能的热点)
date_range: 日期范围trend和lifecycle模式可选
- **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- **默认**: 不指定时默认分析最近7天
granularity: 时间粒度trend模式默认"day"hour/day
threshold: 热度突增倍数阈值viral模式默认3.0
time_window: 检测时间窗口小时数viral模式默认24
lookahead_hours: 预测未来小时数predict模式默认6
confidence_threshold: 置信度阈值predict模式默认0.7
Returns:
趋势分析结果字典
Examples (假设今天是 2025-11-17):
- 用户:"分析AI最近7天的趋势" → analyze_topic_trend_unified(topic="人工智能", analysis_type="trend", date_range={"start": "2025-11-11", "end": "2025-11-17"})
- 用户:"看看特斯拉本月的热度" → analyze_topic_trend_unified(topic="特斯拉", analysis_type="lifecycle", date_range={"start": "2025-11-01", "end": "2025-11-17"})
- analyze_topic_trend_unified(topic="比特币", analysis_type="viral", threshold=3.0)
- analyze_topic_trend_unified(topic="ChatGPT", analysis_type="predict", lookahead_hours=6)
"""
try:
# Parameter validation
topic = validate_keyword(topic)
if analysis_type not in ["trend", "lifecycle", "viral", "predict"]:
raise InvalidParameterError(
f"无效的分析类型: {analysis_type}",
suggestion="支持的类型: trend, lifecycle, viral, predict"
)
# Dispatch to the matching method based on analysis type
if analysis_type == "trend":
return self.get_topic_trend_analysis(
topic=topic,
date_range=date_range,
granularity=granularity
)
elif analysis_type == "lifecycle":
return self.analyze_topic_lifecycle(
topic=topic,
date_range=date_range
)
elif analysis_type == "viral":
# The viral mode ignores the topic parameter and runs generic detection
return self.detect_viral_topics(
threshold=threshold,
time_window=time_window
)
else: # predict
# The predict mode ignores the topic parameter and runs generic prediction
return self.predict_trending_topics(
lookahead_hours=lookahead_hours,
confidence_threshold=confidence_threshold
)
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
def get_topic_trend_analysis(
self,
topic: str,
date_range: Optional[Dict[str, str]] = None,
granularity: str = "day"
) -> Dict:
"""
热度趋势分析 - 追踪特定话题的热度变化趋势
Args:
topic: 话题关键词
date_range: 日期范围(可选)
- **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- **默认**: 不指定时默认分析最近7天
granularity: 时间粒度,仅支持 day
Returns:
趋势分析结果字典
Examples:
用户询问示例:
- "帮我分析一下'人工智能'这个话题最近一周的热度趋势"
- "查看'比特币'过去一周的热度变化"
- "看看'iPhone'最近7天的趋势如何"
- "分析'特斯拉'最近一个月的热度趋势"
- "查看'ChatGPT'2024年12月的趋势变化"
代码调用示例:
>>> tools = AnalyticsTools()
>>> # 分析7天趋势假设今天是 2025-11-17
>>> result = tools.get_topic_trend_analysis(
... topic="人工智能",
... date_range={"start": "2025-11-11", "end": "2025-11-17"},
... granularity="day"
... )
>>> # 分析历史月份趋势
>>> result = tools.get_topic_trend_analysis(
... topic="特斯拉",
... date_range={"start": "2024-12-01", "end": "2024-12-31"},
... granularity="day"
... )
>>> print(result['trend_data'])
"""
try:
# Validate parameters
topic = validate_keyword(topic)
# Validate granularity (only "day" is supported; the underlying data is aggregated per day)
# Note: InvalidParameterError and validate_date_range are already imported at module level
if granularity != "day":
raise InvalidParameterError(
f"不支持的粒度参数: {granularity}",
suggestion="当前仅支持 'day' 粒度,因为底层数据按天聚合"
)
# Resolve the date range; default to the last 7 days when unspecified
if date_range:
date_range_tuple = validate_date_range(date_range)
start_date, end_date = date_range_tuple
else:
# Default: the last 7 days
end_date = datetime.now()
start_date = end_date - timedelta(days=6)
# Collect trend data
trend_data = []
current_date = start_date
while current_date <= end_date:
try:
all_titles, _, _ = self.data_service.parser.read_all_titles_for_date(
date=current_date
)
# Count the topic's occurrences at this point in time
count = 0
matched_titles = []
for _, titles in all_titles.items():
for title in titles.keys():
if topic.lower() in title.lower():
count += 1
matched_titles.append(title)
trend_data.append({
"date": current_date.strftime("%Y-%m-%d"),
"count": count,
"sample_titles": matched_titles[:3] # 只保留前3个样本
})
except DataNotFoundError:
trend_data.append({
"date": current_date.strftime("%Y-%m-%d"),
"count": 0,
"sample_titles": []
})
# Advance by one day
current_date += timedelta(days=1)
# Compute trend metrics
counts = [item["count"] for item in trend_data]
total_days = (end_date - start_date).days + 1
if len(counts) >= 2:
# Compute the rate of change
first_non_zero = next((c for c in counts if c > 0), 0)
last_count = counts[-1]
if first_non_zero > 0:
change_rate = ((last_count - first_non_zero) / first_non_zero) * 100
else:
change_rate = 0
# Locate the peak
max_count = max(counts)
peak_index = counts.index(max_count)
peak_time = trend_data[peak_index]["date"]
else:
change_rate = 0
peak_time = None
max_count = 0
return {
"success": True,
"topic": topic,
"date_range": {
"start": start_date.strftime("%Y-%m-%d"),
"end": end_date.strftime("%Y-%m-%d"),
"total_days": total_days
},
"granularity": granularity,
"trend_data": trend_data,
"statistics": {
"total_mentions": sum(counts),
"average_mentions": round(sum(counts) / len(counts), 2) if counts else 0,
"peak_count": max_count,
"peak_time": peak_time,
"change_rate": round(change_rate, 2)
},
"trend_direction": "上升" if change_rate > 10 else "下降" if change_rate < -10 else "稳定"
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
def compare_platforms(
self,
topic: Optional[str] = None,
date_range: Optional[Dict[str, str]] = None
) -> Dict:
"""
平台对比分析 - 对比不同平台对同一话题的关注度
Args:
topic: 话题关键词(可选,不指定则对比整体活跃度)
date_range: 日期范围,格式: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
Returns:
平台对比分析结果
Examples:
用户询问示例:
- "对比一下各个平台对'人工智能'话题的关注度"
- "看看知乎和微博哪个平台更关注科技新闻"
- "分析各平台今天的热点分布"
代码调用示例:
>>> # 对比各平台(假设今天是 2025-11-17
>>> result = tools.compare_platforms(
... topic="人工智能",
... date_range={"start": "2025-11-08", "end": "2025-11-17"}
... )
>>> print(result['platform_stats'])
"""
try:
# Parameter validation
if topic:
topic = validate_keyword(topic)
date_range_tuple = validate_date_range(date_range)
# Determine the date range
if date_range_tuple:
start_date, end_date = date_range_tuple
else:
start_date = end_date = datetime.now()
# Collect per-platform data
platform_stats = defaultdict(lambda: {
"total_news": 0,
"topic_mentions": 0,
"unique_titles": set(),
"top_keywords": Counter()
})
# Iterate over the date range
current_date = start_date
while current_date <= end_date:
try:
all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date(
date=current_date
)
for platform_id, titles in all_titles.items():
platform_name = id_to_name.get(platform_id, platform_id)
for title in titles.keys():
platform_stats[platform_name]["total_news"] += 1
platform_stats[platform_name]["unique_titles"].add(title)
# If a topic was given, count the news items that mention it
if topic and topic.lower() in title.lower():
platform_stats[platform_name]["topic_mentions"] += 1
# Extract keywords (simple tokenization)
keywords = self._extract_keywords(title)
platform_stats[platform_name]["top_keywords"].update(keywords)
except DataNotFoundError:
pass
current_date += timedelta(days=1)
# Convert to a serializable format
result_stats = {}
for platform, stats in platform_stats.items():
coverage_rate = 0
if stats["total_news"] > 0:
coverage_rate = (stats["topic_mentions"] / stats["total_news"]) * 100
result_stats[platform] = {
"total_news": stats["total_news"],
"topic_mentions": stats["topic_mentions"],
"unique_titles": len(stats["unique_titles"]),
"coverage_rate": round(coverage_rate, 2),
"top_keywords": [
{"keyword": k, "count": v}
for k, v in stats["top_keywords"].most_common(5)
]
}
# Find the hotspots unique to each platform
unique_topics = self._find_unique_topics(platform_stats)
return {
"success": True,
"topic": topic,
"date_range": {
"start": start_date.strftime("%Y-%m-%d"),
"end": end_date.strftime("%Y-%m-%d")
},
"platform_stats": result_stats,
"unique_topics": unique_topics,
"total_platforms": len(result_stats)
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
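# Coverage arithmetic used above (illustrative numbers): a platform with 200
# titles, 8 of which mention the topic, gets coverage_rate = 8 / 200 * 100 = 4.0 (%).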
def analyze_keyword_cooccurrence(
self,
min_frequency: int = 3,
top_n: int = 20
) -> Dict:
"""
关键词共现分析 - 分析哪些关键词经常同时出现
Args:
min_frequency: 最小共现频次
top_n: 返回TOP N关键词对
Returns:
关键词共现分析结果
Examples:
用户询问示例:
- "分析一下哪些关键词经常一起出现"
- "看看'人工智能'经常和哪些词一起出现"
- "找出今天新闻中的关键词关联"
代码调用示例:
>>> tools = AnalyticsTools()
>>> result = tools.analyze_keyword_cooccurrence(
... min_frequency=5,
... top_n=15
... )
>>> print(result['cooccurrence_pairs'])
"""
try:
# Parameter validation
min_frequency = validate_limit(min_frequency, default=3, max_limit=100)
top_n = validate_top_n(top_n, default=20)
# Read today's data
all_titles, _, _ = self.data_service.parser.read_all_titles_for_date()
# Keyword co-occurrence counters
cooccurrence = Counter()
keyword_titles = defaultdict(list)
for platform_id, titles in all_titles.items():
for title in titles.keys():
# Extract keywords
keywords = self._extract_keywords(title)
# Record the titles each keyword appears in
for kw in keywords:
keyword_titles[kw].append(title)
# Count pairwise co-occurrences
if len(keywords) >= 2:
for i, kw1 in enumerate(keywords):
for kw2 in keywords[i+1:]:
# Sort the pair so duplicates are not counted twice
pair = tuple(sorted([kw1, kw2]))
cooccurrence[pair] += 1
# Filter out low-frequency pairs
filtered_pairs = [
(pair, count) for pair, count in cooccurrence.items()
if count >= min_frequency
]
# Sort and keep the top N
top_pairs = sorted(filtered_pairs, key=lambda x: x[1], reverse=True)[:top_n]
# Build the result
result_pairs = []
for (kw1, kw2), count in top_pairs:
# Sample titles that contain both keywords
titles_with_both = [
title for title in keyword_titles[kw1]
if kw2 in self._extract_keywords(title)
]
result_pairs.append({
"keyword1": kw1,
"keyword2": kw2,
"cooccurrence_count": count,
"sample_titles": titles_with_both[:3]
})
return {
"success": True,
"cooccurrence_pairs": result_pairs,
"total_pairs": len(result_pairs),
"min_frequency": min_frequency,
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
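# Pair-counting sketch (hypothetical keywords, for illustration only): a title
# whose keywords are [A, B, C] increments the pairs (A, B), (A, C) and (B, C)
# by one each; sorting each pair maps (B, A) and (A, B) to the same Counter key.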
def analyze_sentiment(
self,
topic: Optional[str] = None,
platforms: Optional[List[str]] = None,
date_range: Optional[Dict[str, str]] = None,
limit: int = 50,
sort_by_weight: bool = True,
include_url: bool = False
) -> Dict:
"""
情感倾向分析 - 生成用于 AI 情感分析的结构化提示词
本工具收集新闻数据并生成优化的 AI 提示词,你可以将其发送给 AI 进行深度情感分析。
Args:
topic: 话题关键词(可选),只分析包含该关键词的新闻
platforms: 平台过滤列表(可选),如 ['zhihu', 'weibo']
date_range: 日期范围(可选),格式: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
不指定则默认查询今天的数据
limit: 返回新闻数量限制默认50最大100
sort_by_weight: 是否按权重排序默认True推荐
include_url: 是否包含URL链接默认False节省token
Returns:
包含 AI 提示词和新闻数据的结构化结果
Examples:
用户询问示例:
- "分析一下今天新闻的情感倾向"
- "看看'特斯拉'相关新闻是正面还是负面的"
- "分析各平台对'人工智能'的情感态度"
- "看看'特斯拉'相关新闻是正面还是负面的请选择一周内的前10条新闻来分析"
代码调用示例:
>>> tools = AnalyticsTools()
>>> # 分析今天的特斯拉新闻返回前10条
>>> result = tools.analyze_sentiment(
... topic="特斯拉",
... limit=10
... )
>>> # 分析一周内的特斯拉新闻(假设今天是 2025-11-17
>>> result = tools.analyze_sentiment(
... topic="特斯拉",
... date_range={"start": "2025-11-11", "end": "2025-11-17"},
... limit=10
... )
>>> print(result['ai_prompt']) # 获取生成的提示词
"""
try:
# Parameter validation
if topic:
topic = validate_keyword(topic)
platforms = validate_platforms(platforms)
limit = validate_limit(limit, default=50)
# Resolve the date range
if date_range:
date_range_tuple = validate_date_range(date_range)
start_date, end_date = date_range_tuple
else:
# Default: today
start_date = end_date = datetime.now()
# Collect news data (multiple days supported)
all_news_items = []
current_date = start_date
while current_date <= end_date:
try:
all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date(
date=current_date,
platform_ids=platforms
)
# Collect this date's news
for platform_id, titles in all_titles.items():
platform_name = id_to_name.get(platform_id, platform_id)
for title, info in titles.items():
# If a topic was given, only collect titles that mention it
if topic and topic.lower() not in title.lower():
continue
news_item = {
"platform": platform_name,
"title": title,
"ranks": info.get("ranks", []),
"count": len(info.get("ranks", [])),
"date": current_date.strftime("%Y-%m-%d")
}
# Conditionally include the URL fields
if include_url:
news_item["url"] = info.get("url", "")
news_item["mobileUrl"] = info.get("mobileUrl", "")
all_news_items.append(news_item)
except DataNotFoundError:
# No data for this date; continue with the next day
pass
# Next day
current_date += timedelta(days=1)
if not all_news_items:
time_desc = "今天" if start_date == end_date else f"{start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}"
raise DataNotFoundError(
f"未找到相关新闻({time_desc}",
suggestion="请尝试其他话题、日期范围或平台"
)
# Deduplicate (keep each title only once)
unique_news = {}
for item in all_news_items:
key = f"{item['platform']}::{item['title']}"
if key not in unique_news:
unique_news[key] = item
else:
# Merge ranks (the same item may appear on several days)
existing = unique_news[key]
existing["ranks"].extend(item["ranks"])
existing["count"] = len(existing["ranks"])
deduplicated_news = list(unique_news.values())
# Sort by weight (if enabled)
if sort_by_weight:
deduplicated_news.sort(
key=lambda x: calculate_news_weight(x),
reverse=True
)
# Limit the number of returned items
selected_news = deduplicated_news[:limit]
# Generate the AI prompt
ai_prompt = self._create_sentiment_analysis_prompt(
news_data=selected_news,
topic=topic
)
# Build the time-range description
if start_date == end_date:
time_range_desc = start_date.strftime("%Y-%m-%d")
else:
time_range_desc = f"{start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}"
result = {
"success": True,
"method": "ai_prompt_generation",
"summary": {
"total_found": len(deduplicated_news),
"returned_count": len(selected_news),
"requested_limit": limit,
"duplicates_removed": len(all_news_items) - len(deduplicated_news),
"topic": topic,
"time_range": time_range_desc,
"platforms": list(set(item["platform"] for item in selected_news)),
"sorted_by_weight": sort_by_weight
},
"ai_prompt": ai_prompt,
"news_sample": selected_news,
"usage_note": "请将 ai_prompt 字段的内容发送给 AI 进行情感分析"
}
# Add a note when fewer items are returned than requested
if len(selected_news) < limit and len(deduplicated_news) >= limit:
result["note"] = "返回数量少于请求数量是因为去重逻辑(同一标题在不同平台只保留一次)"
elif len(deduplicated_news) < limit:
result["note"] = f"在指定时间范围内仅找到 {len(deduplicated_news)} 条匹配的新闻"
return result
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
def _create_sentiment_analysis_prompt(
self,
news_data: List[Dict],
topic: Optional[str]
) -> str:
"""
创建情感分析的 AI 提示词
Args:
news_data: 新闻数据列表(已排序和限制数量)
topic: 话题关键词
Returns:
格式化的 AI 提示词
"""
# Group by platform
platform_news = defaultdict(list)
for item in news_data:
platform_news[item["platform"]].append({
"title": item["title"],
"date": item.get("date", "")
})
# Build the prompt
prompt_parts = []
# 1. Task description
if topic:
prompt_parts.append(f"请分析以下关于「{topic}」的新闻标题的情感倾向。")
else:
prompt_parts.append("请分析以下新闻标题的情感倾向。")
prompt_parts.append("")
prompt_parts.append("分析要求:")
prompt_parts.append("1. 识别每条新闻的情感倾向(正面/负面/中性)")
prompt_parts.append("2. 统计各情感类别的数量和百分比")
prompt_parts.append("3. 分析不同平台的情感差异")
prompt_parts.append("4. 总结整体情感趋势")
prompt_parts.append("5. 列举典型的正面和负面新闻样本")
prompt_parts.append("")
# 2. Data overview
prompt_parts.append(f"数据概览:")
prompt_parts.append(f"- 总新闻数:{len(news_data)}")
prompt_parts.append(f"- 覆盖平台:{len(platform_news)}")
# Time range
dates = set(item.get("date", "") for item in news_data if item.get("date"))
if dates:
date_list = sorted(dates)
if len(date_list) == 1:
prompt_parts.append(f"- 时间范围:{date_list[0]}")
else:
prompt_parts.append(f"- 时间范围:{date_list[0]}{date_list[-1]}")
prompt_parts.append("")
# 3. News grouped by platform
prompt_parts.append("新闻列表(按平台分类,已按重要性排序):")
prompt_parts.append("")
for platform, items in sorted(platform_news.items()):
prompt_parts.append(f"{platform}】({len(items)} 条)")
for i, item in enumerate(items, 1):
title = item["title"]
date_str = f" [{item['date']}]" if item.get("date") else ""
prompt_parts.append(f"{i}. {title}{date_str}")
prompt_parts.append("")
# 4. Output format specification
prompt_parts.append("请按以下格式输出分析结果:")
prompt_parts.append("")
prompt_parts.append("## 情感分布统计")
prompt_parts.append("- 正面XX条 (XX%)")
prompt_parts.append("- 负面XX条 (XX%)")
prompt_parts.append("- 中性XX条 (XX%)")
prompt_parts.append("")
prompt_parts.append("## 平台情感对比")
prompt_parts.append("[各平台的情感倾向差异]")
prompt_parts.append("")
prompt_parts.append("## 整体情感趋势")
prompt_parts.append("[总体分析和关键发现]")
prompt_parts.append("")
prompt_parts.append("## 典型样本")
prompt_parts.append("正面新闻样本:")
prompt_parts.append("[列举3-5条]")
prompt_parts.append("")
prompt_parts.append("负面新闻样本:")
prompt_parts.append("[列举3-5条]")
return "\n".join(prompt_parts)
def find_similar_news(
self,
reference_title: str,
threshold: float = 0.6,
limit: int = 50,
include_url: bool = False
) -> Dict:
"""
相似新闻查找 - 基于标题相似度查找相关新闻
Args:
reference_title: 参考标题
threshold: 相似度阈值0-1之间
limit: 返回条数限制默认50
include_url: 是否包含URL链接默认False节省token
Returns:
相似新闻列表
Examples:
用户询问示例:
- "找出和'特斯拉降价'相似的新闻"
- "查找关于iPhone发布的类似报道"
- "看看有没有和这条新闻相似的报道"
代码调用示例:
>>> tools = AnalyticsTools()
>>> result = tools.find_similar_news(
... reference_title="特斯拉宣布降价",
... threshold=0.6,
... limit=10
... )
>>> print(result['similar_news'])
"""
try:
# Parameter validation
reference_title = validate_keyword(reference_title)
if not 0 <= threshold <= 1:
raise InvalidParameterError(
"threshold 必须在 0 到 1 之间",
suggestion="推荐值0.5-0.8"
)
limit = validate_limit(limit, default=50)
# Read the data
all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date()
# Compute similarities
similar_items = []
for platform_id, titles in all_titles.items():
platform_name = id_to_name.get(platform_id, platform_id)
for title, info in titles.items():
if title == reference_title:
continue
# Similarity score
similarity = self._calculate_similarity(reference_title, title)
if similarity >= threshold:
news_item = {
"title": title,
"platform": platform_id,
"platform_name": platform_name,
"similarity": round(similarity, 3),
"rank": info["ranks"][0] if info["ranks"] else 0
}
# Conditionally include the URL field
if include_url:
news_item["url"] = info.get("url", "")
similar_items.append(news_item)
# Sort by similarity
similar_items.sort(key=lambda x: x["similarity"], reverse=True)
# Limit the count
result_items = similar_items[:limit]
if not result_items:
raise DataNotFoundError(
f"未找到相似度超过 {threshold} 的新闻",
suggestion="请降低相似度阈值或尝试其他标题"
)
result = {
"success": True,
"summary": {
"total_found": len(similar_items),
"returned_count": len(result_items),
"requested_limit": limit,
"threshold": threshold,
"reference_title": reference_title
},
"similar_news": result_items
}
if len(similar_items) < limit:
result["note"] = f"相似度阈值 {threshold} 下仅找到 {len(similar_items)} 条相似新闻"
return result
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
def search_by_entity(
self,
entity: str,
entity_type: Optional[str] = None,
limit: int = 50,
sort_by_weight: bool = True
) -> Dict:
"""
实体识别搜索 - 搜索包含特定人物/地点/机构的新闻
Args:
entity: 实体名称
entity_type: 实体类型person/location/organization可选
limit: 返回条数限制默认50最大200
sort_by_weight: 是否按权重排序默认True
Returns:
实体相关新闻列表
Examples:
用户询问示例:
- "搜索马斯克相关的新闻"
- "查找关于特斯拉公司的报道返回前20条"
- "看看北京有什么新闻"
代码调用示例:
>>> tools = AnalyticsTools()
>>> result = tools.search_by_entity(
... entity="马斯克",
... entity_type="person",
... limit=20
... )
>>> print(result['related_news'])
"""
try:
# Parameter validation
entity = validate_keyword(entity)
limit = validate_limit(limit, default=50)
if entity_type and entity_type not in ["person", "location", "organization"]:
raise InvalidParameterError(
f"无效的实体类型: {entity_type}",
suggestion="支持的类型: person, location, organization"
)
# Read the data
all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date()
# Search for news mentioning the entity
related_news = []
entity_context = Counter()  # words that appear around the entity
for platform_id, titles in all_titles.items():
platform_name = id_to_name.get(platform_id, platform_id)
for title, info in titles.items():
if entity in title:
url = info.get("url", "")
mobile_url = info.get("mobileUrl", "")
ranks = info.get("ranks", [])
count = len(ranks)
related_news.append({
"title": title,
"platform": platform_id,
"platform_name": platform_name,
"url": url,
"mobileUrl": mobile_url,
"ranks": ranks,
"count": count,
"rank": ranks[0] if ranks else 999
})
# Extract keywords around the entity
keywords = self._extract_keywords(title)
entity_context.update(keywords)
if not related_news:
raise DataNotFoundError(
f"未找到包含实体 '{entity}' 的新闻",
suggestion="请尝试其他实体名称"
)
# Remove the entity itself
if entity in entity_context:
del entity_context[entity]
# Sort by weight (if enabled)
if sort_by_weight:
related_news.sort(
key=lambda x: calculate_news_weight(x),
reverse=True
)
else:
# Sort by rank
related_news.sort(key=lambda x: x["rank"])
# Limit the number of returned items
result_news = related_news[:limit]
return {
"success": True,
"entity": entity,
"entity_type": entity_type or "auto",
"related_news": result_news,
"total_found": len(related_news),
"returned_count": len(result_news),
"sorted_by_weight": sort_by_weight,
"related_keywords": [
{"keyword": k, "count": v}
for k, v in entity_context.most_common(10)
]
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
def generate_summary_report(
self,
report_type: str = "daily",
date_range: Optional[Dict[str, str]] = None
) -> Dict:
"""
每日/每周摘要生成器 - 自动生成热点摘要报告
Args:
report_type: 报告类型daily/weekly
date_range: 自定义日期范围(可选)
Returns:
Markdown格式的摘要报告
Examples:
用户询问示例:
- "生成今天的新闻摘要报告"
- "给我一份本周的热点总结"
- "生成过去7天的新闻分析报告"
代码调用示例:
>>> tools = AnalyticsTools()
>>> result = tools.generate_summary_report(
... report_type="daily"
... )
>>> print(result['markdown_report'])
"""
try:
# Parameter validation
if report_type not in ["daily", "weekly"]:
raise InvalidParameterError(
f"无效的报告类型: {report_type}",
suggestion="支持的类型: daily, weekly"
)
# Determine the date range
if date_range:
date_range_tuple = validate_date_range(date_range)
start_date, end_date = date_range_tuple
else:
if report_type == "daily":
start_date = end_date = datetime.now()
else: # weekly
end_date = datetime.now()
start_date = end_date - timedelta(days=6)
# Collect data
all_keywords = Counter()
all_platforms_news = defaultdict(int)
all_titles_list = []
current_date = start_date
while current_date <= end_date:
try:
all_titles, id_to_name, _ = self.data_service.parser.read_all_titles_for_date(
date=current_date
)
for platform_id, titles in all_titles.items():
platform_name = id_to_name.get(platform_id, platform_id)
all_platforms_news[platform_name] += len(titles)
for title in titles.keys():
all_titles_list.append({
"title": title,
"platform": platform_name,
"date": current_date.strftime("%Y-%m-%d")
})
# Extract keywords
keywords = self._extract_keywords(title)
all_keywords.update(keywords)
except DataNotFoundError:
pass
current_date += timedelta(days=1)
# Generate the report
report_title = f"{'每日' if report_type == 'daily' else '每周'}新闻热点摘要"
date_str = f"{start_date.strftime('%Y-%m-%d')}" if report_type == "daily" else f"{start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}"
# Build the Markdown report
markdown = f"""# {report_title}
**报告日期**: {date_str}
**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
## 📊 数据概览
- **总新闻数**: {len(all_titles_list)}
- **覆盖平台**: {len(all_platforms_news)}
- **热门关键词数**: {len(all_keywords)}
## 🔥 TOP 10 热门话题
"""
# Add the top 10 keywords
for i, (keyword, count) in enumerate(all_keywords.most_common(10), 1):
markdown += f"{i}. **{keyword}** - 出现 {count}\n"
# Platform analysis
markdown += "\n## 📱 平台活跃度\n\n"
sorted_platforms = sorted(all_platforms_news.items(), key=lambda x: x[1], reverse=True)
for platform, count in sorted_platforms:
markdown += f"- **{platform}**: {count} 条新闻\n"
# Trend section (weekly reports only)
if report_type == "weekly":
markdown += "\n## 📈 趋势分析\n\n"
markdown += "本周热度持续的话题(样本数据):\n\n"
# Simple trend analysis
top_keywords = [kw for kw, _ in all_keywords.most_common(5)]
for keyword in top_keywords:
markdown += f"- **{keyword}**: 持续热门\n"
# Add sample news (weight-based selection for determinism)
markdown += "\n## 📰 精选新闻样本\n\n"
# Deterministic selection: sort titles by weight and take the top 5,
# so the same input always produces the same output
if all_titles_list:
# Score each news item (based on top-keyword occurrences)
news_with_scores = []
for news in all_titles_list:
# Simple weight: sum the counts of the top keywords the title contains
score = 0
title_lower = news['title'].lower()
for keyword, count in all_keywords.most_common(10):
if keyword.lower() in title_lower:
score += count
news_with_scores.append((news, score))
# Sort by weight descending, then by title to break ties (deterministic)
news_with_scores.sort(key=lambda x: (-x[1], x[0]['title']))
# Take the top 5
sample_news = [item[0] for item in news_with_scores[:5]]
for news in sample_news:
markdown += f"- [{news['platform']}] {news['title']}\n"
markdown += "\n---\n\n*本报告由 TrendRadar MCP 自动生成*\n"
return {
"success": True,
"report_type": report_type,
"date_range": {
"start": start_date.strftime("%Y-%m-%d"),
"end": end_date.strftime("%Y-%m-%d")
},
"markdown_report": markdown,
"statistics": {
"total_news": len(all_titles_list),
"platforms_count": len(all_platforms_news),
"keywords_count": len(all_keywords),
"top_keyword": all_keywords.most_common(1)[0] if all_keywords else None
}
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
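# Sample-selection arithmetic used above (illustrative): a title containing two
# top-10 keywords whose global counts are 12 and 7 scores 12 + 7 = 19; ties are
# broken by title order, so repeated runs on the same data pick the same 5 samples.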
def get_platform_activity_stats(
self,
date_range: Optional[Dict[str, str]] = None
) -> Dict:
"""
平台活跃度统计 - 统计各平台的发布频率和活跃时间段
Args:
date_range: 日期范围(可选)
Returns:
平台活跃度统计结果
Examples:
用户询问示例:
- "统计各平台今天的活跃度"
- "看看哪个平台更新最频繁"
- "分析各平台的发布时间规律"
代码调用示例:
>>> # 查看各平台活跃度(假设今天是 2025-11-17
>>> result = tools.get_platform_activity_stats(
... date_range={"start": "2025-11-08", "end": "2025-11-17"}
... )
>>> print(result['platform_activity'])
"""
try:
# Parameter validation
date_range_tuple = validate_date_range(date_range)
# Determine the date range
if date_range_tuple:
start_date, end_date = date_range_tuple
else:
start_date = end_date = datetime.now()
# Per-platform activity counters
platform_activity = defaultdict(lambda: {
"total_updates": 0,
"days_active": set(),
"news_count": 0,
"hourly_distribution": Counter()
})
# Iterate over the date range
current_date = start_date
while current_date <= end_date:
try:
all_titles, id_to_name, timestamps = self.data_service.parser.read_all_titles_for_date(
date=current_date
)
for platform_id, titles in all_titles.items():
platform_name = id_to_name.get(platform_id, platform_id)
platform_activity[platform_name]["news_count"] += len(titles)
platform_activity[platform_name]["days_active"].add(current_date.strftime("%Y-%m-%d"))
# Count updates (based on the number of files)
platform_activity[platform_name]["total_updates"] += len(timestamps)
# Hourly distribution (based on the time in each filename)
for filename in timestamps.keys():
# Parse the hour from the filename (format: HHMM.txt)
match = re.match(r'(\d{2})(\d{2})\.txt', filename)
if match:
hour = int(match.group(1))
platform_activity[platform_name]["hourly_distribution"][hour] += 1
except DataNotFoundError:
pass
current_date += timedelta(days=1)
# Convert to a serializable format
result_activity = {}
for platform, stats in platform_activity.items():
days_count = len(stats["days_active"])
avg_news_per_day = stats["news_count"] / days_count if days_count > 0 else 0
# Find the most active hours
most_active_hours = stats["hourly_distribution"].most_common(3)
result_activity[platform] = {
"total_updates": stats["total_updates"],
"news_count": stats["news_count"],
"days_active": days_count,
"avg_news_per_day": round(avg_news_per_day, 2),
"most_active_hours": [
{"hour": f"{hour:02d}:00", "count": count}
for hour, count in most_active_hours
],
"activity_score": round(stats["news_count"] / max(days_count, 1), 2)
}
# Sort by activity
sorted_platforms = sorted(
result_activity.items(),
key=lambda x: x[1]["activity_score"],
reverse=True
)
return {
"success": True,
"date_range": {
"start": start_date.strftime("%Y-%m-%d"),
"end": end_date.strftime("%Y-%m-%d")
},
"platform_activity": dict(sorted_platforms),
"most_active_platform": sorted_platforms[0][0] if sorted_platforms else None,
"total_platforms": len(result_activity)
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
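# Filename parsing example: a snapshot file named "0930.txt" matches
# r'(\d{2})(\d{2})\.txt', so it adds one update to the 09:00 hour bucket.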
def analyze_topic_lifecycle(
self,
topic: str,
date_range: Optional[Dict[str, str]] = None
) -> Dict:
"""
话题生命周期分析 - 追踪话题从出现到消失的完整周期
Args:
topic: 话题关键词
date_range: 日期范围(可选)
- **格式**: {"start": "YYYY-MM-DD", "end": "YYYY-MM-DD"}
- **默认**: 不指定时默认分析最近7天
Returns:
话题生命周期分析结果
Examples:
用户询问示例:
- "分析'人工智能'这个话题的生命周期"
- "看看'iPhone'话题是昙花一现还是持续热点"
- "追踪'比特币'话题的热度变化"
代码调用示例:
>>> # 分析话题生命周期(假设今天是 2025-11-17
>>> result = tools.analyze_topic_lifecycle(
... topic="人工智能",
... date_range={"start": "2025-10-19", "end": "2025-11-17"}
... )
>>> print(result['lifecycle_stage'])
"""
try:
# Parameter validation
topic = validate_keyword(topic)
# Resolve the date range; default to the last 7 days when unspecified
# (validate_date_range is already imported at module level)
if date_range:
date_range_tuple = validate_date_range(date_range)
start_date, end_date = date_range_tuple
else:
# Default: the last 7 days
end_date = datetime.now()
start_date = end_date - timedelta(days=6)
# Collect the topic's history
lifecycle_data = []
current_date = start_date
while current_date <= end_date:
try:
all_titles, _, _ = self.data_service.parser.read_all_titles_for_date(
date=current_date
)
# Count the topic's occurrences on this day
count = 0
for _, titles in all_titles.items():
for title in titles.keys():
if topic.lower() in title.lower():
count += 1
lifecycle_data.append({
"date": current_date.strftime("%Y-%m-%d"),
"count": count
})
except DataNotFoundError:
lifecycle_data.append({
"date": current_date.strftime("%Y-%m-%d"),
"count": 0
})
current_date += timedelta(days=1)
# Number of days analyzed
total_days = (end_date - start_date).days + 1
# Analyze the lifecycle stage
counts = [item["count"] for item in lifecycle_data]
if not any(counts):
time_desc = f"{start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}"
raise DataNotFoundError(
f"{time_desc} 内未找到话题 '{topic}'",
suggestion="请尝试其他话题或扩大时间范围"
)
# First and last appearance
first_appearance = next((item["date"] for item in lifecycle_data if item["count"] > 0), None)
last_appearance = next((item["date"] for item in reversed(lifecycle_data) if item["count"] > 0), None)
# Peak
max_count = max(counts)
peak_index = counts.index(max_count)
peak_date = lifecycle_data[peak_index]["date"]
# Average over active days (simple implementation; no standard deviation is computed)
non_zero_counts = [c for c in counts if c > 0]
avg_count = sum(non_zero_counts) / len(non_zero_counts) if non_zero_counts else 0
# Determine the lifecycle stage
recent_counts = counts[-3:]  # last 3 days
early_counts = counts[:3]  # first 3 days
if sum(recent_counts) > sum(early_counts):
lifecycle_stage = "上升期"
elif sum(recent_counts) < sum(early_counts) * 0.5:
lifecycle_stage = "衰退期"
elif max_count in recent_counts:
lifecycle_stage = "爆发期"
else:
lifecycle_stage = "稳定期"
# Classify: flash-in-the-pan vs sustained hotspot
active_days = sum(1 for c in counts if c > 0)
if active_days <= 2 and max_count > avg_count * 2:
topic_type = "昙花一现"
elif active_days >= total_days * 0.6:
topic_type = "持续热点"
else:
topic_type = "周期性热点"
return {
"success": True,
"topic": topic,
"date_range": {
"start": start_date.strftime("%Y-%m-%d"),
"end": end_date.strftime("%Y-%m-%d"),
"total_days": total_days
},
"lifecycle_data": lifecycle_data,
"analysis": {
"first_appearance": first_appearance,
"last_appearance": last_appearance,
"peak_date": peak_date,
"peak_count": max_count,
"active_days": active_days,
"avg_daily_mentions": round(avg_count, 2),
"lifecycle_stage": lifecycle_stage,
"topic_type": topic_type
}
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
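# Stage-classification example (hypothetical 7-day counts):
# counts = [0, 1, 3, 8, 6, 5, 5] → recent = [6, 5, 5] (sum 16),
# early = [0, 1, 3] (sum 4); 16 > 4, so the stage is "上升期".
# With 6 active days out of 7 (>= 60%), the topic type is "持续热点".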
def detect_viral_topics(
self,
threshold: float = 3.0,
time_window: int = 24
) -> Dict:
"""
异常热度检测 - 自动识别突然爆火的话题
Args:
threshold: 热度突增倍数阈值
time_window: 检测时间窗口(小时)
Returns:
爆火话题列表
Examples:
用户询问示例:
- "检测今天有哪些突然爆火的话题"
- "看看有没有热度异常的新闻"
- "预警可能的重大事件"
代码调用示例:
>>> tools = AnalyticsTools()
>>> result = tools.detect_viral_topics(
... threshold=3.0,
... time_window=24
... )
>>> print(result['viral_topics'])
"""
try:
# Parameter validation
if threshold < 1.0:
raise InvalidParameterError(
"threshold 必须大于等于 1.0",
suggestion="推荐值2.0-5.0"
)
time_window = validate_limit(time_window, default=24, max_limit=72)
# Read the current and previous data
current_all_titles, _, _ = self.data_service.parser.read_all_titles_for_date()
# Use yesterday's data as the baseline
yesterday = datetime.now() - timedelta(days=1)
try:
previous_all_titles, _, _ = self.data_service.parser.read_all_titles_for_date(
date=yesterday
)
except DataNotFoundError:
previous_all_titles = {}
# Current keyword frequencies
current_keywords = Counter()
current_keyword_titles = defaultdict(list)
for _, titles in current_all_titles.items():
for title in titles.keys():
keywords = self._extract_keywords(title)
current_keywords.update(keywords)
for kw in keywords:
current_keyword_titles[kw].append(title)
# Previous keyword frequencies
previous_keywords = Counter()
for _, titles in previous_all_titles.items():
for title in titles.keys():
keywords = self._extract_keywords(title)
previous_keywords.update(keywords)
# Detect abnormal heat
viral_topics = []
for keyword, current_count in current_keywords.items():
previous_count = previous_keywords.get(keyword, 0)
# Compute the growth multiple
if previous_count == 0:
# Newly appeared topic
if current_count >= 5:  # require at least 5 appearances to count as viral
growth_rate = float('inf')
is_viral = True
else:
continue
else:
growth_rate = current_count / previous_count
is_viral = growth_rate >= threshold
if is_viral:
viral_topics.append({
"keyword": keyword,
"current_count": current_count,
"previous_count": previous_count,
"growth_rate": round(growth_rate, 2) if growth_rate != float('inf') else "新话题",
"sample_titles": current_keyword_titles[keyword][:3],
"alert_level": "" if growth_rate > threshold * 2 else ""
})
# Sort by growth rate
viral_topics.sort(
key=lambda x: x["current_count"] if x["growth_rate"] == "新话题" else x["growth_rate"],
reverse=True
)
if not viral_topics:
return {
"success": True,
"viral_topics": [],
"total_detected": 0,
"message": f"未检测到热度增长超过 {threshold} 倍的话题"
}
return {
"success": True,
"viral_topics": viral_topics,
"total_detected": len(viral_topics),
"threshold": threshold,
"time_window": time_window,
"detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
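# Growth-check example (illustrative): a keyword seen 2 times yesterday and
# 8 times today has growth_rate = 8 / 2 = 4.0 >= threshold (3.0), so it is
# flagged; since 4.0 <= threshold * 2 (6.0), it stays at the lower alert level.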
def predict_trending_topics(
self,
lookahead_hours: int = 6,
confidence_threshold: float = 0.7
) -> Dict:
"""
话题预测 - 基于历史数据预测未来可能的热点
Args:
lookahead_hours: 预测未来多少小时
confidence_threshold: 置信度阈值
Returns:
预测的潜力话题列表
Examples:
用户询问示例:
- "预测接下来6小时可能的热点话题"
- "有哪些话题可能会火起来"
- "早期发现潜力话题"
代码调用示例:
>>> tools = AnalyticsTools()
>>> result = tools.predict_trending_topics(
... lookahead_hours=6,
... confidence_threshold=0.7
... )
>>> print(result['predicted_topics'])
"""
try:
# Parameter validation
lookahead_hours = validate_limit(lookahead_hours, default=6, max_limit=48)
if not 0 <= confidence_threshold <= 1:
raise InvalidParameterError(
"confidence_threshold 必须在 0 到 1 之间",
suggestion="推荐值0.6-0.8"
)
# Collect the last 3 days of data for prediction
keyword_trends = defaultdict(list)
for days_ago in range(3, 0, -1):
date = datetime.now() - timedelta(days=days_ago)
try:
all_titles, _, _ = self.data_service.parser.read_all_titles_for_date(
date=date
)
# Count keywords
keywords_count = Counter()
for _, titles in all_titles.items():
for title in titles.keys():
keywords = self._extract_keywords(title)
keywords_count.update(keywords)
# Record each keyword's history
for keyword, count in keywords_count.items():
keyword_trends[keyword].append(count)
except DataNotFoundError:
pass
# Add today's data
try:
all_titles, _, _ = self.data_service.parser.read_all_titles_for_date()
keywords_count = Counter()
keyword_titles = defaultdict(list)
for _, titles in all_titles.items():
for title in titles.keys():
keywords = self._extract_keywords(title)
keywords_count.update(keywords)
for kw in keywords:
keyword_titles[kw].append(title)
for keyword, count in keywords_count.items():
keyword_trends[keyword].append(count)
except DataNotFoundError:
raise DataNotFoundError(
"未找到今天的数据",
suggestion="请等待爬虫任务完成"
)
# Predict potential topics
predicted_topics = []
for keyword, trend_data in keyword_trends.items():
if len(trend_data) < 2:
continue
# Simple linear trend prediction
# Compute the growth rate
recent_value = trend_data[-1]
previous_value = trend_data[-2] if len(trend_data) >= 2 else 0
if previous_value == 0:
if recent_value >= 3:
growth_rate = 1.0
else:
continue
else:
growth_rate = (recent_value - previous_value) / previous_value
# Check for an upward trend
if growth_rate > 0.3:  # growth above 30%
# Confidence based on trend stability
if len(trend_data) >= 3:
# Check for consistent growth
is_consistent = all(
trend_data[i] <= trend_data[i+1]
for i in range(len(trend_data)-1)
)
confidence = 0.9 if is_consistent else 0.7
else:
confidence = 0.6
if confidence >= confidence_threshold:
predicted_topics.append({
"keyword": keyword,
"current_count": recent_value,
"growth_rate": round(growth_rate * 100, 2),
"confidence": round(confidence, 2),
"trend_data": trend_data,
"prediction": "上升趋势,可能成为热点",
"sample_titles": keyword_titles.get(keyword, [])[:3]
})
# Sort by confidence, then growth rate
predicted_topics.sort(
key=lambda x: (x["confidence"], x["growth_rate"]),
reverse=True
)
return {
"success": True,
"predicted_topics": predicted_topics[:20], # 返回TOP 20
"total_predicted": len(predicted_topics),
"lookahead_hours": lookahead_hours,
"confidence_threshold": confidence_threshold,
"prediction_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"note": "预测基于历史趋势,实际结果可能有偏差"
}
except MCPError as e:
return {
"success": False,
"error": e.to_dict()
}
except Exception as e:
return {
"success": False,
"error": {
"code": "INTERNAL_ERROR",
"message": str(e)
}
}
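# Trend-scoring example (illustrative): trend_data = [2, 3, 5] gives
# growth_rate = (5 - 3) / 3 ≈ 0.67 > 0.3; the series never decreases, so
# confidence = 0.9, which passes the default confidence_threshold of 0.7.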
# ==================== Helper methods ====================
def _extract_keywords(self, title: str, min_length: int = 2) -> List[str]:
"""
从标题中提取关键词(简单实现)
Args:
title: 标题文本
min_length: 最小关键词长度
Returns:
关键词列表
"""
# Strip URLs and special characters
title = re.sub(r'http[s]?://\S+', '', title)
title = re.sub(r'[^\w\s]', ' ', title)
# Simple tokenization (split on whitespace and common separators)
words = re.split(r'[\s、]+', title)
# Filter stopwords and short tokens
stopwords = {'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这'}
keywords = [
word.strip() for word in words
if word.strip() and len(word.strip()) >= min_length and word.strip() not in stopwords
]
return keywords
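# Example (hypothetical title): _extract_keywords("OpenAI 发布 GPT-5 新模型")
# replaces the hyphen with a space, splits on whitespace, drops the
# 1-character token "5", and returns ['OpenAI', '发布', 'GPT', '新模型'].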
def _calculate_similarity(self, text1: str, text2: str) -> float:
"""
计算两个文本的相似度
Args:
text1: 文本1
text2: 文本2
Returns:
相似度分数0-1之间
"""
# Use SequenceMatcher to compute similarity
return SequenceMatcher(None, text1, text2).ratio()
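# Example: SequenceMatcher(None, "特斯拉宣布降价", "特斯拉降价了").ratio()
# matches "特斯拉" (3 chars) and "降价" (2 chars) across 7 + 6 characters,
# giving 2 * 5 / 13 ≈ 0.77.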
def _find_unique_topics(self, platform_stats: Dict) -> Dict[str, List[str]]:
"""
找出各平台独有的热点话题
Args:
platform_stats: 平台统计数据
Returns:
各平台独有话题字典
"""
unique_topics = {}
# Top keywords per platform
platform_keywords = {}
for platform, stats in platform_stats.items():
top_keywords = set([kw for kw, _ in stats["top_keywords"].most_common(10)])
platform_keywords[platform] = top_keywords
# Find keywords unique to one platform
for platform, keywords in platform_keywords.items():
# All keywords from the other platforms
other_keywords = set()
for other_platform, other_kws in platform_keywords.items():
if other_platform != platform:
other_keywords.update(other_kws)
# Keep only the unique ones
unique = keywords - other_keywords
if unique:
unique_topics[platform] = list(unique)[:5]  # at most 5
return unique_topics
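# Minimal usage sketch (assumes TrendRadar output data exists under the default
# project root; the report type below is just one of the supported values):
#
#     tools = AnalyticsTools()
#     report = tools.generate_summary_report(report_type="daily")
#     print(report["markdown_report"] if report.get("success") else report["error"])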