···
-
"""Automated SSL certificate management using ACME protocol."""
-
from datetime import datetime, timezone
-
from pathlib import Path
-
from typing import Optional, Tuple
-
from acme import challenges, client, errors, messages
-
from cryptography import x509
-
from cryptography.hazmat.backends import default_backend
-
from cryptography.hazmat.primitives import hashes, serialization
-
from cryptography.hazmat.primitives.asymmetric import rsa
-
from cryptography.x509.oid import NameOID
-
from fastapi import FastAPI
-
from fastapi.responses import PlainTextResponse
-
logger = structlog.get_logger()
-
LETSENCRYPT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"
-
LETSENCRYPT_STAGING_URL = "https://acme-staging-v02.api.letsencrypt.org/directory"
-
class CertificateManager:
-
"""Manages SSL certificates using ACME protocol."""
-
"""Initialize certificate manager.
-
domain: Domain name for the certificate
-
email: Email for Let's Encrypt account
-
cert_dir: Directory to store certificates
-
staging: Use Let's Encrypt staging server
-
port: Port for HTTP-01 challenge server
-
self.cert_dir = Path(cert_dir)
-
self.cert_dir.mkdir(parents=True, exist_ok=True)
-
self.challenge_port = port
-
self.directory_url = LETSENCRYPT_STAGING_URL if staging else LETSENCRYPT_DIRECTORY_URL
-
self.account_key_path = self.cert_dir / "account_key.pem"
-
self.cert_path = self.cert_dir / f"{domain}_cert.pem"
-
self.key_path = self.cert_dir / f"{domain}_key.pem"
-
self.fullchain_path = self.cert_dir / f"{domain}_fullchain.pem"
-
# For HTTP-01 challenge
-
self.challenge_tokens = {}
-
self.challenge_server = None
-
self.challenge_thread = None
-
def _generate_private_key(self) -> rsa.RSAPrivateKey:
-
"""Generate a new RSA private key."""
-
return rsa.generate_private_key(
-
backend=default_backend()
-
def _get_or_create_account_key(self) -> jose.JWK:
-
"""Get existing account key or create a new one."""
-
if self.account_key_path.exists():
-
with open(self.account_key_path, 'rb') as f:
-
private_key = serialization.load_pem_private_key(
-
key_data, password=None, backend=default_backend()
-
private_key = self._generate_private_key()
-
key_pem = private_key.private_bytes(
-
encoding=serialization.Encoding.PEM,
-
format=serialization.PrivateFormat.TraditionalOpenSSL,
-
encryption_algorithm=serialization.NoEncryption()
-
with open(self.account_key_path, 'wb') as f:
-
logger.info("Created new account key", path=str(self.account_key_path))
-
return jose.JWK.load(private_key.private_bytes(
-
encoding=serialization.Encoding.PEM,
-
format=serialization.PrivateFormat.TraditionalOpenSSL,
-
encryption_algorithm=serialization.NoEncryption()
-
def _create_csr(self, private_key: rsa.RSAPrivateKey) -> bytes:
-
"""Create a Certificate Signing Request."""
-
csr = x509.CertificateSigningRequestBuilder().subject_name(
-
x509.NameAttribute(NameOID.COMMON_NAME, self.domain),
-
).sign(private_key, hashes.SHA256(), backend=default_backend())
-
return csr.public_bytes(serialization.Encoding.DER)
-
def _start_challenge_server(self):
-
"""Start HTTP server for ACME challenges."""
-
@app.get("/.well-known/acme-challenge/{token}")
-
async def acme_challenge(token: str):
-
"""Serve ACME challenge responses."""
-
if token in self.challenge_tokens:
-
logger.info("Serving ACME challenge", token=token)
-
return PlainTextResponse(self.challenge_tokens[token])
-
logger.warning("Unknown ACME challenge token", token=token)
-
return PlainTextResponse("Not found", status_code=404)
-
"""Run the challenge server in a thread."""
-
port=self.challenge_port,
-
logger.error("Challenge server error", error=str(e))
-
self.challenge_thread = threading.Thread(target=run_server, daemon=True)
-
self.challenge_thread.start()
-
# Give the server time to start
-
logger.info("Started ACME challenge server", port=self.challenge_port)
-
def _stop_challenge_server(self):
-
"""Stop the challenge server."""
-
if self.challenge_thread and self.challenge_thread.is_alive():
-
# The thread is daemon, so it will stop when the main process exits
-
logger.info("Challenge server will stop with main process")
-
def _perform_http01_challenge(
-
acme_client: client.ClientV2,
-
authz: messages.Authorization
-
"""Perform HTTP-01 challenge."""
-
# Find HTTP-01 challenge
-
for challenge in authz.body.challenges:
-
if isinstance(challenge.chall, challenges.HTTP01):
-
http_challenge = challenge
-
logger.error("No HTTP-01 challenge found")
-
# Prepare challenge response
-
response, validation = http_challenge.chall.response_and_validation(
-
# Store challenge token and response
-
self.challenge_tokens[http_challenge.chall.token.decode('utf-8')] = validation
-
"Prepared HTTP-01 challenge",
-
token=http_challenge.chall.token.decode('utf-8'),
-
# Notify ACME server that we're ready
-
acme_client.answer_challenge(http_challenge, response)
-
# Wait for challenge validation
-
for attempt in range(max_attempts):
-
authz, _ = acme_client.poll(authz)
-
if authz.body.status == messages.STATUS_VALID:
-
logger.info("Challenge validated successfully")
-
elif authz.body.status == messages.STATUS_INVALID:
-
logger.error("Challenge validation failed")
-
except errors.TimeoutError:
-
if attempt == max_attempts - 1:
-
logger.error("Challenge validation timeout")
-
def needs_renewal(self) -> bool:
-
"""Check if certificate needs renewal."""
-
if not self.cert_path.exists():
-
with open(self.cert_path, 'rb') as f:
-
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
-
# Renew if less than 30 days remaining
-
days_remaining = (cert.not_valid_after_utc -
-
datetime.now(timezone.utc)).days
-
if days_remaining < 30:
-
logger.info("Certificate needs renewal", days_remaining=days_remaining)
-
logger.info("Certificate still valid", days_remaining=days_remaining)
-
logger.error("Error checking certificate", error=str(e))
-
def obtain_certificate(self) -> Tuple[Path, Path, Path]:
-
"""Obtain or renew SSL certificate.
-
Tuple of (cert_path, key_path, fullchain_path)
-
if not self.needs_renewal():
-
logger.info("Certificate is still valid, skipping renewal")
-
return self.cert_path, self.key_path, self.fullchain_path
-
"Obtaining SSL certificate",
-
# Start challenge server
-
self._start_challenge_server()
-
# Get or create account key
-
account_key = self._get_or_create_account_key()
-
net = client.ClientNetwork(account_key)
-
directory = messages.Directory.from_json(
-
net.get(self.directory_url).json()
-
acme_client = client.ClientV2(directory, net=net)
-
# Register or get existing account
-
account = acme_client.new_account(
-
messages.NewRegistration.from_data(
-
terms_of_service_agreed=True
-
logger.info("Created new ACME account")
-
except errors.ConflictError:
-
# Account already exists
-
account = acme_client.query_registration(
-
messages.Registration(key=account_key.public_key())
-
logger.info("Using existing ACME account")
-
# Generate certificate private key
-
cert_key = self._generate_private_key()
-
csr = self._create_csr(cert_key)
-
order = acme_client.new_order(csr)
-
for authz in order.authorizations:
-
if not self._perform_http01_challenge(acme_client, authz):
-
raise Exception(f"Failed to complete challenge for {authz.body.identifier.value}")
-
order = acme_client.poll_and_finalize(order)
-
if order.fullchain_pem:
-
# Save certificate and key
-
with open(self.cert_path, 'w') as f:
-
f.write(order.fullchain_pem.split('\n\n')[0] + '\n')
-
with open(self.fullchain_path, 'w') as f:
-
f.write(order.fullchain_pem)
-
key_pem = cert_key.private_bytes(
-
encoding=serialization.Encoding.PEM,
-
format=serialization.PrivateFormat.TraditionalOpenSSL,
-
encryption_algorithm=serialization.NoEncryption()
-
with open(self.key_path, 'w') as f:
-
# Set proper permissions
-
os.chmod(self.key_path, 0o600)
-
os.chmod(self.cert_path, 0o644)
-
os.chmod(self.fullchain_path, 0o644)
-
"Certificate obtained successfully",
-
cert_path=str(self.cert_path),
-
key_path=str(self.key_path),
-
fullchain_path=str(self.fullchain_path)
-
return self.cert_path, self.key_path, self.fullchain_path
-
raise Exception("Failed to obtain certificate")
-
logger.error("Failed to obtain certificate", error=str(e))
-
# Clean up challenge tokens and stop server
-
self.challenge_tokens.clear()
-
self._stop_challenge_server()
-
def setup_auto_renewal(self, check_interval: int = 86400):
-
"""Setup automatic certificate renewal.
-
check_interval: Interval in seconds to check for renewal (default: 24 hours)
-
"""Background renewal loop."""
-
if self.needs_renewal():
-
logger.info("Certificate renewal needed")
-
self.obtain_certificate()
-
logger.debug("Certificate renewal not needed")
-
logger.error("Certificate renewal check failed", error=str(e))
-
time.sleep(check_interval)
-
renewal_thread = threading.Thread(target=renewal_loop, daemon=True)
-
logger.info("Started automatic certificate renewal", interval_hours=check_interval/3600)