TrendRadar/trendradar/crawler/fetcher.py

# coding=utf-8
"""
Data fetcher module.

Fetches news data from the NewsNow API. Supports:
- fetching data for a single platform
- batch crawling across multiple platforms
- automatic retries
- proxy support
"""
import json
import random
import time
from typing import Dict, List, Optional, Tuple, Union

import requests

class DataFetcher:
    """Data fetcher."""

    # Default API endpoint
    DEFAULT_API_URL = "https://newsnow.busiyi.world/api/s"

    # Default request headers
    DEFAULT_HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
    }

    def __init__(
        self,
        proxy_url: Optional[str] = None,
        api_url: Optional[str] = None,
    ):
        """
        Initialize the data fetcher.

        Args:
            proxy_url: Proxy server URL (optional).
            api_url: Base API URL (optional; defaults to DEFAULT_API_URL).
        """
        self.proxy_url = proxy_url
        self.api_url = api_url or self.DEFAULT_API_URL
    def fetch_data(
        self,
        id_info: Union[str, Tuple[str, str]],
        max_retries: int = 2,
        min_retry_wait: int = 3,
        max_retry_wait: int = 5,
    ) -> Tuple[Optional[str], str, str]:
        """
        Fetch data for the given platform ID, with retries.

        Args:
            id_info: Platform ID, or a (platform ID, alias) tuple.
            max_retries: Maximum number of retries.
            min_retry_wait: Minimum wait between retries (seconds).
            max_retry_wait: Maximum wait between retries (seconds).

        Returns:
            A (response text, platform ID, alias) tuple; the response text
            is None on failure.
        """
        if isinstance(id_info, tuple):
            id_value, alias = id_info
        else:
            id_value = id_info
            alias = id_value

        url = f"{self.api_url}?id={id_value}&latest"

        proxies = None
        if self.proxy_url:
            proxies = {"http": self.proxy_url, "https": self.proxy_url}

        retries = 0
        while retries <= max_retries:
            try:
                response = requests.get(
                    url,
                    proxies=proxies,
                    headers=self.DEFAULT_HEADERS,
                    timeout=10,
                )
                response.raise_for_status()

                data_text = response.text
                data_json = json.loads(data_text)

                status = data_json.get("status", "unknown")
                if status not in ["success", "cache"]:
                    raise ValueError(f"Unexpected response status: {status}")

                status_info = "fresh data" if status == "success" else "cached data"
                print(f"Fetched {id_value} successfully ({status_info})")
                return data_text, id_value, alias
            except Exception as e:
                retries += 1
                if retries <= max_retries:
                    # Linear backoff with jitter: a random base wait plus an
                    # extra 1-2 seconds for each retry already attempted
                    base_wait = random.uniform(min_retry_wait, max_retry_wait)
                    additional_wait = (retries - 1) * random.uniform(1, 2)
                    wait_time = base_wait + additional_wait
                    print(f"Request for {id_value} failed: {e}. Retrying in {wait_time:.2f}s...")
                    time.sleep(wait_time)
                else:
                    print(f"Request for {id_value} failed: {e}")
                    return None, id_value, alias
        return None, id_value, alias
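
    # Retry timing note (illustrative, with the default arguments above):
    # the first retry waits uniform(3, 5) seconds; the second waits
    # uniform(3, 5) + uniform(1, 2) seconds, i.e. roughly 4 to 7 in total.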
    def crawl_websites(
        self,
        ids_list: List[Union[str, Tuple[str, str]]],
        request_interval: int = 100,
    ) -> Tuple[Dict, Dict, List]:
        """
        Crawl data from multiple platforms.

        Args:
            ids_list: List of platform IDs; each element is either a string
                or a (platform ID, alias) tuple.
            request_interval: Interval between requests (milliseconds).

        Returns:
            A (results dict, ID-to-name mapping, failed-ID list) tuple.
        """
        results = {}
        id_to_name = {}
        failed_ids = []

        for i, id_info in enumerate(ids_list):
            if isinstance(id_info, tuple):
                id_value, name = id_info
            else:
                id_value = id_info
                name = id_value
            id_to_name[id_value] = name

            response, _, _ = self.fetch_data(id_info)
            if response:
                try:
                    data = json.loads(response)
                    results[id_value] = {}
                    for index, item in enumerate(data.get("items", []), 1):
                        title = item.get("title")
                        # Skip invalid titles (None, float, or empty string)
                        if title is None or isinstance(title, float) or not str(title).strip():
                            continue
                        title = str(title).strip()
                        url = item.get("url", "")
                        mobile_url = item.get("mobileUrl", "")
                        if title in results[id_value]:
                            # Duplicate title: record the additional rank
                            results[id_value][title]["ranks"].append(index)
                        else:
                            results[id_value][title] = {
                                "ranks": [index],
                                "url": url,
                                "mobileUrl": mobile_url,
                            }
                except json.JSONDecodeError:
                    print(f"Failed to parse response for {id_value}")
                    failed_ids.append(id_value)
                except Exception as e:
                    print(f"Error processing data for {id_value}: {e}")
                    failed_ids.append(id_value)
            else:
                failed_ids.append(id_value)

            # Wait between requests (except after the last one), with jitter
            # and a 50 ms floor
            if i < len(ids_list) - 1:
                actual_interval = request_interval + random.randint(-10, 20)
                actual_interval = max(50, actual_interval)
                time.sleep(actual_interval / 1000)

        print(f"Succeeded: {list(results.keys())}, failed: {failed_ids}")
        return results, id_to_name, failed_ids
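

if __name__ == "__main__":
    # Smoke-test sketch. The platform IDs below are illustrative assumptions;
    # valid IDs depend on what the target NewsNow deployment exposes.
    fetcher = DataFetcher()
    results, id_to_name, failed_ids = fetcher.crawl_websites(
        ["weibo", ("zhihu", "知乎")],
        request_interval=200,
    )
    for platform_id, titles in results.items():
        print(f"{id_to_name[platform_id]}: {len(titles)} unique titles")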