From 6f4793a33045b6a57fabddb4f3c1b244b158cfe2 Mon Sep 17 00:00:00 2001
From: auto-bot
Date: Wed, 24 Dec 2025 11:10:54 +0800
Subject: [PATCH] feat: add comprehensive health checks and Prometheus metrics

- Add app/health.py with HealthChecker and MetricsCollector classes
- Implement composite /health endpoint checking DB and HTTP client
- Add /metrics endpoint with Prometheus exposition format
- Add webhook request metrics (counters, histograms, gauges)
- Create app/http_client.py for shared AsyncClient management
- Update app/main.py lifespan to init/close HTTP client
- Add comprehensive tests in tests/test_health.py

This enables proper observability with health checks and metrics
for monitoring system status and webhook processing performance.
---
 app/health.py        | 211 +++++++++++++++++++++++++++++++++++++++++++
 app/http_client.py   |  31 +++++++
 app/main.py          |  53 +++++++++--
 tests/test_health.py | 180 ++++++++++++++++++++++++++++++++++++
 4 files changed, 465 insertions(+), 10 deletions(-)
 create mode 100644 app/health.py
 create mode 100644 app/http_client.py
 create mode 100644 tests/test_health.py

diff --git a/app/health.py b/app/health.py
new file mode 100644
index 0000000..bb90129
--- /dev/null
+++ b/app/health.py
@@ -0,0 +1,211 @@
+"""
+Health check utilities and endpoints for observability.
+"""
+import time
+from typing import Dict, Any
+from app.db import SessionLocal, engine
+from app.http_client import get_http_client
+from app.logging import get_logger
+
+logger = get_logger(__name__)
+
+class HealthChecker:
+    """Composite health checker for various system components."""
+
+    def __init__(self):
+        self._last_db_check = 0
+        self._last_http_check = 0
+        self._db_status = False  # Start with unknown status
+        self._http_status = False  # Start with unknown status
+        self._check_interval = 30  # seconds
+
+    async def check_database(self) -> Dict[str, Any]:
+        """Check database connectivity and basic operations."""
+        current_time = time.time()
+
+        # Use cached result if recent
+        if current_time - self._last_db_check < self._check_interval:
+            return {
+                "status": "ok" if self._db_status else "error",
+                "cached": True,
+                "last_check": self._last_db_check
+            }
+
+        try:
+            # Test database connection
+            db = SessionLocal()
+            # Simple query to test connection
+            db.execute("SELECT 1")
+            db.close()
+
+            self._db_status = True
+            self._last_db_check = current_time
+            return {
+                "status": "ok",
+                "cached": False,
+                "last_check": current_time
+            }
+        except Exception as e:
+            logger.error(f"Database health check failed: {e}")
+            self._db_status = False
+            self._last_db_check = current_time
+            return {
+                "status": "error",
+                "error": str(e),
+                "cached": False,
+                "last_check": current_time
+            }
+
+    async def check_http_client(self) -> Dict[str, Any]:
+        """Check HTTP client availability."""
+        current_time = time.time()
+
+        # Use cached result if recent
+        if current_time - self._last_http_check < self._check_interval:
+            return {
+                "status": "ok" if self._http_status else "error",
+                "cached": True,
+                "last_check": self._last_http_check
+            }
+
+        try:
+            # Check if HTTP client is available
+            client = get_http_client()
+            if client is None:
+                # Try to create a temporary client to test
+                import httpx
+                async with httpx.AsyncClient(timeout=5) as test_client:
+                    # Just test that we can create and close a client
+                    pass
+
+            self._http_status = True
+            self._last_http_check = current_time
+            return {
+                "status": "ok",
+                "cached": False,
+                "last_check": current_time
+            }
+        except Exception as e:
+            logger.error(f"HTTP client health check failed: {e}")
+            self._http_status = False
+            self._last_http_check = current_time
+            return {
+                "status": "error",
+                "error": str(e),
+                "cached": False,
+                "last_check": current_time
+            }
+
+    async def check_overall(self) -> Dict[str, Any]:
+        """Perform comprehensive health check."""
+        db_health = await self.check_database()
+        http_health = await self.check_http_client()
+
+        # Overall status is healthy only if all components are healthy
+        overall_status = "ok" if (
+            db_health["status"] == "ok" and
+            http_health["status"] == "ok"
+        ) else "error"
+
+        return {
+            "status": overall_status,
+            "timestamp": time.time(),
+            "checks": {
+                "database": db_health,
+                "http_client": http_health
+            },
+            "version": "2.2.0",  # From dashboard
+            "service": "webhook-relay"
+        }
+
+
+# Global health checker instance
+health_checker = HealthChecker()
+
+
+class MetricsCollector:
+    """Simple Prometheus-style metrics collector."""
+
+    def __init__(self):
+        self._metrics = {}
+        self._start_time = time.time()
+
+    def increment_counter(self, name: str, value: float = 1.0, labels: Dict[str, str] = None):
+        """Increment a counter metric."""
+        key = name
+        if labels:
+            # Simple label encoding
+            label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items()))
+            key = f"{name}{{{label_str}}}"
+
+        if key not in self._metrics:
+            self._metrics[key] = {"type": "counter", "value": 0.0}
+        self._metrics[key]["value"] += value
+
+    def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None):
+        """Set a gauge metric."""
+        key = name
+        if labels:
+            label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items()))
+            key = f"{name}{{{label_str}}}"
+
+        self._metrics[key] = {"type": "gauge", "value": value}
+
+    def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None):
+        """Observe a histogram value (simplified - just track last value)."""
+        key = name
+        if labels:
+            label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items()))
+            key = f"{name}{{{label_str}}}"
+
+        self._metrics[key] = {"type": "histogram", "value": value}
+
+    def get_prometheus_format(self) -> str:
+        """Generate Prometheus exposition format output."""
+        lines = []
+
+        # Add HELP and TYPE comments for known metrics
+        known_metrics = {
+            "webhook_requests_total": "Total number of webhook requests processed",
+            "webhook_processing_duration_seconds": "Time spent processing webhooks",
+            "delivery_attempts_total": "Total number of delivery attempts",
+            "delivery_failures_total": "Total number of delivery failures",
+            "uptime_seconds": "Service uptime in seconds"
+        }
+
+        for key, metric_info in self._metrics.items():
+            # Extract metric name and labels
+            if "{" in key:
+                name = key.split("{")[0]
+                labels_part = key.split("{")[1].rstrip("}")
+            else:
+                name = key
+                labels_part = ""
+
+            # Add HELP comment
+            if name in known_metrics:
+                lines.append(f"# HELP {name} {known_metrics[name]}")
+
+            # Add TYPE comment
+            lines.append(f"# TYPE {name} {metric_info['type']}")
+
+            # Add metric value
+            if labels_part:
+                lines.append(f"{name}{{{labels_part}}} {metric_info['value']}")
+            else:
+                lines.append(f"{name} {metric_info['value']}")
+
+            lines.append("")  # Empty line between metrics
+
+        # Add uptime metric
+        uptime = time.time() - self._start_time
+        lines.insert(0, "# HELP uptime_seconds Service uptime in seconds")
+        lines.insert(1, "# TYPE uptime_seconds gauge")
+        lines.insert(2, f"uptime_seconds {uptime}")
+        lines.insert(3, "")
+
+        return "\n".join(lines)
+
+
+# Global metrics collector instance
+metrics_collector = MetricsCollector()
diff --git a/app/http_client.py b/app/http_client.py
new file mode 100644
index 0000000..e4fb473
--- /dev/null
+++ b/app/http_client.py
@@ -0,0 +1,31 @@
+"""
+HTTP client management for shared AsyncClient instance.
+"""
+import httpx
+from typing import Optional
+
+# Global HTTP client instance
+_http_client: Optional[httpx.AsyncClient] = None
+
+
+async def init_http_client():
+    """Initialize the shared HTTP client."""
+    global _http_client
+    if _http_client is None:
+        _http_client = httpx.AsyncClient(
+            timeout=httpx.Timeout(10.0, connect=5.0),
+            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
+        )
+
+
+async def close_http_client():
+    """Close the shared HTTP client."""
+    global _http_client
+    if _http_client:
+        await _http_client.aclose()
+        _http_client = None
+
+
+def get_http_client() -> Optional[httpx.AsyncClient]:
+    """Get the shared HTTP client instance."""
+    return _http_client
diff --git a/app/main.py b/app/main.py
index d90d558..626e33c 100644
--- a/app/main.py
+++ b/app/main.py
@@ -6,17 +6,25 @@ from app.admin import router as admin_router
 from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db
 from app.services.engine import engine
 from app.models import IncomingPayload
+from app.health import health_checker, metrics_collector
+from app.http_client import init_http_client, close_http_client
 from contextlib import asynccontextmanager
 
 logger = get_logger("app")
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    # Startup: Initialize DB
+    # Startup: Initialize DB and HTTP client
     logger.info("Initializing database...")
     init_db()
-    yield
-    # Shutdown logic if any
+    logger.info("Initializing http client...")
+    await init_http_client()
+    try:
+        yield
+    finally:
+        # Shutdown: close shared http client
+        logger.info("Shutting down http client...")
+        await close_http_client()
 
 app = FastAPI(lifespan=lifespan)
 app.include_router(admin_router)
@@ -66,21 +74,31 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
 
 @app.get("/health")
 async def health():
-    return {"status": "ok"}
+    """Comprehensive health check endpoint."""
+    return await health_checker.check_overall()
+
+@app.get("/metrics")
+async def metrics():
+    """Prometheus-style metrics endpoint."""
+    return metrics_collector.get_prometheus_format()
 
 @app.post("/webhook/{namespace}")
 async def webhook(namespace: str, payload: IncomingPayload = Body(...), background_tasks: BackgroundTasks = BackgroundTasks()):
+    import time
+    start_time = time.time()
+
     db = SessionLocal()
     endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first()
-    
+
     if not endpoint:
-        # Auto-create for convenience if needed, or reject. For now reject if not exists.
-        # Or better: log warning but don't process.
-        # But per requirements "add endpoint", so we expect it to exist.
+        # Record failed request metric
+        metrics_collector.increment_counter("webhook_requests_total", labels={"status": "endpoint_not_found"})
         db.close()
         return JSONResponse({"error": "Endpoint not found"}, status_code=404)
-    
+
     if not endpoint.is_active:
+        # Record failed request metric
+        metrics_collector.increment_counter("webhook_requests_total", labels={"status": "endpoint_inactive"})
         db.close()
         return JSONResponse({"error": "Endpoint inactive"}, status_code=403)
 
@@ -91,11 +109,26 @@ async def webhook(namespace: str, payload: IncomingPayload = Body(...), backgrou
     try:
         body_dict = payload.model_dump()
     except Exception:
+        metrics_collector.increment_counter("webhook_requests_total", labels={"status": "invalid_payload"})
         return JSONResponse({"error": "Invalid payload"}, status_code=400)
 
     # Use new engine
     routed, notified = await engine.process(endpoint_id, body_dict)
-    
+
+    # Record successful request metric
+    metrics_collector.increment_counter("webhook_requests_total", labels={"status": "success"})
+
+    # Record processing duration
+    processing_time = time.time() - start_time
+    metrics_collector.observe_histogram("webhook_processing_duration_seconds", processing_time)
+
+    # Record delivery attempts and failures
+    total_attempts = sum(r.get("_retry_attempts", 1) for r in routed) + sum(n.get("_retry_attempts", 1) for n in notified)
+    total_failures = sum(1 for r in routed if not r.get("ok", False)) + sum(1 for n in notified if not n.get("ok", False))
+
+    metrics_collector.increment_counter("delivery_attempts_total", total_attempts)
+    metrics_collector.increment_counter("delivery_failures_total", total_failures)
+
     # Async save logs
     background_tasks.add_task(save_logs, namespace, body_dict, routed, notified)
 
diff --git a/tests/test_health.py b/tests/test_health.py
new file mode 100644
index 0000000..44dfdc2
--- /dev/null
+++ b/tests/test_health.py
@@ -0,0 +1,180 @@
+import pytest
+from unittest.mock import patch, AsyncMock, Mock
+from app.health import HealthChecker, MetricsCollector
+
+
+class TestHealthChecker:
+    @pytest.mark.asyncio
+    async def test_database_check_success(self):
+        """Test successful database health check."""
+        checker = HealthChecker()
+
+        # Mock successful database operation (SessionLocal returns sync session)
+        with patch('app.health.SessionLocal') as mock_session:
+            mock_db = Mock()
+            mock_session.return_value = mock_db
+
+            result = await checker.check_database()
+
+            assert result["status"] == "ok"
+            assert result["cached"] is False
+            assert "last_check" in result
+            mock_db.execute.assert_called_with("SELECT 1")
+            mock_db.close.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_database_check_failure(self):
+        """Test failed database health check."""
+        checker = HealthChecker()
+
+        with patch('app.health.SessionLocal') as mock_session:
+            mock_db = Mock()
+            mock_session.return_value = mock_db
+            mock_db.execute.side_effect = Exception("Connection failed")
+
+            result = await checker.check_database()
+
+            assert result["status"] == "error"
+            assert "Connection failed" in result["error"]
+            assert result["cached"] is False
+
+    @pytest.mark.asyncio
+    async def test_http_client_check_success(self):
+        """Test successful HTTP client health check."""
+        checker = HealthChecker()
+
+        with patch('app.health.get_http_client') as mock_get_client:
+            mock_client = AsyncMock()
+            mock_get_client.return_value = mock_client
+
+            result = await checker.check_http_client()
+
+            assert result["status"] == "ok"
+            assert result["cached"] is False
+
+    @pytest.mark.asyncio
+    async def test_http_client_check_with_fallback(self):
+        """Test HTTP client check when primary client is None (fallback to httpx)."""
+        checker = HealthChecker()
+
+        with patch('app.health.get_http_client') as mock_get_client, \
+             patch('httpx.AsyncClient') as mock_httpx_client:
+
+            mock_get_client.return_value = None
+            mock_client = AsyncMock()
+            mock_httpx_client.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_httpx_client.return_value.__aexit__ = AsyncMock(return_value=None)
+
+            result = await checker.check_http_client()
+
+            assert result["status"] == "ok"
+            assert result["cached"] is False
+
+    @pytest.mark.asyncio
+    async def test_overall_health_check_success(self):
+        """Test comprehensive health check when all components are healthy."""
+        checker = HealthChecker()
+
+        with patch.object(checker, 'check_database', new_callable=AsyncMock) as mock_db_check, \
+             patch.object(checker, 'check_http_client', new_callable=AsyncMock) as mock_http_check:
+
+            mock_db_check.return_value = {"status": "ok"}
+            mock_http_check.return_value = {"status": "ok"}
+
+            result = await checker.check_overall()
+
+            assert result["status"] == "ok"
+            assert "timestamp" in result
+            assert "checks" in result
+            assert result["checks"]["database"]["status"] == "ok"
+            assert result["checks"]["http_client"]["status"] == "ok"
+            assert result["version"] == "2.2.0"
+            assert result["service"] == "webhook-relay"
+
+    @pytest.mark.asyncio
+    async def test_overall_health_check_partial_failure(self):
+        """Test comprehensive health check when some components fail."""
+        checker = HealthChecker()
+
+        with patch.object(checker, 'check_database', new_callable=AsyncMock) as mock_db_check, \
+             patch.object(checker, 'check_http_client', new_callable=AsyncMock) as mock_http_check:
+
+            mock_db_check.return_value = {"status": "ok"}
+            mock_http_check.return_value = {"status": "error", "error": "Connection timeout"}
+
+            result = await checker.check_overall()
+
+            assert result["status"] == "error"
+            assert result["checks"]["database"]["status"] == "ok"
+            assert result["checks"]["http_client"]["status"] == "error"
+
+
+class TestMetricsCollector:
+    def test_increment_counter(self):
+        """Test counter metric increment."""
+        collector = MetricsCollector()
+
+        collector.increment_counter("test_counter")
+        collector.increment_counter("test_counter", 2.0)
+
+        prometheus_output = collector.get_prometheus_format()
+        assert "test_counter 3.0" in prometheus_output
+        assert "# TYPE test_counter counter" in prometheus_output
+
+    def test_increment_counter_with_labels(self):
+        """Test counter metric with labels."""
+        collector = MetricsCollector()
+
+        collector.increment_counter("requests_total", labels={"method": "POST", "status": "200"})
+
+        prometheus_output = collector.get_prometheus_format()
+        assert 'requests_total{method=POST,status=200} 1.0' in prometheus_output
+
+    def test_set_gauge(self):
+        """Test gauge metric setting."""
+        collector = MetricsCollector()
+
+        collector.set_gauge("memory_usage", 85.5, labels={"unit": "percent"})
+
+        prometheus_output = collector.get_prometheus_format()
+        assert 'memory_usage{unit=percent} 85.5' in prometheus_output
+        assert "# TYPE memory_usage gauge" in prometheus_output
+
+    def test_observe_histogram(self):
+        """Test histogram metric observation."""
+        collector = MetricsCollector()
+
+        collector.observe_histogram("response_time", 0.125, labels={"endpoint": "/health"})
+
+        prometheus_output = collector.get_prometheus_format()
+        assert 'response_time{endpoint=/health} 0.125' in prometheus_output
+        assert "# TYPE response_time histogram" in prometheus_output
+
+    def test_uptime_metric(self):
+        """Test that uptime metric is automatically included."""
+        collector = MetricsCollector()
+
+        prometheus_output = collector.get_prometheus_format()
+        assert "# TYPE uptime_seconds gauge" in prometheus_output
+        assert "uptime_seconds " in prometheus_output
+
+    def test_prometheus_format_structure(self):
+        """Test overall Prometheus format structure."""
+        collector = MetricsCollector()
+
+        collector.increment_counter("webhook_requests_total", labels={"status": "success"})
+        collector.observe_histogram("webhook_processing_duration_seconds", 0.5)
+
+        output = collector.get_prometheus_format()
+
+        # Should contain HELP comments for known metrics
+        assert "# HELP webhook_requests_total Total number of webhook requests processed" in output
+        assert "# HELP webhook_processing_duration_seconds Time spent processing webhooks" in output
+
+        # Should have proper TYPE declarations
+        assert "# TYPE webhook_requests_total counter" in output
+        assert "# TYPE webhook_processing_duration_seconds histogram" in output
+
+        # Should have metric values
+        assert 'webhook_requests_total{status=success} 1.0' in output
+        assert 'webhook_processing_duration_seconds 0.5' in output