""" Health check utilities and endpoints for observability. """ import time from typing import Dict, Any from app.db import SessionLocal, engine from app.http_client import get_http_client from app.logging import get_logger logger = get_logger(__name__) class HealthChecker: """Composite health checker for various system components.""" def __init__(self): self._last_db_check = 0 self._last_http_check = 0 self._db_status = False # Start with unknown status self._http_status = False # Start with unknown status self._check_interval = 30 # seconds async def check_database(self) -> Dict[str, Any]: """Check database connectivity and basic operations.""" current_time = time.time() # Use cached result if recent if current_time - self._last_db_check < self._check_interval: return { "status": "ok" if self._db_status else "error", "cached": True, "last_check": self._last_db_check } try: # Test database connection db = SessionLocal() # Simple query to test connection db.execute("SELECT 1") db.close() self._db_status = True self._last_db_check = current_time return { "status": "ok", "cached": False, "last_check": current_time } except Exception as e: logger.error(f"Database health check failed: {e}") self._db_status = False self._last_db_check = current_time return { "status": "error", "error": str(e), "cached": False, "last_check": current_time } async def check_http_client(self) -> Dict[str, Any]: """Check HTTP client availability.""" current_time = time.time() # Use cached result if recent if current_time - self._last_http_check < self._check_interval: return { "status": "ok" if self._http_status else "error", "cached": True, "last_check": self._last_http_check } try: # Check if HTTP client is available client = get_http_client() if client is None: # Try to create a temporary client to test import httpx async with httpx.AsyncClient(timeout=5) as test_client: # Just test that we can create and close a client pass self._http_status = True self._last_http_check = current_time return { "status": "ok", "cached": False, "last_check": current_time } except Exception as e: logger.error(f"HTTP client health check failed: {e}") self._http_status = False self._last_http_check = current_time return { "status": "error", "error": str(e), "cached": False, "last_check": current_time } async def check_overall(self) -> Dict[str, Any]: """Perform comprehensive health check.""" db_health = await self.check_database() http_health = await self.check_http_client() # Overall status is healthy only if all components are healthy overall_status = "ok" if ( db_health["status"] == "ok" and http_health["status"] == "ok" ) else "error" return { "status": overall_status, "timestamp": time.time(), "checks": { "database": db_health, "http_client": http_health }, "version": "2.2.0", # From dashboard "service": "webhook-relay" } # Global health checker instance health_checker = HealthChecker() class MetricsCollector: """Simple Prometheus-style metrics collector.""" def __init__(self): self._metrics = {} self._start_time = time.time() def increment_counter(self, name: str, value: float = 1.0, labels: Dict[str, str] = None): """Increment a counter metric.""" key = name if labels: # Simple label encoding label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items())) key = f"{name}{{{label_str}}}" if key not in self._metrics: self._metrics[key] = {"type": "counter", "value": 0.0} self._metrics[key]["value"] += value def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None): """Set a gauge metric.""" key = name if labels: label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items())) key = f"{name}{{{label_str}}}" self._metrics[key] = {"type": "gauge", "value": value} def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None): """Observe a histogram value (simplified - just track last value).""" key = name if labels: label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items())) key = f"{name}{{{label_str}}}" self._metrics[key] = {"type": "histogram", "value": value} def get_prometheus_format(self) -> str: """Generate Prometheus exposition format output.""" lines = [] # Add HELP and TYPE comments for known metrics known_metrics = { "webhook_requests_total": "Total number of webhook requests processed", "webhook_processing_duration_seconds": "Time spent processing webhooks", "delivery_attempts_total": "Total number of delivery attempts", "delivery_failures_total": "Total number of delivery failures", "uptime_seconds": "Service uptime in seconds" } for key, metric_info in self._metrics.items(): # Extract metric name and labels if "{" in key: name = key.split("{")[0] labels_part = key.split("{")[1].rstrip("}") else: name = key labels_part = "" # Add HELP comment if name in known_metrics: lines.append(f"# HELP {name} {known_metrics[name]}") # Add TYPE comment lines.append(f"# TYPE {name} {metric_info['type']}") # Add metric value if labels_part: lines.append(f"{name}{{{labels_part}}} {metric_info['value']}") else: lines.append(f"{name} {metric_info['value']}") lines.append("") # Empty line between metrics # Add uptime metric uptime = time.time() - self._start_time lines.insert(0, "# HELP uptime_seconds Service uptime in seconds") lines.insert(1, "# TYPE uptime_seconds gauge") lines.insert(2, f"uptime_seconds {uptime}") lines.insert(3, "") return "\n".join(lines) # Global metrics collector instance metrics_collector = MetricsCollector()