# Netdata.cloud bot for Zulip
"""Automated SSL certificate management using the ACME protocol."""
2
3import asyncio
4import json
5import os
6import socket
7import threading
8import time
9from datetime import datetime, timezone
10from pathlib import Path
11from typing import Optional, Tuple
12
13import structlog
14from acme import challenges, client, errors, messages
15from cryptography import x509
16from cryptography.hazmat.backends import default_backend
17from cryptography.hazmat.primitives import hashes, serialization
18from cryptography.hazmat.primitives.asymmetric import rsa
19from cryptography.x509.oid import NameOID
20from fastapi import FastAPI
21from fastapi.responses import PlainTextResponse
22import uvicorn
23import josepy as jose
24
# Module-level structlog logger used by every method of CertificateManager.
logger = structlog.get_logger()

# ACME v2 directory endpoints for Let's Encrypt. CertificateManager picks the
# staging URL when constructed with staging=True and the production URL
# otherwise.
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
LETSENCRYPT_STAGING_URL = "https://acme-staging-v02.api.letsencrypt.org/directory"
29
30
class CertificateManager:
    """Manages SSL certificates using ACME protocol.

    A certificate for a single domain is requested from Let's Encrypt and
    validated with the HTTP-01 challenge, which is answered by a temporary
    HTTP server started for the duration of the request. The certificate,
    its private key and the full chain are stored as PEM files in
    ``cert_dir``.
    """

    # Renew once fewer than this many whole days of validity remain.
    RENEWAL_THRESHOLD_DAYS = 30
    # Authorization polling: number of polls and delay between them (seconds).
    CHALLENGE_POLL_ATTEMPTS = 30
    CHALLENGE_POLL_INTERVAL = 2
    # Maximum time (seconds) to wait for the challenge server to start / stop.
    SERVER_START_TIMEOUT = 10
    SERVER_STOP_TIMEOUT = 5

    _PEM_CERT_BEGIN = "-----BEGIN CERTIFICATE-----"
    _PEM_CERT_END = "-----END CERTIFICATE-----"

    def __init__(
        self,
        domain: str,
        email: str,
        cert_dir: Path,
        staging: bool = False,
        port: int = 80
    ):
        """Initialize certificate manager.

        Creates ``cert_dir`` if it does not exist yet.

        Args:
            domain: Domain name for the certificate
            email: Email for Let's Encrypt account
            cert_dir: Directory to store certificates
            staging: Use Let's Encrypt staging server
            port: Port for HTTP-01 challenge server
        """
        self.domain = domain
        self.email = email
        self.cert_dir = Path(cert_dir)
        self.cert_dir.mkdir(parents=True, exist_ok=True)
        self.staging = staging
        self.challenge_port = port

        self.directory_url = LETSENCRYPT_STAGING_URL if staging else LETSENCRYPT_DIRECTORY_URL
        self.account_key_path = self.cert_dir / "account_key.pem"
        self.cert_path = self.cert_dir / f"{domain}_cert.pem"
        self.key_path = self.cert_dir / f"{domain}_key.pem"
        self.fullchain_path = self.cert_dir / f"{domain}_fullchain.pem"

        # For HTTP-01 challenge: URL token -> key authorization to serve.
        self.challenge_tokens = {}
        self.challenge_server = None
        self.challenge_thread = None

        # Signals the background renewal loop (if any) to exit.
        self._renewal_stop = threading.Event()

    @staticmethod
    def _write_file(path: Path, data: bytes, mode: int) -> None:
        """Atomically write ``data`` to ``path`` with permission bits ``mode``.

        The data goes to a sibling temporary file that is created with the
        final permissions and then renamed over ``path``, so a private key is
        never readable by others, not even briefly, and readers never observe
        a partially written file.
        """
        path = Path(path)
        tmp_path = path.with_name(path.name + ".tmp")
        fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
        try:
            with os.fdopen(fd, "wb") as handle:
                handle.write(data)
            # O_CREAT honours the umask and keeps the mode of a stale temp
            # file, so set the permissions explicitly before publishing.
            os.chmod(tmp_path, mode)
            os.replace(tmp_path, path)
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def _extract_leaf_pem(fullchain_pem: str) -> str:
        """Return the first (leaf) certificate of a PEM chain.

        The result ends with exactly one newline after the END marker,
        regardless of how the certificates in the chain are separated.

        Raises:
            ValueError: If the text contains no complete PEM certificate.
        """
        begin = fullchain_pem.find(CertificateManager._PEM_CERT_BEGIN)
        end = fullchain_pem.find(CertificateManager._PEM_CERT_END, max(begin, 0))
        if begin == -1 or end == -1:
            raise ValueError("No PEM certificate found in chain")
        return fullchain_pem[begin:end + len(CertificateManager._PEM_CERT_END)] + "\n"

    def _generate_private_key(self) -> "rsa.RSAPrivateKey":
        """Generate a new 2048-bit RSA private key."""
        return rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )

    def _get_or_create_account_key(self) -> "jose.JWK":
        """Get existing account key or create a new one.

        A newly generated key is stored unencrypted at ``account_key_path``
        with owner-only (0600) permissions.
        """
        if self.account_key_path.exists():
            key_pem = self.account_key_path.read_bytes()
        else:
            key_pem = self._generate_private_key().private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()
            )
            self._write_file(self.account_key_path, key_pem, 0o600)
            logger.info("Created new account key", path=str(self.account_key_path))

        return jose.JWK.load(key_pem)

    def _create_csr(self, private_key: "rsa.RSAPrivateKey") -> bytes:
        """Create a PEM-encoded Certificate Signing Request for the domain.

        The domain is set both as the subject common name and as a DNS
        subjectAltName. PEM is used because that is the encoding
        ``ClientV2.new_order`` parses.
        """
        csr = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(
                x509.Name([
                    x509.NameAttribute(NameOID.COMMON_NAME, self.domain),
                ])
            )
            .add_extension(
                x509.SubjectAlternativeName([x509.DNSName(self.domain)]),
                critical=False,
            )
            .sign(private_key, hashes.SHA256(), backend=default_backend())
        )

        return csr.public_bytes(serialization.Encoding.PEM)

    def _start_challenge_server(self):
        """Start HTTP server for ACME challenges.

        The server runs in a daemon thread and answers
        ``/.well-known/acme-challenge/{token}`` from ``challenge_tokens``.
        Does nothing if a challenge server started by this instance is still
        running.

        Raises:
            RuntimeError: If the server is not up within
                ``SERVER_START_TIMEOUT`` seconds (e.g. the port is in use).
        """
        if self.challenge_thread is not None and self.challenge_thread.is_alive():
            logger.info("ACME challenge server already running", port=self.challenge_port)
            return

        app = FastAPI()

        @app.get("/.well-known/acme-challenge/{token}")
        async def acme_challenge(token: str):
            """Serve ACME challenge responses."""
            validation = self.challenge_tokens.get(token)
            if validation is None:
                logger.warning("Unknown ACME challenge token", token=token)
                return PlainTextResponse("Not found", status_code=404)
            logger.info("Serving ACME challenge", token=token)
            return PlainTextResponse(validation)

        # A Server object (rather than uvicorn.run) is kept so the server can
        # be asked to shut down again via should_exit.
        server = uvicorn.Server(
            uvicorn.Config(
                app,
                host="0.0.0.0",
                port=self.challenge_port,
                log_level="error"
            )
        )

        def run_server():
            """Run the challenge server in a thread."""
            try:
                server.run()
            except (Exception, SystemExit) as e:
                # uvicorn exits via SystemExit when it cannot bind the port.
                logger.error("Challenge server error", error=str(e))

        thread = threading.Thread(target=run_server, daemon=True)
        self.challenge_server = server
        self.challenge_thread = thread
        thread.start()

        # Wait until the server reports it is up instead of sleeping blindly.
        deadline = time.monotonic() + self.SERVER_START_TIMEOUT
        while not server.started:
            if not thread.is_alive() or time.monotonic() >= deadline:
                self._stop_challenge_server()
                raise RuntimeError(
                    f"ACME challenge server failed to start on port {self.challenge_port}"
                )
            time.sleep(0.05)

        logger.info("Started ACME challenge server", port=self.challenge_port)

    def _stop_challenge_server(self):
        """Stop the challenge server, if one is running.

        Safe to call repeatedly and when no server was ever started.
        """
        server, thread = self.challenge_server, self.challenge_thread
        self.challenge_server = None
        self.challenge_thread = None
        if server is None or thread is None:
            return

        # Ask uvicorn's main loop to exit, then wait for the thread to finish.
        server.should_exit = True
        if thread.is_alive():
            thread.join(timeout=self.SERVER_STOP_TIMEOUT)

        if thread.is_alive():
            logger.warning("Challenge server did not stop in time", port=self.challenge_port)
        else:
            logger.info("Stopped ACME challenge server", port=self.challenge_port)

    def _perform_http01_challenge(
        self,
        acme_client: "client.ClientV2",
        authz: "messages.AuthorizationResource"
    ) -> bool:
        """Perform HTTP-01 challenge.

        Publishes the key authorization through the challenge server, tells
        the ACME server to validate it, then polls the authorization.

        Args:
            acme_client: Client bound to the registered account.
            authz: Authorization to satisfy.

        Returns:
            True if the authorization became valid; False if it became
            invalid, offers no HTTP-01 challenge, or did not settle within
            ``CHALLENGE_POLL_ATTEMPTS`` polls.
        """
        # Find HTTP-01 challenge
        http_challenge = next(
            (c for c in authz.body.challenges if isinstance(c.chall, challenges.HTTP01)),
            None,
        )
        if http_challenge is None:
            logger.error("No HTTP-01 challenge found")
            return False

        # Prepare challenge response
        response, validation = http_challenge.chall.response_and_validation(
            acme_client.net.key
        )

        # chall.token holds the raw decoded bytes; the ACME server requests
        # the base64url form in the URL, which encode("token") produces.
        token = http_challenge.chall.encode("token")
        self.challenge_tokens[token] = validation

        logger.info(
            "Prepared HTTP-01 challenge",
            token=token,
            domain=self.domain
        )

        # Notify ACME server that we're ready
        acme_client.answer_challenge(http_challenge, response)

        # Wait for challenge validation
        for _ in range(self.CHALLENGE_POLL_ATTEMPTS):
            time.sleep(self.CHALLENGE_POLL_INTERVAL)
            try:
                authz, _ = acme_client.poll(authz)
            except errors.TimeoutError:
                continue

            if authz.body.status == messages.STATUS_VALID:
                logger.info("Challenge validated successfully")
                return True
            if authz.body.status == messages.STATUS_INVALID:
                logger.error("Challenge validation failed")
                return False

        logger.error("Challenge validation timeout")
        return False

    def needs_renewal(self) -> bool:
        """Check if certificate needs renewal.

        Returns:
            True if the certificate, key or full-chain file is missing, the
            certificate cannot be read, or fewer than
            ``RENEWAL_THRESHOLD_DAYS`` days of validity remain; False
            otherwise.
        """
        required = (self.cert_path, self.key_path, self.fullchain_path)
        if not all(path.exists() for path in required):
            return True

        try:
            cert = x509.load_pem_x509_certificate(
                self.cert_path.read_bytes(), default_backend()
            )
            days_remaining = (cert.not_valid_after_utc -
                              datetime.now(timezone.utc)).days
        except Exception as e:
            # An unreadable certificate is treated as one that must be replaced.
            logger.error("Error checking certificate", error=str(e))
            return True

        if days_remaining < self.RENEWAL_THRESHOLD_DAYS:
            logger.info("Certificate needs renewal", days_remaining=days_remaining)
            return True

        logger.info("Certificate still valid", days_remaining=days_remaining)
        return False

    def _register_account(self, acme_client: "client.ClientV2") -> None:
        """Register a new ACME account or attach to the existing one."""
        try:
            acme_client.new_account(
                messages.NewRegistration.from_data(
                    email=self.email,
                    terms_of_service_agreed=True
                )
            )
            logger.info("Created new ACME account")
        except errors.ConflictError as exc:
            # Account already exists: the error carries its URL, which is
            # what query_registration needs to bind the client to it.
            acme_client.query_registration(
                messages.RegistrationResource(
                    uri=exc.location,
                    body=messages.Registration()
                )
            )
            logger.info("Using existing ACME account")

    def _save_certificate(self, fullchain_pem: str, cert_key: "rsa.RSAPrivateKey") -> None:
        """Write the key (0600), leaf certificate and full chain (0644)."""
        key_pem = cert_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )
        leaf_pem = self._extract_leaf_pem(fullchain_pem)

        self._write_file(self.key_path, key_pem, 0o600)
        self._write_file(self.cert_path, leaf_pem.encode('utf-8'), 0o644)
        self._write_file(self.fullchain_path, fullchain_pem.encode('utf-8'), 0o644)

    def obtain_certificate(self) -> Tuple[Path, Path, Path]:
        """Obtain or renew SSL certificate.

        Returns immediately when ``needs_renewal()`` is False. Otherwise the
        challenge server is started, an order is placed and validated, and
        the resulting files are written. The challenge server is stopped and
        the challenge tokens are cleared afterwards, on success and failure.

        Returns:
            Tuple of (cert_path, key_path, fullchain_path)

        Raises:
            RuntimeError: If a challenge could not be completed, the
                challenge server could not be started, or the order yielded
                no certificate.
            Exception: Errors raised by the ACME client are logged and
                re-raised unchanged.
        """
        if not self.needs_renewal():
            logger.info("Certificate is still valid, skipping renewal")
            return self.cert_path, self.key_path, self.fullchain_path

        logger.info(
            "Obtaining SSL certificate",
            domain=self.domain,
            staging=self.staging
        )

        try:
            # Start challenge server
            self._start_challenge_server()

            # Create ACME client bound to the account key
            account_key = self._get_or_create_account_key()
            net = client.ClientNetwork(account_key)
            directory = messages.Directory.from_json(
                net.get(self.directory_url).json()
            )
            acme_client = client.ClientV2(directory, net=net)

            # Register or get existing account
            self._register_account(acme_client)

            # Generate certificate private key and request the certificate
            cert_key = self._generate_private_key()
            order = acme_client.new_order(self._create_csr(cert_key))

            # Complete challenges
            for authz in order.authorizations:
                if authz.body.status == messages.STATUS_VALID:
                    # Already authorized; nothing to answer.
                    continue
                if not self._perform_http01_challenge(acme_client, authz):
                    raise RuntimeError(
                        f"Failed to complete challenge for {authz.body.identifier.value}"
                    )

            # Finalize order
            order = acme_client.poll_and_finalize(order)
            if not order.fullchain_pem:
                raise RuntimeError("Failed to obtain certificate")

            self._save_certificate(order.fullchain_pem, cert_key)

            logger.info(
                "Certificate obtained successfully",
                cert_path=str(self.cert_path),
                key_path=str(self.key_path),
                fullchain_path=str(self.fullchain_path)
            )

            return self.cert_path, self.key_path, self.fullchain_path

        except Exception as e:
            logger.error("Failed to obtain certificate", error=str(e))
            raise
        finally:
            # Clean up challenge tokens and stop server
            self.challenge_tokens.clear()
            self._stop_challenge_server()

    def setup_auto_renewal(self, check_interval: int = 86400) -> threading.Thread:
        """Setup automatic certificate renewal.

        Starts a daemon thread that checks ``needs_renewal()`` every
        ``check_interval`` seconds and calls ``obtain_certificate()`` when
        required. Errors are logged and the loop keeps running. A renewal
        loop started by an earlier call is signalled to stop first.

        Args:
            check_interval: Interval in seconds to check for renewal (default: 24 hours)

        Returns:
            The started renewal thread.
        """
        self.stop_auto_renewal()
        stop_event = threading.Event()
        self._renewal_stop = stop_event

        def renewal_loop():
            """Background renewal loop."""
            while not stop_event.is_set():
                try:
                    if self.needs_renewal():
                        logger.info("Certificate renewal needed")
                        self.obtain_certificate()
                    else:
                        logger.debug("Certificate renewal not needed")
                except Exception as e:
                    logger.error("Certificate renewal check failed", error=str(e))

                # Sleeps for the interval but wakes up at once when stopped.
                stop_event.wait(check_interval)

        renewal_thread = threading.Thread(target=renewal_loop, daemon=True)
        renewal_thread.start()
        logger.info("Started automatic certificate renewal", interval_hours=check_interval/3600)
        return renewal_thread

    def stop_auto_renewal(self) -> None:
        """Signal the background renewal loop, if any, to exit."""
        self._renewal_stop.set()