"""Automated SSL certificate management using ACME protocol.""" import asyncio import json import os import socket import threading import time from datetime import datetime, timezone from pathlib import Path from typing import Optional, Tuple import structlog from acme import challenges, client, errors, messages from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID from fastapi import FastAPI from fastapi.responses import PlainTextResponse import uvicorn import josepy as jose logger = structlog.get_logger() LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory" LETSENCRYPT_STAGING_URL = "https://acme-staging-v02.api.letsencrypt.org/directory" class CertificateManager: """Manages SSL certificates using ACME protocol.""" def __init__( self, domain: str, email: str, cert_dir: Path, staging: bool = False, port: int = 80 ): """Initialize certificate manager. Args: domain: Domain name for the certificate email: Email for Let's Encrypt account cert_dir: Directory to store certificates staging: Use Let's Encrypt staging server port: Port for HTTP-01 challenge server """ self.domain = domain self.email = email self.cert_dir = Path(cert_dir) self.cert_dir.mkdir(parents=True, exist_ok=True) self.staging = staging self.challenge_port = port self.directory_url = LETSENCRYPT_STAGING_URL if staging else LETSENCRYPT_DIRECTORY_URL self.account_key_path = self.cert_dir / "account_key.pem" self.cert_path = self.cert_dir / f"{domain}_cert.pem" self.key_path = self.cert_dir / f"{domain}_key.pem" self.fullchain_path = self.cert_dir / f"{domain}_fullchain.pem" # For HTTP-01 challenge self.challenge_tokens = {} self.challenge_server = None self.challenge_thread = None def _generate_private_key(self) -> rsa.RSAPrivateKey: """Generate a new RSA private key.""" return rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) def _get_or_create_account_key(self) -> jose.JWK: """Get existing account key or create a new one.""" if self.account_key_path.exists(): with open(self.account_key_path, 'rb') as f: key_data = f.read() private_key = serialization.load_pem_private_key( key_data, password=None, backend=default_backend() ) else: private_key = self._generate_private_key() key_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() ) with open(self.account_key_path, 'wb') as f: f.write(key_pem) logger.info("Created new account key", path=str(self.account_key_path)) return jose.JWK.load(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() )) def _create_csr(self, private_key: rsa.RSAPrivateKey) -> bytes: """Create a Certificate Signing Request.""" csr = x509.CertificateSigningRequestBuilder().subject_name( x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, self.domain), ]) ).sign(private_key, hashes.SHA256(), backend=default_backend()) return csr.public_bytes(serialization.Encoding.DER) def _start_challenge_server(self): """Start HTTP server for ACME challenges.""" app = FastAPI() @app.get("/.well-known/acme-challenge/{token}") async def acme_challenge(token: str): """Serve ACME challenge responses.""" if token in self.challenge_tokens: logger.info("Serving ACME challenge", token=token) return PlainTextResponse(self.challenge_tokens[token]) logger.warning("Unknown ACME challenge token", token=token) return PlainTextResponse("Not found", status_code=404) def run_server(): """Run the challenge server in a thread.""" try: uvicorn.run( app, host="0.0.0.0", port=self.challenge_port, log_level="error" ) except Exception as e: logger.error("Challenge server error", error=str(e)) self.challenge_thread = threading.Thread(target=run_server, daemon=True) self.challenge_thread.start() # Give the server time to start time.sleep(2) logger.info("Started ACME challenge server", port=self.challenge_port) def _stop_challenge_server(self): """Stop the challenge server.""" if self.challenge_thread and self.challenge_thread.is_alive(): # The thread is daemon, so it will stop when the main process exits logger.info("Challenge server will stop with main process") def _perform_http01_challenge( self, acme_client: client.ClientV2, authz: messages.Authorization ) -> bool: """Perform HTTP-01 challenge.""" # Find HTTP-01 challenge http_challenge = None for challenge in authz.body.challenges: if isinstance(challenge.chall, challenges.HTTP01): http_challenge = challenge break if not http_challenge: logger.error("No HTTP-01 challenge found") return False # Prepare challenge response response, validation = http_challenge.chall.response_and_validation( acme_client.net.key ) # Store challenge token and response self.challenge_tokens[http_challenge.chall.token.decode('utf-8')] = validation logger.info( "Prepared HTTP-01 challenge", token=http_challenge.chall.token.decode('utf-8'), domain=self.domain ) # Notify ACME server that we're ready acme_client.answer_challenge(http_challenge, response) # Wait for challenge validation max_attempts = 30 for attempt in range(max_attempts): time.sleep(2) try: authz, _ = acme_client.poll(authz) if authz.body.status == messages.STATUS_VALID: logger.info("Challenge validated successfully") return True elif authz.body.status == messages.STATUS_INVALID: logger.error("Challenge validation failed") return False except errors.TimeoutError: if attempt == max_attempts - 1: logger.error("Challenge validation timeout") return False continue return False def needs_renewal(self) -> bool: """Check if certificate needs renewal.""" if not self.cert_path.exists(): return True try: with open(self.cert_path, 'rb') as f: cert_data = f.read() cert = x509.load_pem_x509_certificate(cert_data, default_backend()) # Renew if less than 30 days remaining days_remaining = (cert.not_valid_after_utc - datetime.now(timezone.utc)).days if days_remaining < 30: logger.info("Certificate needs renewal", days_remaining=days_remaining) return True logger.info("Certificate still valid", days_remaining=days_remaining) return False except Exception as e: logger.error("Error checking certificate", error=str(e)) return True def obtain_certificate(self) -> Tuple[Path, Path, Path]: """Obtain or renew SSL certificate. Returns: Tuple of (cert_path, key_path, fullchain_path) """ if not self.needs_renewal(): logger.info("Certificate is still valid, skipping renewal") return self.cert_path, self.key_path, self.fullchain_path logger.info( "Obtaining SSL certificate", domain=self.domain, staging=self.staging ) try: # Start challenge server self._start_challenge_server() # Get or create account key account_key = self._get_or_create_account_key() # Create ACME client net = client.ClientNetwork(account_key) directory = messages.Directory.from_json( net.get(self.directory_url).json() ) acme_client = client.ClientV2(directory, net=net) # Register or get existing account try: account = acme_client.new_account( messages.NewRegistration.from_data( email=self.email, terms_of_service_agreed=True ) ) logger.info("Created new ACME account") except errors.ConflictError: # Account already exists account = acme_client.query_registration( messages.Registration(key=account_key.public_key()) ) logger.info("Using existing ACME account") # Generate certificate private key cert_key = self._generate_private_key() # Create CSR csr = self._create_csr(cert_key) # Request certificate order = acme_client.new_order(csr) # Complete challenges for authz in order.authorizations: if not self._perform_http01_challenge(acme_client, authz): raise Exception(f"Failed to complete challenge for {authz.body.identifier.value}") # Finalize order order = acme_client.poll_and_finalize(order) if order.fullchain_pem: # Save certificate and key with open(self.cert_path, 'w') as f: f.write(order.fullchain_pem.split('\n\n')[0] + '\n') with open(self.fullchain_path, 'w') as f: f.write(order.fullchain_pem) key_pem = cert_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() ).decode('utf-8') with open(self.key_path, 'w') as f: f.write(key_pem) # Set proper permissions os.chmod(self.key_path, 0o600) os.chmod(self.cert_path, 0o644) os.chmod(self.fullchain_path, 0o644) logger.info( "Certificate obtained successfully", cert_path=str(self.cert_path), key_path=str(self.key_path), fullchain_path=str(self.fullchain_path) ) return self.cert_path, self.key_path, self.fullchain_path else: raise Exception("Failed to obtain certificate") except Exception as e: logger.error("Failed to obtain certificate", error=str(e)) raise finally: # Clean up challenge tokens and stop server self.challenge_tokens.clear() self._stop_challenge_server() def setup_auto_renewal(self, check_interval: int = 86400): """Setup automatic certificate renewal. Args: check_interval: Interval in seconds to check for renewal (default: 24 hours) """ def renewal_loop(): """Background renewal loop.""" while True: try: if self.needs_renewal(): logger.info("Certificate renewal needed") self.obtain_certificate() else: logger.debug("Certificate renewal not needed") except Exception as e: logger.error("Certificate renewal check failed", error=str(e)) time.sleep(check_interval) renewal_thread = threading.Thread(target=renewal_loop, daemon=True) renewal_thread.start() logger.info("Started automatic certificate renewal", interval_hours=check_interval/3600)