"""FastAPI webhook server for receiving Netdata notifications.""" import ssl import tempfile from pathlib import Path from typing import Dict, Any import structlog import uvicorn from fastapi import FastAPI, HTTPException, Request, status from fastapi.responses import JSONResponse from .formatter import ZulipMessageFormatter from .models import WebhookPayload, ZulipConfig, ServerConfig from .netdata_ca import NETDATA_CA_CERT from .zulip_client import ZulipNotifier logger = structlog.get_logger() class NetdataWebhookServer: """FastAPI server for handling Netdata Cloud webhooks.""" def __init__(self, zulip_config: ZulipConfig, server_config: ServerConfig): """Initialize the webhook server.""" self.app = FastAPI( title="Netdata Zulip Bot", description="Webhook service for Netdata Cloud notifications", version="0.1.0" ) self.zulip_config = zulip_config self.server_config = server_config self.formatter = ZulipMessageFormatter() # Initialize Zulip client try: self.zulip_notifier = ZulipNotifier(zulip_config) except Exception as e: logger.error("Failed to initialize Zulip client", error=str(e)) raise self._setup_routes() self._setup_middleware() def _setup_routes(self): """Setup FastAPI routes.""" @self.app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "service": "netdata-zulip-bot"} @self.app.post("/webhook/netdata") async def netdata_webhook(request: Request): """Handle Netdata Cloud webhook notifications.""" try: # Get raw JSON data body = await request.json() logger.info("Received webhook", payload_keys=list(body.keys())) # Parse and validate the payload notification = WebhookPayload.parse(body) logger.info("Parsed notification", type=type(notification).__name__) # Format message for Zulip topic, content = self.formatter.format_notification(notification) logger.info("Formatted message", topic=topic) # Send to Zulip success = self.zulip_notifier.send_message(topic, content) if success: return {"status": "success", "message": "Notification sent to Zulip"} else: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to send notification to Zulip" ) except ValueError as e: logger.error("Invalid webhook payload", error=str(e)) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid payload format: {str(e)}" ) except Exception as e: logger.error("Webhook processing failed", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error" ) def _setup_middleware(self): """Setup middleware for logging and error handling.""" @self.app.middleware("http") async def log_requests(request: Request, call_next): """Log all requests.""" client_host = request.client.host if request.client else "unknown" logger.info( "Request received", method=request.method, url=str(request.url), client=client_host ) try: response = await call_next(request) logger.info( "Request completed", method=request.method, url=str(request.url), status_code=response.status_code ) return response except Exception as e: logger.error( "Request failed", method=request.method, url=str(request.url), error=str(e) ) raise def get_ssl_context(self) -> ssl.SSLContext: """Create SSL context for HTTPS and mutual TLS.""" context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # Load server certificate and key cert_path = Path(self.server_config.cert_path) / self.server_config.domain cert_file = cert_path / "fullchain.pem" key_file = cert_path / "privkey.pem" if not cert_file.exists() or not key_file.exists(): logger.error( "SSL certificate files not found", cert_file=str(cert_file), key_file=str(key_file) ) raise FileNotFoundError(f"SSL certificate files not found at {cert_path}") context.load_cert_chain(str(cert_file), str(key_file)) # Configure mutual TLS if enabled if self.server_config.enable_mtls: # Use the hardcoded Netdata CA certificate with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as ca_file: ca_file.write(NETDATA_CA_CERT) ca_file_path = ca_file.name try: context.load_verify_locations(ca_file_path) context.verify_mode = ssl.CERT_REQUIRED logger.info("Mutual TLS enabled with hardcoded Netdata CA certificate") finally: # Clean up the temporary file Path(ca_file_path).unlink(missing_ok=True) else: context.verify_mode = ssl.CERT_NONE logger.info("Mutual TLS disabled") return context def run(self): """Run the webhook server with HTTPS and optional mutual TLS.""" try: ssl_context = self.get_ssl_context() logger.info( "Starting Netdata Zulip webhook server", host=self.server_config.host, port=self.server_config.port, domain=self.server_config.domain, mtls_enabled=self.server_config.enable_mtls ) uvicorn.run( self.app, host=self.server_config.host, port=self.server_config.port, ssl_context=ssl_context, access_log=False, # We handle logging in middleware ) except Exception as e: logger.error("Failed to start server", error=str(e)) raise