1#! @python3@/bin/python3 -B
2import argparse
3import ctypes
4import datetime
5import errno
6import glob
7import os
8import os.path
9import re
10import shutil
11import subprocess
12import sys
13import warnings
14import json
15from typing import NamedTuple, Dict, List
16from dataclasses import dataclass
17
18
@dataclass
class BootSpec:
    """Boot-relevant data for one system, built from a bootspec JSON document.

    Instances are created by ``bootspec_from_json``: the scalar/list fields
    come from the ``org.nixos.bootspec.v1`` object and ``specialisations``
    from ``org.nixos.specialisation.v1``.
    """
    # Passed to the kernel as ``init=<value>`` on the command line.
    init: str
    # Path of the initrd; copied onto the ESP when an entry is written.
    initrd: str
    # Executable run with the ESP copy of the initrd as its only argument.
    initrdSecrets: str
    # Path of the kernel image; copied onto the ESP when an entry is written.
    kernel: str
    # Extra kernel command-line parameters, joined with spaces.
    kernelParams: List[str]
    # Human-readable label shown in the entry's "version" line.
    label: str
    # Not read by the code in this file.
    system: str
    # Not read by the code in this file.
    toplevel: str
    # Specialisation name -> bootspec of that specialisation.
    specialisations: Dict[str, "BootSpec"]
30
31
32
# Handle to the C library, for calling C functions (such as syncfs) directly
# through ctypes.
libc = ctypes.CDLL("libc.so.6")
34
class SystemIdentifier(NamedTuple):
    """Identifies one bootable configuration as (profile, generation, specialisation).

    Being a tuple, it can be unpacked with ``*`` into functions that take
    those three values positionally and compares equal to a plain 3-tuple.
    """
    # Name of the system profile, or None for the default profile.
    profile: str | None
    # Generation number within the profile.
    generation: int
    # Specialisation name, or None for the generation itself.
    specialisation: str | None
39
40
def copy_if_not_exists(source: str, dest: str) -> None:
    """Copy the contents of ``source`` to ``dest`` unless ``dest`` already exists.

    An existing ``dest`` is left untouched, whatever its contents.
    """
    if os.path.exists(dest):
        return
    shutil.copyfile(source, dest)
44
45
46def generation_dir(profile: str | None, generation: int) -> str:
47 if profile:
48 return "/nix/var/nix/profiles/system-profiles/%s-%d-link" % (profile, generation)
49 else:
50 return "/nix/var/nix/profiles/system-%d-link" % (generation)
51
def system_dir(profile: str | None, generation: int, specialisation: str | None) -> str:
    """Return the directory of a generation or of one of its specialisations.

    With a truthy ``specialisation`` the result is
    ``<generation dir>/specialisation/<name>``; otherwise it is the
    generation directory itself.
    """
    base = generation_dir(profile, generation)
    return os.path.join(base, "specialisation", specialisation) if specialisation else base
58
# Template for one loader entry file. write_entry() fills the fields with
# str.format() and may append a "machine-id" line after it.
BOOT_ENTRY = """title {title}
version Generation {generation} {description}
linux {kernel}
initrd {initrd}
options {kernel_params}
"""
65
66def generation_conf_filename(profile: str | None, generation: int, specialisation: str | None) -> str:
67 pieces = [
68 "nixos",
69 profile or None,
70 "generation",
71 str(generation),
72 f"specialisation-{specialisation}" if specialisation else None,
73 ]
74 return "-".join(p for p in pieces if p) + ".conf"
75
76
77def write_loader_conf(profile: str | None, generation: int, specialisation: str | None) -> None:
78 with open("@efiSysMountPoint@/loader/loader.conf.tmp", 'w') as f:
79 if "@timeout@" != "":
80 f.write("timeout @timeout@\n")
81 f.write("default %s\n" % generation_conf_filename(profile, generation, specialisation))
82 if not @editor@:
83 f.write("editor 0\n")
84 f.write("console-mode @consoleMode@\n")
85 f.flush()
86 os.fsync(f.fileno())
87 os.rename("@efiSysMountPoint@/loader/loader.conf.tmp", "@efiSysMountPoint@/loader/loader.conf")
88
89
def get_bootspec(profile: str | None, generation: int) -> BootSpec:
    """Load the bootspec of a generation.

    Reads ``boot.json`` from the generation's system directory when it exists
    as a regular file; otherwise runs the bootspec ``synthesize`` tool on the
    directory and parses the JSON it prints.

    Raises:
        subprocess.CalledProcessError: if the synthesize tool exits non-zero.
        json.JSONDecodeError: if either JSON source is malformed.
    """
    system_directory = system_dir(profile, generation, None)
    boot_json_path = os.path.realpath("%s/%s" % (system_directory, "boot.json"))
    if os.path.isfile(boot_json_path):
        # Use a context manager so the file handle is closed deterministically.
        with open(boot_json_path, 'r') as boot_json_f:
            bootspec_json = json.load(boot_json_f)
    else:
        boot_json_str = subprocess.check_output([
            "@bootspecTools@/bin/synthesize",
            "--version",
            "1",
            system_directory,
            "/dev/stdout"],
            universal_newlines=True)
        bootspec_json = json.loads(boot_json_str)
    return bootspec_from_json(bootspec_json)
106
def bootspec_from_json(bootspec_json: Dict) -> BootSpec:
    """Convert a parsed bootspec document into a ``BootSpec``.

    The ``org.nixos.bootspec.v1`` object supplies the keyword arguments for
    ``BootSpec``; every value of ``org.nixos.specialisation.v1`` is itself a
    bootspec document and is converted recursively. A document without the
    specialisation key is treated as having no specialisations.

    Raises:
        KeyError: if ``org.nixos.bootspec.v1`` is missing.
        TypeError: if that object's keys do not match the ``BootSpec`` fields.
    """
    specialisations_json = bootspec_json.get('org.nixos.specialisation.v1', {})
    specialisations = {
        name: bootspec_from_json(document)
        for name, document in specialisations_json.items()
    }
    return BootSpec(**bootspec_json['org.nixos.bootspec.v1'], specialisations=specialisations)
111
112
def copy_from_file(file: str, dry_run: bool = False) -> str:
    """Copy a file onto the ESP and return its ESP-relative path.

    The destination name is ``/efi/nixos/<parent dir name>-<file name>.efi``,
    derived from the symlink-resolved path of ``file``. With ``dry_run`` set
    nothing is copied and only the path is computed. The returned path does
    not include the ESP mount point.
    """
    resolved = os.path.realpath(file)
    file_name = os.path.basename(resolved)
    parent_name = os.path.basename(os.path.dirname(resolved))
    efi_file_path = "/efi/nixos/%s-%s.efi" % (parent_name, file_name)
    if dry_run:
        return efi_file_path
    copy_if_not_exists(resolved, "@efiSysMountPoint@%s" % (efi_file_path))
    return efi_file_path
121
def write_entry(profile: str | None, generation: int, specialisation: str | None,
                machine_id: str, bootspec: BootSpec, current: bool) -> None:
    """Write the loader entry for one generation or specialisation.

    Copies the kernel and initrd onto the ESP, runs the initrd-secrets
    program against the copied initrd, and writes the entry file through a
    ``.tmp`` file that is fsynced and then renamed into place.

    Args:
        profile: profile name, or None for the default profile.
        generation: generation number.
        specialisation: when truthy, the entry is written for this
            specialisation of ``bootspec`` instead of ``bootspec`` itself.
        machine_id: appended as a ``machine-id`` line unless None.
        bootspec: bootspec of the generation (not of the specialisation).
        current: whether this is the configuration being activated; a
            failing secrets program then terminates the process with exit
            status 1 instead of only printing a warning.
    """
    spec = bootspec.specialisations[specialisation] if specialisation else bootspec
    kernel_path = copy_from_file(spec.kernel)
    initrd_path = copy_from_file(spec.initrd)

    profile_part = " [" + profile + "]" if profile else ""
    specialisation_part = " (%s)" % specialisation if specialisation else ""
    title = "@distroName@{profile}{specialisation}".format(
        profile=profile_part,
        specialisation=specialisation_part)

    try:
        subprocess.check_call([spec.initrdSecrets, "@efiSysMountPoint@%s" % (initrd_path)])
    except FileNotFoundError:
        # The secrets program does not exist; nothing to do.
        pass
    except subprocess.CalledProcessError:
        if current:
            print("failed to create initrd secrets!", file=sys.stderr)
            sys.exit(1)
        print("warning: failed to create initrd secrets "
              f'for "{title} - Configuration {generation}", an older generation', file=sys.stderr)
        print("note: this is normal after having removed "
              "or renamed a file in `boot.initrd.secrets`", file=sys.stderr)

    entry_path = "@efiSysMountPoint@/loader/entries/%s" % (
        generation_conf_filename(profile, generation, specialisation))
    staging_path = "%s.tmp" % (entry_path)

    cmdline = ("init=%s " % spec.init) + " ".join(spec.kernelParams)

    # The ctime of the system directory is reported as the build date.
    built_at = int(os.path.getctime(system_dir(profile, generation, specialisation)))
    build_date = datetime.datetime.fromtimestamp(built_at).strftime('%F')

    with open(staging_path, 'w') as entry:
        entry.write(BOOT_ENTRY.format(title=title,
                                      generation=generation,
                                      kernel=kernel_path,
                                      initrd=initrd_path,
                                      kernel_params=cmdline,
                                      description=f"{spec.label}, built on {build_date}"))
        if machine_id is not None:
            entry.write("machine-id %s\n" % machine_id)
        # Push the data to disk before the rename makes it visible.
        entry.flush()
        os.fsync(entry.fileno())
    os.rename(staging_path, entry_path)
167
168
169def get_generations(profile: str | None = None) -> list[SystemIdentifier]:
170 gen_list = subprocess.check_output([
171 "@nix@/bin/nix-env",
172 "--list-generations",
173 "-p",
174 "/nix/var/nix/profiles/%s" % ("system-profiles/" + profile if profile else "system"),
175 "--option", "build-users-group", ""],
176 universal_newlines=True)
177 gen_lines = gen_list.split('\n')
178 gen_lines.pop()
179
180 configurationLimit = @configurationLimit@
181 configurations = [
182 SystemIdentifier(
183 profile=profile,
184 generation=int(line.split()[0]),
185 specialisation=None
186 )
187 for line in gen_lines
188 ]
189 return configurations[-configurationLimit:]
190
191
def remove_old_entries(gens: list[SystemIdentifier]) -> None:
    """Delete loader entries and ESP files that belong to no generation in ``gens``.

    An entry file is kept when ``(profile, generation, None)`` parsed from its
    name is in ``gens``; specialisation entries are therefore kept or removed
    together with their generation. Entry files whose name carries no
    parsable generation number are left alone.
    """
    # The mount point is a literal path: escape it so that regex or glob
    # metacharacters in it cannot change what is matched.
    entries_rx = "^" + re.escape("@efiSysMountPoint@") + "/loader/entries/"
    rex_profile = re.compile(entries_rx + r"nixos-(.*)-generation-.*\.conf$")
    rex_generation = re.compile(entries_rx + r"nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
    esp_glob = glob.escape("@efiSysMountPoint@")

    known_paths = []
    for gen in gens:
        bootspec = get_bootspec(gen.profile, gen.generation)
        known_paths.append(copy_from_file(bootspec.kernel, True))
        known_paths.append(copy_from_file(bootspec.initrd, True))

    for path in glob.iglob(esp_glob + "/loader/entries/nixos*-generation-[1-9]*.conf"):
        profile_match = rex_profile.match(path)
        prof = profile_match.group(1) if profile_match else None
        generation_match = rex_generation.match(path)
        if generation_match is None:
            continue
        gen_number = int(generation_match.group(1))
        if (prof, gen_number, None) not in gens:
            os.unlink(path)

    # NOTE(review): copy_from_file() returns ESP-relative paths
    # ("/efi/nixos/..."), while the glob below yields paths prefixed with the
    # mount point. Unless the mount point is empty, the membership test never
    # matches, so every non-directory here is unlinked. Confirm whether that
    # is relied upon before changing the comparison.
    for path in glob.iglob(esp_glob + "/efi/nixos/*"):
        if path not in known_paths and not os.path.isdir(path):
            os.unlink(path)
214
215
def get_profiles() -> list[str]:
    """Return the names found in the system-profiles directory.

    Directory entries ending in ``-link`` are skipped. An empty list is
    returned when the directory does not exist.
    """
    profiles_root = "/nix/var/nix/profiles/system-profiles/"
    if not os.path.isdir(profiles_root):
        return []
    return [entry for entry in os.listdir(profiles_root) if not entry.endswith("-link")]
223
def _systemd_version_key(version: str) -> tuple[int, ...]:
    """Return the numeric components of a version string, for ordering."""
    return tuple(int(part) for part in re.findall(r"[0-9]+", version))


def install_bootloader(args: argparse.Namespace) -> None:
    """Install or update systemd-boot and regenerate all loader entries.

    Steps, in order: determine the machine id; run ``bootctl install`` (when
    NIXOS_INSTALL_BOOTLOADER is "1") or ``bootctl update`` (when the installed
    systemd-boot is older than the available one); remove stale entries;
    write an entry per generation and specialisation; write ``loader.conf``
    for the default configuration; finally clean up and re-copy extra files.

    Args:
        args: parsed command line; ``args.default_config`` is compared with
            the directory of each bootspec's ``init`` to find the default.

    Raises:
        Exception: if the installed or available systemd-boot version cannot
            be determined in the update path.
        subprocess.CalledProcessError: if an external command fails.
        OSError: on file-system errors other than the EINVAL case below.
    """
    try:
        with open("/etc/machine-id") as machine_file:
            # Strip the trailing newline, like the fallback branch below;
            # write_entry() adds its own line terminator.
            machine_id = machine_file.readlines()[0].rstrip()
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        # Since systemd version 232 a machine ID is required and it might not
        # be there on newly installed systems, so let's generate one so that
        # bootctl can find it and we can also pass it to write_entry() later.
        cmd = ["@systemd@/bin/systemd-machine-id-setup", "--print"]
        machine_id = subprocess.run(
            cmd, text=True, check=True, stdout=subprocess.PIPE
        ).stdout.rstrip()

    if os.getenv("NIXOS_INSTALL_GRUB") == "1":
        warnings.warn("NIXOS_INSTALL_GRUB env var deprecated, use NIXOS_INSTALL_BOOTLOADER", DeprecationWarning)
        os.environ["NIXOS_INSTALL_BOOTLOADER"] = "1"

    # flags to pass to bootctl install/update
    bootctl_flags = []

    if "@canTouchEfiVariables@" != "1":
        bootctl_flags.append("--no-variables")

    if "@graceful@" == "1":
        bootctl_flags.append("--graceful")

    if os.getenv("NIXOS_INSTALL_BOOTLOADER") == "1":
        # bootctl uses fopen() with modes "wxe" and fails if the file exists.
        if os.path.exists("@efiSysMountPoint@/loader/loader.conf"):
            os.unlink("@efiSysMountPoint@/loader/loader.conf")

        subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + bootctl_flags + ["install"])
    else:
        # Update bootloader to latest if needed
        available_out = subprocess.check_output(["@systemd@/bin/bootctl", "--version"], universal_newlines=True).split()[2]
        installed_out = subprocess.check_output(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@", "status"], universal_newlines=True)

        # See status_binaries() in systemd bootctl.c for code which generates this
        installed_match = re.search(r"^\W+File:.*/EFI/(?:BOOT|systemd)/.*\.efi \(systemd-boot ([\d.]+[^)]*)\)$",
                                    installed_out, re.IGNORECASE | re.MULTILINE)

        available_match = re.search(r"^\((.*)\)$", available_out)

        if installed_match is None:
            raise Exception("could not find any previously installed systemd-boot")

        if available_match is None:
            raise Exception("could not determine systemd-boot version")

        installed_version = installed_match.group(1)
        available_version = available_match.group(1)

        # Compare numerically: as plain strings "254.9" sorts after "254.10",
        # which would skip a needed update.
        if _systemd_version_key(installed_version) < _systemd_version_key(available_version):
            print("updating systemd-boot from %s to %s" % (installed_version, available_version))
            subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + bootctl_flags + ["update"])

    os.makedirs("@efiSysMountPoint@/efi/nixos", exist_ok=True)
    os.makedirs("@efiSysMountPoint@/loader/entries", exist_ok=True)

    gens = get_generations()
    for profile in get_profiles():
        gens += get_generations(profile)
    remove_old_entries(gens)
    for gen in gens:
        try:
            bootspec = get_bootspec(gen.profile, gen.generation)
            is_default = os.path.dirname(bootspec.init) == args.default_config
            write_entry(*gen, machine_id, bootspec, current=is_default)
            for specialisation in bootspec.specialisations:
                write_entry(gen.profile, gen.generation, specialisation, machine_id, bootspec, current=is_default)
            if is_default:
                write_loader_conf(*gen)
        except OSError as e:
            # See https://github.com/NixOS/nixpkgs/issues/114552
            if e.errno != errno.EINVAL:
                raise
            profile_desc = f"profile '{gen.profile}'" if gen.profile else "default profile"
            print("ignoring {} in the list of boot entries because of the following error:\n{}".format(profile_desc, e), file=sys.stderr)

    # .extra-files mirrors the layout of files placed on the ESP. Walk it
    # bottom-up, delete each mirrored file together with its counterpart on
    # the ESP, and remove directories that end up empty.
    for root, _, files in os.walk('@efiSysMountPoint@/efi/nixos/.extra-files', topdown=False):
        relative_root = root.removeprefix("@efiSysMountPoint@/efi/nixos/.extra-files").removeprefix("/")
        actual_root = os.path.join("@efiSysMountPoint@", relative_root)

        for file in files:
            actual_file = os.path.join(actual_root, file)

            if os.path.exists(actual_file):
                os.unlink(actual_file)
            os.unlink(os.path.join(root, file))

        if not os.listdir(actual_root):
            os.rmdir(actual_root)
        os.rmdir(root)

    os.makedirs("@efiSysMountPoint@/efi/nixos/.extra-files", exist_ok=True)

    subprocess.check_call("@copyExtraFiles@")
324
325
def main() -> None:
    """Parse the command line, install the bootloader, then sync the ESP.

    The sync runs whether or not the installation raised. A failing sync is
    only reported on stderr; it does not change the exit status.
    """
    parser = argparse.ArgumentParser(description='Update @distroName@-related systemd-boot files')
    parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help='The default @distroName@ config to boot')
    args = parser.parse_args()

    try:
        install_bootloader(args)
    finally:
        # Since fat32 provides little recovery facilities after a crash,
        # it can leave the system in an unbootable state, when a crash/outage
        # happens shortly after an update. To decrease the likelihood of this
        # event sync the efi filesystem after each update.
        #
        # syncfs() only returns -1 on failure; the reason is in errno, which
        # ctypes preserves only for libraries loaded with use_errno=True.
        libc_with_errno = ctypes.CDLL("libc.so.6", use_errno=True)
        esp_fd = os.open("@efiSysMountPoint@", os.O_RDONLY)
        try:
            rc = libc_with_errno.syncfs(esp_fd)
            sync_errno = ctypes.get_errno()
        finally:
            # Do not leak the descriptor opened for the sync.
            os.close(esp_fd)
        if rc != 0:
            print("could not sync @efiSysMountPoint@: {}".format(os.strerror(sync_errno)), file=sys.stderr)
341
342
# Run the installer only when executed as a script, not when imported.
if __name__ == '__main__':
    main()