diff --git a/trendradar/__main__.py b/trendradar/__main__.py index 6c9682a..51af38d 100644 --- a/trendradar/__main__.py +++ b/trendradar/__main__.py @@ -14,9 +14,7 @@ from typing import Dict, List, Tuple, Optional import requests from trendradar.context import AppContext - -# 版本号直接定义,避免循环导入 -VERSION = "4.0.0" +from trendradar import __version__ from trendradar.core import load_config from trendradar.crawler import DataFetcher from trendradar.storage import convert_crawl_results_to_news_data @@ -105,7 +103,7 @@ class NewsAnalyzer: # 加载配置 print("正在加载配置...") config = load_config() - print(f"TrendRadar v{VERSION} 配置加载完成") + print(f"TrendRadar v{__version__} 配置加载完成") print(f"监控平台数量: {len(config['PLATFORMS'])}") print(f"时区: {config.get('TIMEZONE', 'Asia/Shanghai')}") @@ -174,15 +172,15 @@ class NewsAnalyzer: """检查版本更新""" try: need_update, remote_version = check_version_update( - VERSION, self.ctx.config["VERSION_CHECK_URL"], self.proxy_url + __version__, self.ctx.config["VERSION_CHECK_URL"], self.proxy_url ) if need_update and remote_version: self.update_info = { - "current_version": VERSION, + "current_version": __version__, "remote_version": remote_version, } - print(f"发现新版本: {remote_version} (当前: {VERSION})") + print(f"发现新版本: {remote_version} (当前: {__version__})") else: print("版本检查完成,当前为最新版本") except Exception as e: diff --git a/trendradar/storage/remote.py b/trendradar/storage/remote.py index a9b071d..06e2a5b 100644 --- a/trendradar/storage/remote.py +++ b/trendradar/storage/remote.py @@ -93,7 +93,11 @@ class RemoteStorageBackend(StorageBackend): # 初始化 S3 客户端 # 使用 virtual-hosted style addressing(主流) - s3_config = BotoConfig(s3={"addressing_style": "virtual"}) + # 使用 s3 签名版本(v2),避免 s3v4 签名可能导致的 chunked encoding 问题 + s3_config = BotoConfig( + s3={"addressing_style": "virtual"}, + signature_version='s3', # 使用 S3v2 签名,兼容性更好 + ) client_kwargs = { "endpoint_url": endpoint_url, @@ -133,7 +137,7 @@ class RemoteStorageBackend(StorageBackend): return format_time_filename(self.timezone) def _get_remote_db_key(self, date: Optional[str] = None) -> str: - """获取 R2 中 SQLite 文件的对象键""" + """获取远程存储中 SQLite 文件的对象键""" date_folder = self._format_date_folder(date) return f"news/{date_folder}.db" @@ -144,10 +148,10 @@ class RemoteStorageBackend(StorageBackend): def _check_object_exists(self, r2_key: str) -> bool: """ - 检查 R2 中对象是否存在 + 检查远程存储中对象是否存在 Args: - r2_key: R2 对象键 + r2_key: 远程对象键 Returns: 是否存在 @@ -157,7 +161,7 @@ class RemoteStorageBackend(StorageBackend): return True except ClientError as e: error_code = e.response.get("Error", {}).get("Code", "") - # R2/S3 可能返回 404, NoSuchKey, 或其他变体 + # S3 兼容存储可能返回 404, NoSuchKey, 或其他变体 if error_code in ("404", "NoSuchKey", "Not Found"): return False # 其他错误(如权限问题)也视为不存在,但打印警告 @@ -169,7 +173,10 @@ class RemoteStorageBackend(StorageBackend): def _download_sqlite(self, date: Optional[str] = None) -> Optional[Path]: """ - 从 R2 下载当天的 SQLite 文件到本地临时目录 + 从远程存储下载当天的 SQLite 文件到本地临时目录 + + 使用 get_object + iter_chunks 替代 download_file, + 以正确处理腾讯云 COS 的 chunked transfer encoding。 Args: date: 日期字符串 @@ -189,13 +196,18 @@ class RemoteStorageBackend(StorageBackend): return None try: - self.s3_client.download_file(self.bucket_name, r2_key, str(local_path)) + # 使用 get_object + iter_chunks 替代 download_file + # iter_chunks 会自动处理 chunked transfer encoding + response = self.s3_client.get_object(Bucket=self.bucket_name, Key=r2_key) + with open(local_path, 'wb') as f: + for chunk in response['Body'].iter_chunks(chunk_size=1024*1024): + f.write(chunk) self._downloaded_files.append(local_path) print(f"[远程存储] 已下载: {r2_key} -> {local_path}") return local_path except ClientError as e: error_code = e.response.get("Error", {}).get("Code", "") - # R2/S3 可能返回不同的错误码 + # S3 兼容存储可能返回不同的错误码 if error_code in ("404", "NoSuchKey", "Not Found"): print(f"[远程存储] 文件不存在,将创建新数据库: {r2_key}") return None @@ -208,7 +220,7 @@ class RemoteStorageBackend(StorageBackend): def _upload_sqlite(self, date: Optional[str] = None) -> bool: """ - 上传本地 SQLite 文件到 R2 + 上传本地 SQLite 文件到远程存储 Args: date: 日期字符串 @@ -228,7 +240,20 @@ class RemoteStorageBackend(StorageBackend): local_size = local_path.stat().st_size print(f"[远程存储] 准备上传: {local_path} ({local_size} bytes) -> {r2_key}") - self.s3_client.upload_file(str(local_path), self.bucket_name, r2_key) + # 读取文件内容为 bytes 后上传 + # 避免传入文件对象时 requests 库使用 chunked transfer encoding + # 腾讯云 COS 等 S3 兼容服务可能无法正确处理 chunked encoding + with open(local_path, 'rb') as f: + file_content = f.read() + + # 使用 put_object 并明确设置 ContentLength,确保不使用 chunked encoding + self.s3_client.put_object( + Bucket=self.bucket_name, + Key=r2_key, + Body=file_content, + ContentLength=local_size, + ContentType='application/octet-stream', + ) print(f"[远程存储] 已上传: {local_path} -> {r2_key}") # 验证上传成功 @@ -236,7 +261,7 @@ class RemoteStorageBackend(StorageBackend): print(f"[远程存储] 上传验证成功: {r2_key}") return True else: - print(f"[远程存储] 上传验证失败: 文件未在 R2 中找到") + print(f"[远程存储] 上传验证失败: 文件未在远程存储中找到") return False except Exception as e: @@ -252,7 +277,7 @@ class RemoteStorageBackend(StorageBackend): # 确保目录存在 local_path.parent.mkdir(parents=True, exist_ok=True) - # 如果本地不存在,尝试从 R2 下载 + # 如果本地不存在,尝试从远程存储下载 if not local_path.exists(): self._download_sqlite(date) @@ -282,9 +307,9 @@ class RemoteStorageBackend(StorageBackend): def save_news_data(self, data: NewsData) -> bool: """ - 保存新闻数据到 R2(以 URL 为唯一标识,支持标题更新检测) + 保存新闻数据到远程存储(以 URL 为唯一标识,支持标题更新检测) - 流程:下载现有数据库 → 插入/更新数据 → 上传回 R2 + 流程:下载现有数据库 → 插入/更新数据 → 上传回远程存储 Args: data: 新闻数据 @@ -466,12 +491,12 @@ class RemoteStorageBackend(StorageBackend): log_parts.append(f"(去重后总计: {final_count} 条)") print(",".join(log_parts)) - # 上传到 R2 + # 上传到远程存储 if self._upload_sqlite(data.date): - print(f"[远程存储] 数据已同步到 R2") + print(f"[远程存储] 数据已同步到远程存储") return True else: - print(f"[远程存储] 上传 R2 失败") + print(f"[远程存储] 上传远程存储失败") return False except Exception as e: @@ -695,7 +720,7 @@ class RemoteStorageBackend(StorageBackend): return {} def save_txt_snapshot(self, data: NewsData) -> Optional[str]: - """保存 TXT 快照(R2 模式下默认不支持)""" + """保存 TXT 快照(远程存储模式下默认不支持)""" if not self.enable_txt: return None @@ -817,7 +842,7 @@ class RemoteStorageBackend(StorageBackend): def cleanup_old_data(self, retention_days: int) -> int: """ - 清理 R2 上的过期数据 + 清理远程存储上的过期数据 Args: retention_days: 保留天数(0 表示不清理) @@ -832,7 +857,7 @@ class RemoteStorageBackend(StorageBackend): cutoff_date = self._get_configured_time() - timedelta(days=retention_days) try: - # 列出 R2 中 news/ 前缀下的所有对象 + # 列出远程存储中 news/ 前缀下的所有对象 paginator = self.s3_client.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=self.bucket_name, Prefix="news/") @@ -964,12 +989,12 @@ class RemoteStorageBackend(StorageBackend): print(f"[远程存储] 推送记录已保存: {report_type} at {now_str}") - # 上传到 R2 确保记录持久化 + # 上传到远程存储 确保记录持久化 if self._upload_sqlite(date): - print(f"[远程存储] 推送记录已同步到 R2") + print(f"[远程存储] 推送记录已同步到远程存储") return True else: - print(f"[远程存储] 推送记录同步到 R2 失败") + print(f"[远程存储] 推送记录同步到远程存储失败") return False except Exception as e: @@ -1030,14 +1055,13 @@ class RemoteStorageBackend(StorageBackend): print(f"[远程存储] 跳过(远程不存在): {date_str}") continue - # 下载 + # 下载(使用 get_object + iter_chunks 处理 chunked encoding) try: local_date_dir.mkdir(parents=True, exist_ok=True) - self.s3_client.download_file( - self.bucket_name, - remote_key, - str(local_db_path) - ) + response = self.s3_client.get_object(Bucket=self.bucket_name, Key=remote_key) + with open(local_db_path, 'wb') as f: + for chunk in response['Body'].iter_chunks(chunk_size=1024*1024): + f.write(chunk) print(f"[远程存储] 已拉取: {remote_key} -> {local_db_path}") pulled_count += 1 except Exception as e: