# Netdata.cloud bot for Zulip
"""Automated SSL certificate management using ACME protocol."""

import asyncio
import json
import os
import socket
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple

import josepy as jose
import structlog
import uvicorn
from acme import challenges, client, errors, messages
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from fastapi import FastAPI
from fastapi.responses import PlainTextResponse

logger = structlog.get_logger()

LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
LETSENCRYPT_STAGING_URL = "https://acme-staging-v02.api.letsencrypt.org/directory"

# Renew once fewer than this many whole days of validity remain.
RENEWAL_THRESHOLD_DAYS = 30

# Authorization polling: number of attempts and pause (seconds) between them.
CHALLENGE_POLL_ATTEMPTS = 30
CHALLENGE_POLL_INTERVAL = 2

# How long (seconds) to wait for the challenge server to come up / shut down.
CHALLENGE_SERVER_STARTUP_TIMEOUT = 10.0
CHALLENGE_SERVER_SHUTDOWN_TIMEOUT = 5.0

_PEM_CERT_END = "-----END CERTIFICATE-----"


class CertificateError(Exception):
    """Raised when a certificate cannot be obtained or stored."""


class CertificateManager:
    """Manages SSL certificates using ACME protocol.

    The manager answers HTTP-01 challenges itself by running a small
    FastAPI/uvicorn server in a background thread for the duration of
    ``obtain_certificate``.
    """

    def __init__(
        self,
        domain: str,
        email: str,
        cert_dir: Path,
        staging: bool = False,
        port: int = 80
    ):
        """Initialize certificate manager.

        Creates ``cert_dir`` (and parents) if it does not exist.

        Args:
            domain: Domain name for the certificate
            email: Email for Let's Encrypt account
            cert_dir: Directory to store certificates
            staging: Use Let's Encrypt staging server
            port: Port for HTTP-01 challenge server
        """
        self.domain = domain
        self.email = email
        self.cert_dir = Path(cert_dir)
        self.cert_dir.mkdir(parents=True, exist_ok=True)
        self.staging = staging
        self.challenge_port = port

        self.directory_url = LETSENCRYPT_STAGING_URL if staging else LETSENCRYPT_DIRECTORY_URL
        self.account_key_path = self.cert_dir / "account_key.pem"
        self.cert_path = self.cert_dir / f"{domain}_cert.pem"
        self.key_path = self.cert_dir / f"{domain}_key.pem"
        self.fullchain_path = self.cert_dir / f"{domain}_fullchain.pem"

        # For HTTP-01 challenge: URL token -> key authorization to serve.
        self.challenge_tokens: Dict[str, str] = {}
        self.challenge_server: Optional[uvicorn.Server] = None
        self.challenge_thread: Optional[threading.Thread] = None

    @staticmethod
    def _private_key_to_pem(private_key: rsa.RSAPrivateKey) -> bytes:
        """Serialize a private key as unencrypted traditional-OpenSSL PEM."""
        return private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )

    @staticmethod
    def _write_private_file(path: Path, data: bytes) -> None:
        """Write ``data`` to ``path`` so it is never readable by other users.

        The file is created with mode 0o600 and, in case it already existed
        with wider permissions, restricted again before any data is written.
        """
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "wb") as handle:
            os.chmod(path, 0o600)
            handle.write(data)

    @staticmethod
    def _extract_leaf_certificate(fullchain_pem: str) -> str:
        """Return the first PEM certificate of a chain, newline-terminated.

        Raises:
            CertificateError: If the chain contains no certificate end marker.
        """
        end = fullchain_pem.find(_PEM_CERT_END)
        if end == -1:
            raise CertificateError("Failed to obtain certificate")
        return fullchain_pem[: end + len(_PEM_CERT_END)] + "\n"

    def _generate_private_key(self) -> rsa.RSAPrivateKey:
        """Generate a new RSA private key."""
        return rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )

    def _get_or_create_account_key(self) -> jose.JWK:
        """Get existing account key or create a new one.

        A newly generated key is persisted to ``account_key_path`` with
        owner-only permissions so later runs reuse the same ACME account.
        """
        if self.account_key_path.exists():
            key_data = self.account_key_path.read_bytes()
            private_key = serialization.load_pem_private_key(
                key_data, password=None, backend=default_backend()
            )
            key_pem = self._private_key_to_pem(private_key)
        else:
            private_key = self._generate_private_key()
            key_pem = self._private_key_to_pem(private_key)
            self._write_private_file(self.account_key_path, key_pem)
            logger.info("Created new account key", path=str(self.account_key_path))

        return jose.JWK.load(key_pem)

    def _create_csr(self, private_key: rsa.RSAPrivateKey) -> bytes:
        """Create a PEM-encoded Certificate Signing Request for ``self.domain``.

        The domain is placed both in the subject common name and in a
        SubjectAlternativeName extension. PEM is used because that is the
        encoding ``ClientV2.new_order`` accepts.
        """
        csr = x509.CertificateSigningRequestBuilder().subject_name(
            x509.Name([
                x509.NameAttribute(NameOID.COMMON_NAME, self.domain),
            ])
        ).add_extension(
            x509.SubjectAlternativeName([x509.DNSName(self.domain)]),
            critical=False,
        ).sign(private_key, hashes.SHA256(), backend=default_backend())

        return csr.public_bytes(serialization.Encoding.PEM)

    def _start_challenge_server(self):
        """Start HTTP server for ACME challenges.

        The server runs in a daemon thread. This method returns once the
        server reports that it has started.

        Raises:
            CertificateError: If the server does not start within
                ``CHALLENGE_SERVER_STARTUP_TIMEOUT`` seconds (for example
                because the port is already in use).
        """
        if self.challenge_thread is not None and self.challenge_thread.is_alive():
            # A previous server is still up; it serves from the same
            # ``challenge_tokens`` dict, so there is nothing to start.
            logger.info("ACME challenge server already running", port=self.challenge_port)
            return

        app = FastAPI()

        @app.get("/.well-known/acme-challenge/{token}")
        async def acme_challenge(token: str):
            """Serve ACME challenge responses."""
            validation = self.challenge_tokens.get(token)
            if validation is None:
                logger.warning("Unknown ACME challenge token", token=token)
                return PlainTextResponse("Not found", status_code=404)
            logger.info("Serving ACME challenge", token=token)
            return PlainTextResponse(validation)

        server = uvicorn.Server(
            uvicorn.Config(
                app,
                host="0.0.0.0",
                port=self.challenge_port,
                log_level="error"
            )
        )

        def run_server():
            """Run the challenge server in a thread."""
            try:
                server.run()
            except Exception as e:
                logger.error("Challenge server error", error=str(e))

        thread = threading.Thread(target=run_server, daemon=True)
        self.challenge_server = server
        self.challenge_thread = thread
        thread.start()

        # Wait until the server is really listening instead of sleeping a
        # fixed amount; stop early if the thread died (e.g. bind failure).
        deadline = time.monotonic() + CHALLENGE_SERVER_STARTUP_TIMEOUT
        while not server.started and thread.is_alive() and time.monotonic() < deadline:
            time.sleep(0.1)

        if not server.started:
            self._stop_challenge_server()
            raise CertificateError(
                f"ACME challenge server failed to start on port {self.challenge_port}"
            )

        logger.info("Started ACME challenge server", port=self.challenge_port)

    def _stop_challenge_server(self):
        """Stop the challenge server and wait briefly for its thread to exit."""
        server = self.challenge_server
        thread = self.challenge_thread

        if server is not None:
            # uvicorn's main loop checks this flag and shuts down gracefully.
            server.should_exit = True

        if thread is not None and thread.is_alive():
            thread.join(timeout=CHALLENGE_SERVER_SHUTDOWN_TIMEOUT)
            if thread.is_alive():
                # Keep the references so a later start does not try to bind
                # the port a second time; the daemon thread dies with the
                # process in the worst case.
                logger.warning("Challenge server did not stop in time")
                return

        if server is not None or thread is not None:
            logger.info("Stopped ACME challenge server")
        self.challenge_server = None
        self.challenge_thread = None

    def _perform_http01_challenge(
        self,
        acme_client: client.ClientV2,
        authz: messages.Authorization
    ) -> bool:
        """Perform HTTP-01 challenge.

        Args:
            acme_client: Client bound to the ACME account.
            authz: Authorization resource to validate.

        Returns:
            True when the authorization became valid, False when it became
            invalid, no HTTP-01 challenge was offered, or polling ran out of
            attempts.
        """
        # Find HTTP-01 challenge
        http_challenge = next(
            (
                candidate for candidate in authz.body.challenges
                if isinstance(candidate.chall, challenges.HTTP01)
            ),
            None,
        )

        if not http_challenge:
            logger.error("No HTTP-01 challenge found")
            return False

        # Prepare challenge response
        response, validation = http_challenge.chall.response_and_validation(
            acme_client.net.key
        )

        # The validation URL carries the base64url-encoded token, not the raw
        # token bytes, so that is the key the challenge server must look up.
        url_token = http_challenge.chall.encode("token")
        self.challenge_tokens[url_token] = validation

        logger.info(
            "Prepared HTTP-01 challenge",
            token=url_token,
            domain=self.domain
        )

        # Notify ACME server that we're ready
        acme_client.answer_challenge(http_challenge, response)

        # Wait for challenge validation
        for attempt in range(CHALLENGE_POLL_ATTEMPTS):
            time.sleep(CHALLENGE_POLL_INTERVAL)
            try:
                authz, _ = acme_client.poll(authz)
            except errors.TimeoutError:
                if attempt == CHALLENGE_POLL_ATTEMPTS - 1:
                    logger.error("Challenge validation timeout")
                    return False
                continue

            status = authz.body.status
            if status == messages.STATUS_VALID:
                logger.info("Challenge validated successfully")
                return True
            if status == messages.STATUS_INVALID:
                logger.error("Challenge validation failed")
                return False

        return False

    def needs_renewal(self) -> bool:
        """Check if certificate needs renewal.

        Returns:
            True if no certificate file exists, it cannot be read/parsed, or
            fewer than ``RENEWAL_THRESHOLD_DAYS`` days of validity remain.
        """
        if not self.cert_path.exists():
            return True

        try:
            cert = x509.load_pem_x509_certificate(
                self.cert_path.read_bytes(), default_backend()
            )

            days_remaining = (cert.not_valid_after_utc -
                              datetime.now(timezone.utc)).days
        except Exception as e:
            # Deliberately broad: an unreadable certificate is treated as
            # "needs renewal" rather than aborting the caller.
            logger.error("Error checking certificate", error=str(e))
            return True

        if days_remaining < RENEWAL_THRESHOLD_DAYS:
            logger.info("Certificate needs renewal", days_remaining=days_remaining)
            return True

        logger.info("Certificate still valid", days_remaining=days_remaining)
        return False

    def _register_account(self, acme_client: client.ClientV2) -> None:
        """Register a new ACME account or attach to the existing one."""
        try:
            acme_client.new_account(
                messages.NewRegistration.from_data(
                    email=self.email,
                    terms_of_service_agreed=True
                )
            )
            logger.info("Created new ACME account")
        except errors.ConflictError as conflict:
            # The key is already registered; the error carries the account
            # URL, which query_registration needs to bind the client to it.
            existing = messages.RegistrationResource(
                uri=conflict.location,
                body=messages.Registration(),
            )
            acme_client.query_registration(existing)
            logger.info("Using existing ACME account")

    def _save_certificate(self, fullchain_pem: str, cert_key: rsa.RSAPrivateKey) -> None:
        """Write the leaf certificate, full chain and private key to disk."""
        leaf_pem = self._extract_leaf_certificate(fullchain_pem)

        with open(self.cert_path, 'w') as f:
            f.write(leaf_pem)

        with open(self.fullchain_path, 'w') as f:
            f.write(fullchain_pem)

        # Created with 0o600 from the start so the key is never exposed.
        self._write_private_file(self.key_path, self._private_key_to_pem(cert_key))

        # Set proper permissions
        os.chmod(self.cert_path, 0o644)
        os.chmod(self.fullchain_path, 0o644)

    def obtain_certificate(self) -> Tuple[Path, Path, Path]:
        """Obtain or renew SSL certificate.

        Does nothing (beyond logging) when ``needs_renewal`` reports that
        the current certificate is still good.

        Returns:
            Tuple of (cert_path, key_path, fullchain_path)

        Raises:
            CertificateError: If a challenge cannot be completed, the
                challenge server cannot start, or no certificate is issued.
            Exception: Any error raised by the ACME client is logged and
                re-raised unchanged.
        """
        if not self.needs_renewal():
            logger.info("Certificate is still valid, skipping renewal")
            return self.cert_path, self.key_path, self.fullchain_path

        logger.info(
            "Obtaining SSL certificate",
            domain=self.domain,
            staging=self.staging
        )

        try:
            # Start challenge server
            self._start_challenge_server()

            # Get or create account key
            account_key = self._get_or_create_account_key()

            # Create ACME client
            net = client.ClientNetwork(account_key)
            directory = messages.Directory.from_json(
                net.get(self.directory_url).json()
            )
            acme_client = client.ClientV2(directory, net=net)

            # Register or get existing account
            self._register_account(acme_client)

            # Generate certificate private key and CSR
            cert_key = self._generate_private_key()
            csr = self._create_csr(cert_key)

            # Request certificate
            order = acme_client.new_order(csr)

            # Complete challenges
            for authz in order.authorizations:
                if not self._perform_http01_challenge(acme_client, authz):
                    raise CertificateError(
                        f"Failed to complete challenge for {authz.body.identifier.value}"
                    )

            # Finalize order
            order = acme_client.poll_and_finalize(order)

            if not order.fullchain_pem:
                raise CertificateError("Failed to obtain certificate")

            self._save_certificate(order.fullchain_pem, cert_key)

            logger.info(
                "Certificate obtained successfully",
                cert_path=str(self.cert_path),
                key_path=str(self.key_path),
                fullchain_path=str(self.fullchain_path)
            )

            return self.cert_path, self.key_path, self.fullchain_path

        except Exception as e:
            logger.error("Failed to obtain certificate", error=str(e))
            raise
        finally:
            # Clean up challenge tokens and stop server
            self.challenge_tokens.clear()
            self._stop_challenge_server()

    def setup_auto_renewal(self, check_interval: int = 86400):
        """Setup automatic certificate renewal.

        Starts a daemon thread that checks ``needs_renewal`` immediately and
        then every ``check_interval`` seconds, obtaining a new certificate
        when required. Failures are logged and the loop keeps running.

        Args:
            check_interval: Interval in seconds to check for renewal (default: 24 hours)
        """
        def renewal_loop():
            """Background renewal loop."""
            while True:
                try:
                    if self.needs_renewal():
                        logger.info("Certificate renewal needed")
                        self.obtain_certificate()
                    else:
                        logger.debug("Certificate renewal not needed")
                except Exception as e:
                    logger.error("Certificate renewal check failed", error=str(e))

                time.sleep(check_interval)

        renewal_thread = threading.Thread(target=renewal_loop, daemon=True)
        renewal_thread.start()
        logger.info("Started automatic certificate renewal", interval_hours=check_interval/3600)