Netdata.cloud bot for Zulip
1"""FastAPI webhook server for receiving Netdata notifications."""
2
3import ssl
4import tempfile
5from pathlib import Path
6from typing import Dict, Any
7
8import structlog
9import uvicorn
10from fastapi import FastAPI, HTTPException, Request, status
11from fastapi.responses import JSONResponse
12
13from .formatter import ZulipMessageFormatter
14from .models import WebhookPayload, ZulipConfig, ServerConfig
15from .netdata_ca import NETDATA_CA_CERT
16from .zulip_client import ZulipNotifier
17
# Module-level structlog logger shared by the server class below.
logger = structlog.get_logger()
19
20
class NetdataWebhookServer:
    """FastAPI server for handling Netdata Cloud webhooks.

    The application exposes two routes:

    * ``GET /health`` -- static liveness response.
    * ``POST /webhook/netdata`` -- parses a webhook payload, formats it and
      forwards it to Zulip.

    The server is always started over HTTPS (see :meth:`run`); client
    certificate verification (mutual TLS) is controlled by
    ``server_config.enable_mtls``.
    """

    def __init__(self, zulip_config: ZulipConfig, server_config: ServerConfig):
        """Initialize the webhook server.

        Args:
            zulip_config: Settings handed to ``ZulipNotifier``.
            server_config: Listener and TLS settings; this class reads
                ``host``, ``port``, ``domain``, ``cert_path`` and
                ``enable_mtls`` from it.

        Raises:
            Exception: Whatever ``ZulipNotifier`` raises during construction;
                the error is logged and re-raised unchanged.
        """
        self.app = FastAPI(
            title="Netdata Zulip Bot",
            description="Webhook service for Netdata Cloud notifications",
            version="0.1.0",
        )
        self.zulip_config = zulip_config
        self.server_config = server_config
        self.formatter = ZulipMessageFormatter()

        # Initialize Zulip client
        try:
            self.zulip_notifier = ZulipNotifier(zulip_config)
        except Exception as e:
            logger.error("Failed to initialize Zulip client", error=str(e))
            raise

        self._setup_routes()
        self._setup_middleware()

    def _setup_routes(self):
        """Register the health-check and webhook routes on ``self.app``."""

        @self.app.get("/health")
        async def health_check():
            """Health check endpoint."""
            return {"status": "healthy", "service": "netdata-zulip-bot"}

        @self.app.post("/webhook/netdata")
        async def netdata_webhook(request: Request):
            """Handle Netdata Cloud webhook notifications."""
            # Outcomes:
            #   200 - notification delivered to Zulip
            #   400 - body is not valid JSON / not an object / fails validation
            #   500 - Zulip delivery failed or an unexpected error occurred
            try:
                # json.JSONDecodeError is a ValueError, so malformed JSON is
                # reported as a 400 by the handler below.
                body = await request.json()
                if not isinstance(body, dict):
                    # A JSON list/scalar has no .keys(); reject it as a bad
                    # request instead of failing with an AttributeError (500).
                    raise ValueError("payload must be a JSON object")
                logger.info("Received webhook", payload_keys=list(body.keys()))

                # Parse and validate the payload
                notification = WebhookPayload.parse(body)
                logger.info("Parsed notification", type=type(notification).__name__)

                # Format message for Zulip
                topic, content = self.formatter.format_notification(notification)
                logger.info("Formatted message", topic=topic)

                # NOTE(review): send_message is invoked synchronously (no
                # await), so a slow Zulip call blocks the event loop while it
                # runs. Consider a threadpool if the client is thread-safe.
                success = self.zulip_notifier.send_message(topic, content)
            except ValueError as e:
                logger.error("Invalid webhook payload", error=str(e))
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Invalid payload format: {str(e)}",
                ) from e
            except Exception as e:
                logger.error("Webhook processing failed", error=str(e))
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Internal server error",
                ) from e

            # Checked outside the try block on purpose: raising this
            # HTTPException inside it would be caught by ``except Exception``
            # above and its detail replaced with "Internal server error".
            if not success:
                logger.error("Failed to send notification to Zulip", topic=topic)
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to send notification to Zulip",
                )

            return {"status": "success", "message": "Notification sent to Zulip"}

    def _setup_middleware(self):
        """Setup middleware for logging and error handling."""

        @self.app.middleware("http")
        async def log_requests(request: Request, call_next):
            """Log all requests."""
            # request.client may be None, so fall back to a placeholder.
            client_host = request.client.host if request.client else "unknown"
            logger.info(
                "Request received",
                method=request.method,
                url=str(request.url),
                client=client_host,
            )

            try:
                response = await call_next(request)
            except Exception as e:
                # Log and propagate; the error response is produced upstream.
                logger.error(
                    "Request failed",
                    method=request.method,
                    url=str(request.url),
                    error=str(e),
                )
                raise

            logger.info(
                "Request completed",
                method=request.method,
                url=str(request.url),
                status_code=response.status_code,
            )
            return response

    def _server_cert_files(self):
        """Locate the server certificate chain and private key.

        The files are expected at
        ``<cert_path>/<domain>/fullchain.pem`` and
        ``<cert_path>/<domain>/privkey.pem``.

        Returns:
            A ``(cert_file, key_file)`` tuple of ``Path`` objects.

        Raises:
            FileNotFoundError: If either file does not exist.
        """
        cert_dir = Path(self.server_config.cert_path) / self.server_config.domain
        cert_file = cert_dir / "fullchain.pem"
        key_file = cert_dir / "privkey.pem"

        if not (cert_file.exists() and key_file.exists()):
            logger.error(
                "SSL certificate files not found",
                cert_file=str(cert_file),
                key_file=str(key_file),
            )
            raise FileNotFoundError(f"SSL certificate files not found at {cert_dir}")

        return cert_file, key_file

    def get_ssl_context(self) -> ssl.SSLContext:
        """Create SSL context for HTTPS and mutual TLS.

        Returns:
            A server-side ``ssl.SSLContext`` loaded with the configured
            certificate chain and key. With ``enable_mtls`` set, the context
            trusts only ``NETDATA_CA_CERT`` and requires a client
            certificate; otherwise client certificates are not requested.

        Raises:
            FileNotFoundError: If the certificate or key file is missing.
            ssl.SSLError: If the certificate, key or CA data cannot be loaded.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

        # Load server certificate and key
        cert_file, key_file = self._server_cert_files()
        context.load_cert_chain(str(cert_file), str(key_file))

        # Configure mutual TLS if enabled
        if self.server_config.enable_mtls:
            # Feed the PEM text straight to OpenSSL; no temporary file needed.
            context.load_verify_locations(cadata=NETDATA_CA_CERT)
            context.verify_mode = ssl.CERT_REQUIRED
            logger.info("Mutual TLS enabled with hardcoded Netdata CA certificate")
        else:
            context.verify_mode = ssl.CERT_NONE
            logger.info("Mutual TLS disabled")

        return context

    def run(self):
        """Run the webhook server with HTTPS and optional mutual TLS.

        ``uvicorn.run`` has no ``ssl_context`` parameter; it builds its own
        context from file paths. The TLS settings are therefore passed as
        ``ssl_certfile`` / ``ssl_keyfile`` / ``ssl_cert_reqs`` /
        ``ssl_ca_certs``, mirroring what :meth:`get_ssl_context` configures.

        Blocks until the server stops.

        Raises:
            Exception: Any startup or runtime failure; logged and re-raised.
        """
        # Path of the temporary CA bundle, if one is written (mTLS only).
        ca_file_path = None
        try:
            # Preflight: fail fast, with this module's own log messages, if
            # the certificate material is missing or unloadable. The context
            # itself is discarded; uvicorn creates its own from the paths.
            self.get_ssl_context()
            cert_file, key_file = self._server_cert_files()

            tls_options: Dict[str, Any] = {
                "ssl_certfile": str(cert_file),
                "ssl_keyfile": str(key_file),
                "ssl_version": int(ssl.PROTOCOL_TLS_SERVER),
                "ssl_cert_reqs": int(ssl.CERT_NONE),
            }

            if self.server_config.enable_mtls:
                # uvicorn only accepts the CA bundle as a file path, and reads
                # it while starting up, so the file must outlive uvicorn.run.
                with tempfile.NamedTemporaryFile(
                    mode="w", suffix=".pem", delete=False, encoding="utf-8"
                ) as ca_file:
                    # Record the name first so cleanup runs even if the
                    # write below fails.
                    ca_file_path = ca_file.name
                    ca_file.write(NETDATA_CA_CERT)
                tls_options["ssl_ca_certs"] = ca_file_path
                tls_options["ssl_cert_reqs"] = int(ssl.CERT_REQUIRED)

            logger.info(
                "Starting Netdata Zulip webhook server",
                host=self.server_config.host,
                port=self.server_config.port,
                domain=self.server_config.domain,
                mtls_enabled=self.server_config.enable_mtls,
            )

            uvicorn.run(
                self.app,
                host=self.server_config.host,
                port=self.server_config.port,
                access_log=False,  # We handle logging in middleware
                **tls_options,
            )

        except Exception as e:
            logger.error("Failed to start server", error=str(e))
            raise
        finally:
            # Clean up the temporary CA bundle once the server has stopped.
            if ca_file_path is not None:
                Path(ca_file_path).unlink(missing_ok=True)