"""FastAPI webhook server for receiving Netdata notifications.""" import base64 import hashlib import hmac import structlog import uvicorn from fastapi import FastAPI, HTTPException, Query, Request, status from .formatter import ZulipMessageFormatter from .models import ServerConfig, WebhookPayload, ZulipConfig from .zulip_client import ZulipNotifier logger = structlog.get_logger() class NetdataWebhookServer: """FastAPI server for handling Netdata Cloud webhooks.""" def __init__(self, zulip_config: ZulipConfig, server_config: ServerConfig): """Initialize the webhook server.""" self.app = FastAPI( title="Netdata Zulip Bot", description="Webhook service for Netdata Cloud notifications", version="0.1.0", ) self.zulip_config = zulip_config self.server_config = server_config self.formatter = ZulipMessageFormatter() # Initialize Zulip client try: self.zulip_notifier = ZulipNotifier(zulip_config) except Exception as e: logger.error("Failed to initialize Zulip client", error=str(e)) raise self._setup_routes() self._setup_middleware() def _setup_routes(self): """Setup FastAPI routes.""" @self.app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "service": "netdata-zulip-bot"} @self.app.get("/webhook/netdata") async def netdata_webhook_challenge(crc_token: str = Query(...)): """Handle Netdata Cloud webhook challenge for verification.""" if not self.server_config.challenge_secret: logger.error("Challenge secret not configured") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Challenge secret not configured", ) try: # Create HMAC SHA-256 hash from crc_token and challenge secret token_bytes = crc_token.encode("ascii") secret_bytes = self.server_config.challenge_secret.encode("utf-8") sha256_hash = hmac.new( secret_bytes, msg=token_bytes, digestmod=hashlib.sha256 ).digest() # Create response with base64 encoded hash response_token = "sha256=" + base64.b64encode(sha256_hash).decode( "ascii" ) logger.info( "Responding to Netdata challenge", crc_token=crc_token[:16] + "..." ) return {"response_token": response_token} except Exception as e: logger.error("Failed to process challenge", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to process challenge", ) @self.app.post("/webhook/netdata") async def netdata_webhook(request: Request): """Handle Netdata Cloud webhook notifications.""" try: # Get raw JSON data body = await request.json() logger.info("Received webhook", payload_keys=list(body.keys())) # Parse and validate the payload notification = WebhookPayload.parse(body) logger.info("Parsed notification", type=type(notification).__name__) # Format message for Zulip topic, content = self.formatter.format_notification(notification) logger.info("Formatted message", topic=topic) # Send to Zulip success = self.zulip_notifier.send_message(topic, content) if success: return { "status": "success", "message": "Notification sent to Zulip", } else: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to send notification to Zulip", ) except ValueError as e: logger.error("Invalid webhook payload", error=str(e)) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid payload format: {str(e)}", ) except Exception as e: logger.error("Webhook processing failed", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error", ) def _setup_middleware(self): """Setup middleware for logging and error handling.""" @self.app.middleware("http") async def log_requests(request: Request, call_next): """Log all requests.""" client_host = request.client.host if request.client else "unknown" logger.info( "Request received", method=request.method, url=str(request.url), client=client_host, ) try: response = await call_next(request) logger.info( "Request completed", method=request.method, url=str(request.url), status_code=response.status_code, ) return response except Exception as e: logger.error( "Request failed", method=request.method, url=str(request.url), error=str(e), ) raise def run(self): """Run the webhook server (HTTP only, TLS handled by reverse proxy).""" try: logger.info( "Starting Netdata Zulip webhook server (HTTP)", host=self.server_config.host, port=self.server_config.port, ) uvicorn.run( self.app, host=self.server_config.host, port=self.server_config.port, access_log=False, # We handle logging in middleware ) except Exception as e: logger.error("Failed to start server", error=str(e)) raise