# Netdata Cloud bot for Zulip
"""FastAPI webhook server for receiving Netdata notifications."""

import ssl
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional

import structlog
import uvicorn
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import JSONResponse

from .cert_manager import CertificateManager
from .formatter import ZulipMessageFormatter
from .models import WebhookPayload, ZulipConfig, ServerConfig
from .netdata_ca import NETDATA_CA_CERT
from .zulip_client import ZulipNotifier

logger = structlog.get_logger()


class NetdataWebhookServer:
    """FastAPI server for handling Netdata Cloud webhooks.

    The server exposes ``GET /health`` and ``POST /webhook/netdata``. Each
    webhook payload is parsed, formatted and forwarded to Zulip through
    ``ZulipNotifier``. ``run()`` serves the app over HTTPS, optionally
    requiring client certificates (mutual TLS).
    """

    def __init__(self, zulip_config: ZulipConfig, server_config: ServerConfig):
        """Initialize the webhook server.

        Args:
            zulip_config: Settings handed to ``ZulipNotifier``.
            server_config: Listener, certificate and mTLS settings.

        Raises:
            Exception: Whatever ``ZulipNotifier`` raises during construction;
                the error is logged and re-raised unchanged.
        """
        self.app = FastAPI(
            title="Netdata Zulip Bot",
            description="Webhook service for Netdata Cloud notifications",
            version="0.1.0",
        )
        self.zulip_config = zulip_config
        self.server_config = server_config
        self.formatter = ZulipMessageFormatter()
        self.cert_manager: Optional[CertificateManager] = None

        # Initialize certificate manager if auto-cert is enabled
        if self.server_config.auto_cert:
            self.cert_manager = CertificateManager(
                domain=self.server_config.domain,
                email=self.server_config.cert_email,
                cert_dir=Path(self.server_config.cert_path),
                staging=self.server_config.cert_staging,
                port=self.server_config.acme_port,
            )

        # Initialize Zulip client
        try:
            self.zulip_notifier = ZulipNotifier(zulip_config)
        except Exception as e:
            logger.error("Failed to initialize Zulip client", error=str(e))
            raise

        self._setup_routes()
        self._setup_middleware()

    def _setup_routes(self):
        """Register the health-check and webhook routes on ``self.app``."""

        @self.app.get("/health")
        async def health_check():
            """Health check endpoint."""
            return {"status": "healthy", "service": "netdata-zulip-bot"}

        @self.app.post("/webhook/netdata")
        async def netdata_webhook(request: Request):
            """Handle Netdata Cloud webhook notifications.

            Returns a success document when the message was handed to Zulip.
            Responds with 400 for malformed payloads and 500 for delivery or
            unexpected failures.
            """
            try:
                # Get raw JSON data
                body = await request.json()
                if not isinstance(body, dict):
                    # Valid JSON that is not an object (list, string, ...)
                    # is a client error; route it to the 400 handler below
                    # instead of failing on ``body.keys()``.
                    raise ValueError("payload must be a JSON object")
                logger.info("Received webhook", payload_keys=list(body.keys()))

                # Parse and validate the payload
                notification = WebhookPayload.parse(body)
                logger.info("Parsed notification", type=type(notification).__name__)

                # Format message for Zulip
                topic, content = self.formatter.format_notification(notification)
                logger.info("Formatted message", topic=topic)

                # Send to Zulip
                # NOTE(review): send_message is called synchronously inside
                # an async handler; if it blocks on network I/O it stalls the
                # event loop - confirm and consider a threadpool.
                success = self.zulip_notifier.send_message(topic, content)

                if success:
                    return {"status": "success", "message": "Notification sent to Zulip"}

                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to send notification to Zulip",
                )

            except HTTPException:
                # Already a well-formed HTTP error; let it through untouched
                # so the generic handler below does not overwrite its detail.
                raise
            except ValueError as e:
                logger.error("Invalid webhook payload", error=str(e))
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid payload format: {str(e)}",
                ) from e
            except Exception as e:
                logger.error("Webhook processing failed", error=str(e))
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Internal server error",
                ) from e

    def _setup_middleware(self):
        """Setup middleware for logging and error handling."""

        @self.app.middleware("http")
        async def log_requests(request: Request, call_next):
            """Log every request on arrival and on completion or failure."""
            client_host = request.client.host if request.client else "unknown"
            logger.info(
                "Request received",
                method=request.method,
                url=str(request.url),
                client=client_host,
            )

            try:
                response = await call_next(request)
            except Exception as e:
                logger.error(
                    "Request failed",
                    method=request.method,
                    url=str(request.url),
                    error=str(e),
                )
                raise

            logger.info(
                "Request completed",
                method=request.method,
                url=str(request.url),
                status_code=response.status_code,
            )
            return response

    def get_ssl_context(self) -> ssl.SSLContext:
        """Create SSL context for HTTPS and mutual TLS.

        With auto-cert enabled the certificate comes from the certificate
        manager; otherwise ``fullchain.pem`` and ``privkey.pem`` are read
        from ``<cert_path>/<domain>``.

        Returns:
            A server-side ``ssl.SSLContext`` with the certificate chain
            loaded and client verification set according to ``enable_mtls``.

        Raises:
            FileNotFoundError: Manual certificate files are missing.
            Exception: Whatever the certificate manager raises while
                obtaining a certificate (logged and re-raised).
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

        # Get certificate paths
        if self.cert_manager and self.server_config.auto_cert:
            # Use automated certificates
            try:
                cert_file, key_file, fullchain_file = self.cert_manager.obtain_certificate()
            except Exception as e:
                logger.error("Failed to obtain automated certificate", error=str(e))
                raise
            logger.info(
                "Using automated SSL certificate",
                cert_file=str(cert_file),
                key_file=str(key_file),
            )
        else:
            # Use manually provided certificates
            cert_path = Path(self.server_config.cert_path) / self.server_config.domain
            fullchain_file = cert_path / "fullchain.pem"
            key_file = cert_path / "privkey.pem"

            if not fullchain_file.exists() or not key_file.exists():
                logger.error(
                    "SSL certificate files not found",
                    cert_file=str(fullchain_file),
                    key_file=str(key_file),
                )
                raise FileNotFoundError(f"SSL certificate files not found at {cert_path}")

        context.load_cert_chain(str(fullchain_file), str(key_file))

        # Configure mutual TLS if enabled
        if self.server_config.enable_mtls:
            # Load the hardcoded Netdata CA certificate straight from memory;
            # ``cadata`` accepts PEM text, so no temporary file is needed.
            context.load_verify_locations(cadata=NETDATA_CA_CERT)
            context.verify_mode = ssl.CERT_REQUIRED
            logger.info("Mutual TLS enabled with hardcoded Netdata CA certificate")
        else:
            context.verify_mode = ssl.CERT_NONE
            logger.info("Mutual TLS disabled")

        return context

    def run(self):
        """Run the webhook server with HTTPS and optional mutual TLS.

        Blocks until the server stops.

        Raises:
            Exception: Any startup or serving error, logged and re-raised.
        """
        try:
            # Setup automatic certificate renewal if enabled
            if self.cert_manager and self.server_config.auto_cert:
                self.cert_manager.setup_auto_renewal()
                logger.info("Automatic certificate renewal enabled")

            ssl_context = self.get_ssl_context()

            logger.info(
                "Starting Netdata Zulip webhook server",
                host=self.server_config.host,
                port=self.server_config.port,
                domain=self.server_config.domain,
                mtls_enabled=self.server_config.enable_mtls,
                auto_cert=self.server_config.auto_cert,
            )

            # uvicorn has no ``ssl_context`` option (it only builds a context
            # from ssl_certfile/ssl_keyfile/...), so passing one to
            # ``uvicorn.run`` raises TypeError. Load the config first, then
            # install the prepared context on it before serving.
            config = uvicorn.Config(
                self.app,
                host=self.server_config.host,
                port=self.server_config.port,
                access_log=False,  # We handle logging in middleware
            )
            config.load()
            config.ssl = ssl_context
            uvicorn.Server(config).run()

        except Exception as e:
            logger.error("Failed to start server", error=str(e))
            raise