at master 5.9 kB view raw
1#!/usr/bin/env python3 2import time 3 4TOTAL_RETRIES = 20 5 6# BackoffTracker provides a robust system for handling test retries 7class BackoffTracker: 8 delay = 1 9 increment = 1 10 11 def handle_fail(self, retries, message) -> int: 12 assert retries < TOTAL_RETRIES, message 13 14 print(f"Retrying in {self.delay}s, {retries + 1}/{TOTAL_RETRIES}") 15 time.sleep(self.delay) 16 17 # Only increment after the first try 18 if retries == 0: 19 self.delay += self.increment 20 self.increment *= 2 21 22 return retries + 1 23 24 def protect(self, func): 25 def wrapper(*args, retries: int = 0, **kwargs): 26 try: 27 return func(*args, **kwargs) 28 except Exception as err: 29 retries = self.handle_fail(retries, err.args) 30 return wrapper(*args, retries=retries, **kwargs) 31 32 return wrapper 33 34 35backoff = BackoffTracker() 36 37def run(node, cmd, fail=False): 38 if fail: 39 return node.fail(cmd) 40 else: 41 return node.succeed(cmd) 42 43# Waits for the system to finish booting or switching configuration 44def wait_for_running(node): 45 node.succeed("systemctl is-system-running --wait") 46 47# On first switch, this will create a symlink to the current system so that we can 48# quickly switch between derivations 49def switch_to(node, name, fail=False) -> None: 50 root_specs = "/tmp/specialisation" 51 node.execute( 52 f"test -e {root_specs}" 53 f" || ln -s $(readlink /run/current-system)/specialisation {root_specs}" 54 ) 55 56 switcher_path = ( 57 f"/run/current-system/specialisation/{name}/bin/switch-to-configuration" 58 ) 59 rc, _ = node.execute(f"test -e '{switcher_path}'") 60 if rc > 0: 61 switcher_path = f"/tmp/specialisation/{name}/bin/switch-to-configuration" 62 63 cmd = f"{switcher_path} test" 64 run(node, cmd, fail=fail) 65 if not fail: 66 wait_for_running(node) 67 68# Ensures the issuer of our cert matches the chain 69# and matches the issuer we expect it to be. 70# It's a good validation to ensure the cert.pem and fullchain.pem 71# are not still selfsigned after verification 72@backoff.protect 73def check_issuer(node, cert_name, issuer) -> None: 74 for fname in ("cert.pem", "fullchain.pem"): 75 actual_issuer = node.succeed( 76 f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}" 77 ).partition("=")[2] 78 assert ( 79 issuer.lower() in actual_issuer.lower() 80 ), f"{fname} issuer mismatch. Expected {issuer} got {actual_issuer}" 81 82# Ensures the provided domain matches with the given cert 83def check_domain(node, cert_name, domain, fail=False) -> None: 84 cmd = f"openssl x509 -noout -checkhost '{domain}' -in /var/lib/acme/{cert_name}/cert.pem" 85 run(node, cmd, fail=fail) 86 87# Ensures the required values for OCSP stapling are present 88# Pebble doesn't provide a full OCSP responder, so just checks the URL 89def check_stapling(node, cert_name, ca_domain, fail=False): 90 rc, _ = node.execute( 91 f"openssl x509 -noout -ocsp_uri -in /var/lib/acme/{cert_name}/cert.pem" 92 f" | grep -i 'http://{ca_domain}:4002' 2>&1", 93 ) 94 assert rc == 0 or fail, "Failed to find OCSP URI in issued certificate" 95 run( 96 node, 97 f"openssl x509 -noout -ext tlsfeature -in /var/lib/acme/{cert_name}/cert.pem" 98 f" | grep -iv 'no extensions' 2>&1", 99 fail=fail, 100 ) 101 102# Checks the keyType by validating the number of bits 103def check_key_bits(node, cert_name, bits, fail=False): 104 run( 105 node, 106 f"openssl x509 -noout -text -in /var/lib/acme/{cert_name}/cert.pem" 107 f" | grep -i Public-Key | grep {bits} | tee /dev/stderr", 108 fail=fail, 109 ) 110 111# Ensure cert comes before chain in fullchain.pem 112def check_fullchain(node, cert_name): 113 cert_file = f"/var/lib/acme/{cert_name}/fullchain.pem" 114 num_certs = node.succeed(f"grep -o 'END CERTIFICATE' {cert_file}") 115 assert len(num_certs.strip().split("\n")) > 1, "Insufficient certs in fullchain.pem" 116 117 first_cert_data = node.succeed( 118 f"grep -m1 -B50 'END CERTIFICATE' {cert_file}" 119 " | openssl x509 -noout -text" 120 ) 121 for line in first_cert_data.lower().split("\n"): 122 if "dns:" in line: 123 print(f"First DNSName in fullchain.pem: {line}") 124 assert cert_name.lower() in line, f"{cert_name} not found in {line}" 125 return 126 127 assert False 128 129# Checks the permissions in the cert directories are as expected 130def check_permissions(node, cert_name, group): 131 stat = "stat -L -c '%a %U %G' " 132 node.succeed( 133 f"test $({stat} /var/lib/acme/{cert_name}/*.pem" 134 f" | tee /dev/stderr | grep -v '640 acme {group}' | wc -l) -eq 0" 135 ) 136 node.execute(f"ls -lahR /var/lib/acme/.lego/{cert_name}/* > /dev/stderr") 137 node.succeed( 138 f"test $({stat} /var/lib/acme/.lego/{cert_name}/*/{cert_name}*" 139 f" | tee /dev/stderr | grep -v '640 acme {group}' | wc -l) -eq 0" 140 ) 141 node.succeed( 142 f"test $({stat} /var/lib/acme/{cert_name}" 143 f" | tee /dev/stderr | grep -v '750 acme {group}' | wc -l) -eq 0" 144 ) 145 node.succeed( 146 f"test $(find /var/lib/acme/.lego/accounts -type f -exec {stat} {{}} \\;" 147 f" | tee /dev/stderr | grep -v '600 acme {group}' | wc -l) -eq 0" 148 ) 149 150 151@backoff.protect 152def download_ca_certs(node, ca_domain): 153 node.succeed(f"curl https://{ca_domain}:15000/roots/0 > /tmp/ca.crt") 154 node.succeed(f"curl https://{ca_domain}:15000/intermediate-keys/0 >> /tmp/ca.crt") 155 156 157@backoff.protect 158def check_connection(node, domain, fail=False, minica=False): 159 cafile = "/tmp/ca.crt" 160 if minica: 161 cafile = "/var/lib/acme/.minica/cert.pem" 162 run(node, 163 f"openssl s_client -brief -CAfile {cafile}" 164 f" -verify 2 -verify_return_error -verify_hostname {domain}" 165 f" -servername {domain} -connect {domain}:443 < /dev/null", 166 fail=fail, 167 )