""" 缓存服务 实现TTL缓存机制,提升数据访问性能。 """ import time from typing import Any, Optional from threading import Lock class CacheService: """缓存服务类""" def __init__(self): """初始化缓存服务""" self._cache = {} self._timestamps = {} self._lock = Lock() def get(self, key: str, ttl: int = 900) -> Optional[Any]: """ 获取缓存数据 Args: key: 缓存键 ttl: 存活时间(秒),默认15分钟 Returns: 缓存的值,如果不存在或已过期则返回None """ with self._lock: if key in self._cache: # 检查是否过期 if time.time() - self._timestamps[key] < ttl: return self._cache[key] else: # 已过期,删除缓存 del self._cache[key] del self._timestamps[key] return None def set(self, key: str, value: Any) -> None: """ 设置缓存数据 Args: key: 缓存键 value: 缓存值 """ with self._lock: self._cache[key] = value self._timestamps[key] = time.time() def delete(self, key: str) -> bool: """ 删除缓存 Args: key: 缓存键 Returns: 是否成功删除 """ with self._lock: if key in self._cache: del self._cache[key] del self._timestamps[key] return True return False def clear(self) -> None: """清空所有缓存""" with self._lock: self._cache.clear() self._timestamps.clear() def cleanup_expired(self, ttl: int = 900) -> int: """ 清理过期缓存 Args: ttl: 存活时间(秒) Returns: 清理的条目数量 """ with self._lock: current_time = time.time() expired_keys = [ key for key, timestamp in self._timestamps.items() if current_time - timestamp >= ttl ] for key in expired_keys: del self._cache[key] del self._timestamps[key] return len(expired_keys) def get_stats(self) -> dict: """ 获取缓存统计信息 Returns: 统计信息字典 """ with self._lock: return { "total_entries": len(self._cache), "oldest_entry_age": ( time.time() - min(self._timestamps.values()) if self._timestamps else 0 ), "newest_entry_age": ( time.time() - max(self._timestamps.values()) if self._timestamps else 0 ) } # 全局缓存实例 _global_cache = None def get_cache() -> CacheService: """ 获取全局缓存实例 Returns: 全局缓存服务实例 """ global _global_cache if _global_cache is None: _global_cache = CacheService() return _global_cache