Netdata.cloud bot for Zulip
1"""FastAPI webhook server for receiving Netdata notifications."""
2
3import ssl
4import tempfile
5from pathlib import Path
6from typing import Dict, Any
7
8import structlog
9import uvicorn
10from fastapi import FastAPI, HTTPException, Request, status
11from fastapi.responses import JSONResponse
12
13from .cert_manager import CertificateManager
14from .formatter import ZulipMessageFormatter
15from .models import WebhookPayload, ZulipConfig, ServerConfig
16from .netdata_ca import NETDATA_CA_CERT
17from .zulip_client import ZulipNotifier
18
19logger = structlog.get_logger()
20
21
22class NetdataWebhookServer:
23 """FastAPI server for handling Netdata Cloud webhooks."""
24
25 def __init__(self, zulip_config: ZulipConfig, server_config: ServerConfig):
26 """Initialize the webhook server."""
27 self.app = FastAPI(
28 title="Netdata Zulip Bot",
29 description="Webhook service for Netdata Cloud notifications",
30 version="0.1.0"
31 )
32 self.zulip_config = zulip_config
33 self.server_config = server_config
34 self.formatter = ZulipMessageFormatter()
35 self.cert_manager = None
36
37 # Initialize certificate manager if auto-cert is enabled
38 if self.server_config.auto_cert:
39 self.cert_manager = CertificateManager(
40 domain=self.server_config.domain,
41 email=self.server_config.cert_email,
42 cert_dir=Path(self.server_config.cert_path),
43 staging=self.server_config.cert_staging,
44 port=self.server_config.acme_port
45 )
46
47 # Initialize Zulip client
48 try:
49 self.zulip_notifier = ZulipNotifier(zulip_config)
50 except Exception as e:
51 logger.error("Failed to initialize Zulip client", error=str(e))
52 raise
53
54 self._setup_routes()
55 self._setup_middleware()
56
57 def _setup_routes(self):
58 """Setup FastAPI routes."""
59
60 @self.app.get("/health")
61 async def health_check():
62 """Health check endpoint."""
63 return {"status": "healthy", "service": "netdata-zulip-bot"}
64
65 @self.app.post("/webhook/netdata")
66 async def netdata_webhook(request: Request):
67 """Handle Netdata Cloud webhook notifications."""
68 try:
69 # Get raw JSON data
70 body = await request.json()
71 logger.info("Received webhook", payload_keys=list(body.keys()))
72
73 # Parse and validate the payload
74 notification = WebhookPayload.parse(body)
75 logger.info("Parsed notification", type=type(notification).__name__)
76
77 # Format message for Zulip
78 topic, content = self.formatter.format_notification(notification)
79 logger.info("Formatted message", topic=topic)
80
81 # Send to Zulip
82 success = self.zulip_notifier.send_message(topic, content)
83
84 if success:
85 return {"status": "success", "message": "Notification sent to Zulip"}
86 else:
87 raise HTTPException(
88 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
89 detail="Failed to send notification to Zulip"
90 )
91
92 except ValueError as e:
93 logger.error("Invalid webhook payload", error=str(e))
94 raise HTTPException(
95 status_code=status.HTTP_400_BAD_REQUEST,
96 detail=f"Invalid payload format: {str(e)}"
97 )
98 except Exception as e:
99 logger.error("Webhook processing failed", error=str(e))
100 raise HTTPException(
101 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
102 detail="Internal server error"
103 )
104
105 def _setup_middleware(self):
106 """Setup middleware for logging and error handling."""
107
108 @self.app.middleware("http")
109 async def log_requests(request: Request, call_next):
110 """Log all requests."""
111 client_host = request.client.host if request.client else "unknown"
112 logger.info(
113 "Request received",
114 method=request.method,
115 url=str(request.url),
116 client=client_host
117 )
118
119 try:
120 response = await call_next(request)
121 logger.info(
122 "Request completed",
123 method=request.method,
124 url=str(request.url),
125 status_code=response.status_code
126 )
127 return response
128 except Exception as e:
129 logger.error(
130 "Request failed",
131 method=request.method,
132 url=str(request.url),
133 error=str(e)
134 )
135 raise
136
137 def get_ssl_context(self) -> ssl.SSLContext:
138 """Create SSL context for HTTPS and mutual TLS."""
139 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
140
141 # Get certificate paths
142 if self.cert_manager and self.server_config.auto_cert:
143 # Use automated certificates
144 try:
145 cert_file, key_file, fullchain_file = self.cert_manager.obtain_certificate()
146 logger.info(
147 "Using automated SSL certificate",
148 cert_file=str(cert_file),
149 key_file=str(key_file)
150 )
151 except Exception as e:
152 logger.error("Failed to obtain automated certificate", error=str(e))
153 raise
154 else:
155 # Use manually provided certificates
156 cert_path = Path(self.server_config.cert_path) / self.server_config.domain
157 fullchain_file = cert_path / "fullchain.pem"
158 key_file = cert_path / "privkey.pem"
159
160 if not fullchain_file.exists() or not key_file.exists():
161 logger.error(
162 "SSL certificate files not found",
163 cert_file=str(fullchain_file),
164 key_file=str(key_file)
165 )
166 raise FileNotFoundError(f"SSL certificate files not found at {cert_path}")
167
168 context.load_cert_chain(str(fullchain_file), str(key_file))
169
170 # Configure mutual TLS if enabled
171 if self.server_config.enable_mtls:
172 # Use the hardcoded Netdata CA certificate
173 with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as ca_file:
174 ca_file.write(NETDATA_CA_CERT)
175 ca_file_path = ca_file.name
176
177 try:
178 context.load_verify_locations(ca_file_path)
179 context.verify_mode = ssl.CERT_REQUIRED
180 logger.info("Mutual TLS enabled with hardcoded Netdata CA certificate")
181 finally:
182 # Clean up the temporary file
183 Path(ca_file_path).unlink(missing_ok=True)
184 else:
185 context.verify_mode = ssl.CERT_NONE
186 logger.info("Mutual TLS disabled")
187
188 return context
189
190 def run(self):
191 """Run the webhook server with HTTPS and optional mutual TLS."""
192 try:
193 # Setup automatic certificate renewal if enabled
194 if self.cert_manager and self.server_config.auto_cert:
195 self.cert_manager.setup_auto_renewal()
196 logger.info("Automatic certificate renewal enabled")
197
198 ssl_context = self.get_ssl_context()
199
200 logger.info(
201 "Starting Netdata Zulip webhook server",
202 host=self.server_config.host,
203 port=self.server_config.port,
204 domain=self.server_config.domain,
205 mtls_enabled=self.server_config.enable_mtls,
206 auto_cert=self.server_config.auto_cert
207 )
208
209 uvicorn.run(
210 self.app,
211 host=self.server_config.host,
212 port=self.server_config.port,
213 ssl_context=ssl_context,
214 access_log=False, # We handle logging in middleware
215 )
216
217 except Exception as e:
218 logger.error("Failed to start server", error=str(e))
219 raise