1#!/usr/bin/env python3
2import time
3
# Upper bound on the number of retries a protected call gets before it fails.
TOTAL_RETRIES = 20


class BackoffTracker:
    """Retry helper that re-runs failing test checks with a growing delay.

    The current delay is stored on the tracker itself and is never reset, so
    every function protected by the same tracker shares (and grows) one delay.
    """

    # Seconds slept before the next retry.
    delay = 1
    # Amount added to ``delay`` when it grows; doubled each time it is applied.
    increment = 1

    def handle_fail(self, retries, message) -> int:
        """Account for one failed attempt: check the limit, sleep, count up.

        Args:
            retries: Number of retries already performed for the current call.
            message: Failure detail; becomes the AssertionError payload once
                the retry limit is reached.

        Returns:
            The updated retry count (``retries + 1``).

        Raises:
            AssertionError: If ``retries`` has reached ``TOTAL_RETRIES``.
        """
        # Raised explicitly instead of via ``assert`` so the limit still holds
        # under ``python -O``, which strips assert statements and would
        # otherwise let a failing check retry forever.
        if retries >= TOTAL_RETRIES:
            raise AssertionError(message)

        print(f"Retrying in {self.delay}s, {retries + 1}/{TOTAL_RETRIES}")
        time.sleep(self.delay)

        # The delay grows only on the first failure of a protected call
        # (retries == 0); further retries of that same call reuse the new
        # delay unchanged.
        if retries == 0:
            self.delay += self.increment
            self.increment *= 2

        return retries + 1

    def protect(self, func):
        """Decorate ``func`` so that any ``Exception`` triggers a retry.

        The returned wrapper accepts an extra keyword argument ``retries``
        (default 0) giving the starting retry count; it is consumed by the
        wrapper and not forwarded to ``func``. Once ``handle_fail`` raises,
        the AssertionError propagates to the caller.
        """
        # Local import keeps this block self-contained; the file's top-level
        # imports do not include functools.
        import functools

        # Preserve func's name/docstring on the wrapper for logs and tracebacks.
        @functools.wraps(func)
        def wrapper(*args, retries: int = 0, **kwargs):
            # Iterative retry: avoids one extra stack frame (and one extra
            # level of nested traceback) per failed attempt.
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    retries = self.handle_fail(retries, err.args)

        return wrapper


# Tracker instance used to decorate the retryable checks in this file.
backoff = BackoffTracker()
36
def run(node, cmd, fail=False):
    """Run ``cmd`` on ``node`` and return whatever the chosen runner returns.

    ``node.fail`` is used when ``fail`` is truthy, ``node.succeed`` otherwise
    (presumably the test driver's expect-failure / expect-success runners).
    """
    runner = node.fail if fail else node.succeed
    return runner(cmd)
42
def wait_for_running(node):
    """Wait for the system to finish booting or switching configuration.

    Runs ``systemctl is-system-running --wait`` through ``node.succeed``.
    """
    wait_cmd = "systemctl is-system-running --wait"
    node.succeed(wait_cmd)
46
def switch_to(node, name, fail=False) -> None:
    """Switch ``node`` to the specialisation called ``name``.

    On the first call a symlink to the current system's specialisations is
    created under /tmp, so later calls can still switch between derivations
    when /run/current-system no longer offers the requested one.

    With ``fail`` truthy the switch command is run via the expect-failure
    path and the wait for the system to settle is skipped.
    """
    saved_specs = "/tmp/specialisation"
    # Create the symlink only if it is missing; the result is not checked.
    node.execute(
        f"test -e {saved_specs}"
        f" || ln -s $(readlink /run/current-system)/specialisation {saved_specs}"
    )

    switcher_tail = f"{name}/bin/switch-to-configuration"
    switcher = f"/run/current-system/specialisation/{switcher_tail}"
    status, _output = node.execute(f"test -e '{switcher}'")
    if status > 0:
        # Not available from the running system: use the saved symlink.
        switcher = f"{saved_specs}/{switcher_tail}"

    run(node, f"{switcher} test", fail=fail)
    if fail:
        return
    wait_for_running(node)
67
@backoff.protect
def check_issuer(node, cert_name, issuer) -> None:
    """Assert that cert.pem and fullchain.pem were issued by ``issuer``.

    The comparison is a case-insensitive substring match against the text
    following the first "=" in ``openssl x509 -issuer`` output. This guards
    against the files still being self-signed after verification.
    """
    for pem in ("cert.pem", "fullchain.pem"):
        pem_path = f"/var/lib/acme/{cert_name}/{pem}"
        output = node.succeed(f"openssl x509 -noout -issuer -in {pem_path}")
        # Keep only what follows the first "=" of the openssl output.
        _, _, actual_issuer = output.partition("=")
        matches = issuer.lower() in actual_issuer.lower()
        assert matches, f"{pem} issuer mismatch. Expected {issuer} got {actual_issuer}"
81
def check_domain(node, cert_name, domain, fail=False) -> None:
    """Check ``domain`` against the certificate stored for ``cert_name``.

    Uses ``openssl x509 -checkhost``; pass ``fail=True`` to run the command
    through the expect-failure path instead.
    """
    cert_file = f"/var/lib/acme/{cert_name}/cert.pem"
    run(
        node,
        f"openssl x509 -noout -checkhost '{domain}' -in {cert_file}",
        fail=fail,
    )
86
def check_stapling(node, cert_name, ca_domain, fail=False):
    """Check that the values required for OCSP stapling are present.

    Pebble doesn't provide a full OCSP responder, so only the OCSP URI and
    the tlsfeature extension of the certificate are inspected.

    With ``fail`` truthy the OCSP URI result is not enforced and the
    tlsfeature command is run through the expect-failure path.
    """
    cert_file = f"/var/lib/acme/{cert_name}/cert.pem"

    ocsp_status, _output = node.execute(
        f"openssl x509 -noout -ocsp_uri -in {cert_file}"
        f" | grep -i 'http://{ca_domain}:4002' 2>&1",
    )
    assert fail or ocsp_status == 0, "Failed to find OCSP URI in issued certificate"

    tlsfeature_cmd = (
        f"openssl x509 -noout -ext tlsfeature -in {cert_file}"
        f" | grep -iv 'no extensions' 2>&1"
    )
    run(node, tlsfeature_cmd, fail=fail)
101
def check_key_bits(node, cert_name, bits, fail=False):
    """Check the key type by looking for ``bits`` on the Public-Key line.

    The matching line is also copied to stderr. Pass ``fail=True`` to run
    the pipeline through the expect-failure path.
    """
    show_cert = f"openssl x509 -noout -text -in /var/lib/acme/{cert_name}/cert.pem"
    pipeline = f"{show_cert} | grep -i Public-Key | grep {bits} | tee /dev/stderr"
    run(node, pipeline, fail=fail)
110
def check_fullchain(node, cert_name):
    """Ensure the cert comes before the chain in fullchain.pem.

    Two things are verified: fullchain.pem holds more than one certificate,
    and the first line mentioning "dns:" in the text dump of its first
    certificate contains ``cert_name`` (compared case-insensitively).

    Raises:
        AssertionError: If there is only one certificate, if the first DNS
            line does not contain ``cert_name``, or if no DNS line is found.
    """
    cert_file = f"/var/lib/acme/{cert_name}/fullchain.pem"
    # One "END CERTIFICATE" line is printed per certificate in the file.
    end_markers = node.succeed(f"grep -o 'END CERTIFICATE' {cert_file}")
    assert len(end_markers.strip().split("\n")) > 1, "Insufficient certs in fullchain.pem"

    # Cut the file at the first END marker so openssl only sees the first cert.
    first_cert_data = node.succeed(
        f"grep -m1 -B50 'END CERTIFICATE' {cert_file}"
        " | openssl x509 -noout -text"
    )
    for line in first_cert_data.lower().split("\n"):
        if "dns:" in line:
            print(f"First DNSName in fullchain.pem: {line}")
            assert cert_name.lower() in line, f"{cert_name} not found in {line}"
            return

    # Previously a bare ``assert False``: no diagnostic, and stripped under -O.
    raise AssertionError(f"No DNS names found in the first certificate of {cert_file}")
128
def check_permissions(node, cert_name, group):
    """Check that permissions in the cert directories are as expected.

    Each check prints "<mode> <owner> <group>" per path (also copied to
    stderr) and requires that no line differs from the expected
    "<mode> acme <group>":
      * 640 for the *.pem files of ``cert_name`` and its files under .lego
      * 750 for the certificate directory itself
      * 600 for every file under .lego/accounts
    """
    # The trailing space is part of the original command text.
    stat = "stat -L -c '%a %U %G' "
    cert_dir = f"/var/lib/acme/{cert_name}"
    lego_dir = f"/var/lib/acme/.lego/{cert_name}"

    def require_only(listing_cmd, mode):
        # Succeeds only when zero listed lines deviate from the expectation.
        node.succeed(
            f"test $({listing_cmd}"
            f" | tee /dev/stderr | grep -v '{mode} acme {group}' | wc -l) -eq 0"
        )

    require_only(f"{stat} {cert_dir}/*.pem", 640)
    # Directory listing for the log only; its result is not checked.
    node.execute(f"ls -lahR {lego_dir}/* > /dev/stderr")
    require_only(f"{stat} {lego_dir}/*/{cert_name}*", 640)
    require_only(f"{stat} {cert_dir}", 750)
    require_only(
        f"find /var/lib/acme/.lego/accounts -type f -exec {stat} {{}} \\;",
        600,
    )
149
150
@backoff.protect
def download_ca_certs(node, ca_domain):
    """Fetch CA material from ``ca_domain`` into /tmp/ca.crt.

    The first download overwrites the file and the second appends to it.
    """
    base_url = f"https://{ca_domain}:15000"
    bundle = "/tmp/ca.crt"
    node.succeed(f"curl {base_url}/roots/0 > {bundle}")
    # NOTE(review): this appends the "intermediate-keys" endpoint to a
    # certificate bundle; confirm that is intended rather than the
    # intermediate certificate.
    node.succeed(f"curl {base_url}/intermediate-keys/0 >> {bundle}")
155
156
@backoff.protect
def check_connection(node, domain, fail=False, minica=False):
    """Open a verified TLS connection to ``domain`` on port 443.

    The certificate chain and hostname are verified with ``openssl s_client``
    against /tmp/ca.crt, or against the minica certificate when ``minica``
    is truthy. Pass ``fail=True`` to run the command through the
    expect-failure path.
    """
    cafile = "/var/lib/acme/.minica/cert.pem" if minica else "/tmp/ca.crt"
    s_client_cmd = (
        f"openssl s_client -brief -CAfile {cafile}"
        f" -verify 2 -verify_return_error -verify_hostname {domain}"
        f" -servername {domain} -connect {domain}:443 < /dev/null"
    )
    run(node, s_client_cmd, fail=fail)