1#! @python3@/bin/python3 -B
2import argparse
3import shutil
4import os
5import sys
6import errno
7import subprocess
8import glob
9import tempfile
10import errno
11import warnings
12import ctypes
13libc = ctypes.CDLL("libc.so.6")
14import re
15import datetime
16import glob
17import os.path
18from typing import NamedTuple, List, Optional
19
20class SystemIdentifier(NamedTuple):
21 profile: Optional[str]
22 generation: int
23 specialisation: Optional[str]
24
25
26def copy_if_not_exists(source: str, dest: str) -> None:
27 if not os.path.exists(dest):
28 shutil.copyfile(source, dest)
29
30
31def generation_dir(profile: Optional[str], generation: int) -> str:
32 if profile:
33 return "/nix/var/nix/profiles/system-profiles/%s-%d-link" % (profile, generation)
34 else:
35 return "/nix/var/nix/profiles/system-%d-link" % (generation)
36
37def system_dir(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str:
38 d = generation_dir(profile, generation)
39 if specialisation:
40 return os.path.join(d, "specialisation", specialisation)
41 else:
42 return d
43
44BOOT_ENTRY = """title NixOS{profile}{specialisation}
45version Generation {generation} {description}
46linux {kernel}
47initrd {initrd}
48options {kernel_params}
49"""
50
51def generation_conf_filename(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str:
52 pieces = [
53 "nixos",
54 profile or None,
55 "generation",
56 str(generation),
57 f"specialisation-{specialisation}" if specialisation else None,
58 ]
59 return "-".join(p for p in pieces if p) + ".conf"
60
61
62def write_loader_conf(profile: Optional[str], generation: int, specialisation: Optional[str]) -> None:
63 with open("@efiSysMountPoint@/loader/loader.conf.tmp", 'w') as f:
64 if "@timeout@" != "":
65 f.write("timeout @timeout@\n")
66 f.write("default %s\n" % generation_conf_filename(profile, generation, specialisation))
67 if not @editor@:
68 f.write("editor 0\n");
69 f.write("console-mode @consoleMode@\n");
70 os.rename("@efiSysMountPoint@/loader/loader.conf.tmp", "@efiSysMountPoint@/loader/loader.conf")
71
72
73def profile_path(profile: Optional[str], generation: int, specialisation: Optional[str], name: str) -> str:
74 return os.path.realpath("%s/%s" % (system_dir(profile, generation, specialisation), name))
75
76
77def copy_from_profile(profile: Optional[str], generation: int, specialisation: Optional[str], name: str, dry_run: bool = False) -> str:
78 store_file_path = profile_path(profile, generation, specialisation, name)
79 suffix = os.path.basename(store_file_path)
80 store_dir = os.path.basename(os.path.dirname(store_file_path))
81 efi_file_path = "/efi/nixos/%s-%s.efi" % (store_dir, suffix)
82 if not dry_run:
83 copy_if_not_exists(store_file_path, "@efiSysMountPoint@%s" % (efi_file_path))
84 return efi_file_path
85
86
87def describe_generation(generation_dir: str) -> str:
88 try:
89 with open("%s/nixos-version" % generation_dir) as f:
90 nixos_version = f.read()
91 except IOError:
92 nixos_version = "Unknown"
93
94 kernel_dir = os.path.dirname(os.path.realpath("%s/kernel" % generation_dir))
95 module_dir = glob.glob("%s/lib/modules/*" % kernel_dir)[0]
96 kernel_version = os.path.basename(module_dir)
97
98 build_time = int(os.path.getctime(generation_dir))
99 build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F')
100
101 description = "NixOS {}, Linux Kernel {}, Built on {}".format(
102 nixos_version, kernel_version, build_date
103 )
104
105 return description
106
107
108def write_entry(profile: Optional[str], generation: int, specialisation: Optional[str], machine_id: str) -> None:
109 kernel = copy_from_profile(profile, generation, specialisation, "kernel")
110 initrd = copy_from_profile(profile, generation, specialisation, "initrd")
111 try:
112 append_initrd_secrets = profile_path(profile, generation, specialisation, "append-initrd-secrets")
113 subprocess.check_call([append_initrd_secrets, "@efiSysMountPoint@%s" % (initrd)])
114 except FileNotFoundError:
115 pass
116 entry_file = "@efiSysMountPoint@/loader/entries/%s" % (
117 generation_conf_filename(profile, generation, specialisation))
118 generation_dir = os.readlink(system_dir(profile, generation, specialisation))
119 tmp_path = "%s.tmp" % (entry_file)
120 kernel_params = "init=%s/init " % generation_dir
121
122 with open("%s/kernel-params" % (generation_dir)) as params_file:
123 kernel_params = kernel_params + params_file.read()
124 with open(tmp_path, 'w') as f:
125 f.write(BOOT_ENTRY.format(profile=" [" + profile + "]" if profile else "",
126 specialisation=" (%s)" % specialisation if specialisation else "",
127 generation=generation,
128 kernel=kernel,
129 initrd=initrd,
130 kernel_params=kernel_params,
131 description=describe_generation(generation_dir)))
132 if machine_id is not None:
133 f.write("machine-id %s\n" % machine_id)
134 os.rename(tmp_path, entry_file)
135
136
137def mkdir_p(path: str) -> None:
138 try:
139 os.makedirs(path)
140 except OSError as e:
141 if e.errno != errno.EEXIST or not os.path.isdir(path):
142 raise
143
144
145def get_generations(profile: Optional[str] = None) -> List[SystemIdentifier]:
146 gen_list = subprocess.check_output([
147 "@nix@/bin/nix-env",
148 "--list-generations",
149 "-p",
150 "/nix/var/nix/profiles/%s" % ("system-profiles/" + profile if profile else "system"),
151 "--option", "build-users-group", ""],
152 universal_newlines=True)
153 gen_lines = gen_list.split('\n')
154 gen_lines.pop()
155
156 configurationLimit = @configurationLimit@
157 configurations = [
158 SystemIdentifier(
159 profile=profile,
160 generation=int(line.split()[0]),
161 specialisation=None
162 )
163 for line in gen_lines
164 ]
165 return configurations[-configurationLimit:]
166
167
168def get_specialisations(profile: Optional[str], generation: int, _: Optional[str]) -> List[SystemIdentifier]:
169 specialisations_dir = os.path.join(
170 system_dir(profile, generation, None), "specialisation")
171 if not os.path.exists(specialisations_dir):
172 return []
173 return [SystemIdentifier(profile, generation, spec) for spec in os.listdir(specialisations_dir)]
174
175
176def remove_old_entries(gens: List[SystemIdentifier]) -> None:
177 rex_profile = re.compile("^@efiSysMountPoint@/loader/entries/nixos-(.*)-generation-.*\.conf$")
178 rex_generation = re.compile("^@efiSysMountPoint@/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
179 known_paths = []
180 for gen in gens:
181 known_paths.append(copy_from_profile(*gen, "kernel", True))
182 known_paths.append(copy_from_profile(*gen, "initrd", True))
183 for path in glob.iglob("@efiSysMountPoint@/loader/entries/nixos*-generation-[1-9]*.conf"):
184 if rex_profile.match(path):
185 prof = rex_profile.sub(r"\1", path)
186 else:
187 prof = None
188 try:
189 gen_number = int(rex_generation.sub(r"\1", path))
190 except ValueError:
191 continue
192 if not (prof, gen_number, None) in gens:
193 os.unlink(path)
194 for path in glob.iglob("@efiSysMountPoint@/efi/nixos/*"):
195 if not path in known_paths and not os.path.isdir(path):
196 os.unlink(path)
197
198
199def get_profiles() -> List[str]:
200 if os.path.isdir("/nix/var/nix/profiles/system-profiles/"):
201 return [x
202 for x in os.listdir("/nix/var/nix/profiles/system-profiles/")
203 if not x.endswith("-link")]
204 else:
205 return []
206
207def main() -> None:
208 parser = argparse.ArgumentParser(description='Update NixOS-related systemd-boot files')
209 parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help='The default NixOS config to boot')
210 args = parser.parse_args()
211
212 try:
213 with open("/etc/machine-id") as machine_file:
214 machine_id = machine_file.readlines()[0]
215 except IOError as e:
216 if e.errno != errno.ENOENT:
217 raise
218 # Since systemd version 232 a machine ID is required and it might not
219 # be there on newly installed systems, so let's generate one so that
220 # bootctl can find it and we can also pass it to write_entry() later.
221 cmd = ["@systemd@/bin/systemd-machine-id-setup", "--print"]
222 machine_id = subprocess.run(
223 cmd, text=True, check=True, stdout=subprocess.PIPE
224 ).stdout.rstrip()
225
226 if os.getenv("NIXOS_INSTALL_GRUB") == "1":
227 warnings.warn("NIXOS_INSTALL_GRUB env var deprecated, use NIXOS_INSTALL_BOOTLOADER", DeprecationWarning)
228 os.environ["NIXOS_INSTALL_BOOTLOADER"] = "1"
229
230 if os.getenv("NIXOS_INSTALL_BOOTLOADER") == "1":
231 # bootctl uses fopen() with modes "wxe" and fails if the file exists.
232 if os.path.exists("@efiSysMountPoint@/loader/loader.conf"):
233 os.unlink("@efiSysMountPoint@/loader/loader.conf")
234
235 flags = []
236
237 if "@canTouchEfiVariables@" != "1":
238 flags.append("--no-variables")
239
240 if "@graceful@" == "1":
241 flags.append("--graceful")
242
243 subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + flags + ["install"])
244 else:
245 # Update bootloader to latest if needed
246 available_out = subprocess.check_output(["@systemd@/bin/bootctl", "--version"], universal_newlines=True).split()[2]
247 installed_out = subprocess.check_output(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@", "status"], universal_newlines=True)
248
249 # See status_binaries() in systemd bootctl.c for code which generates this
250 installed_match = re.search(r"^\W+File:.*/EFI/(?:BOOT|systemd)/.*\.efi \(systemd-boot ([\d.]+[^)]*)\)$",
251 installed_out, re.IGNORECASE | re.MULTILINE)
252
253 available_match = re.search(r"^\((.*)\)$", available_out)
254
255 if installed_match is None:
256 raise Exception("could not find any previously installed systemd-boot")
257
258 if available_match is None:
259 raise Exception("could not determine systemd-boot version")
260
261 installed_version = installed_match.group(1)
262 available_version = available_match.group(1)
263
264 if installed_version < available_version:
265 print("updating systemd-boot from %s to %s" % (installed_version, available_version))
266 subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@", "update"])
267
268 mkdir_p("@efiSysMountPoint@/efi/nixos")
269 mkdir_p("@efiSysMountPoint@/loader/entries")
270
271 gens = get_generations()
272 for profile in get_profiles():
273 gens += get_generations(profile)
274 remove_old_entries(gens)
275 for gen in gens:
276 try:
277 write_entry(*gen, machine_id)
278 for specialisation in get_specialisations(*gen):
279 write_entry(*specialisation, machine_id)
280 if os.readlink(system_dir(*gen)) == args.default_config:
281 write_loader_conf(*gen)
282 except OSError as e:
283 profile = f"profile '{gen.profile}'" if gen.profile else "default profile"
284 print("ignoring {} in the list of boot entries because of the following error:\n{}".format(profile, e), file=sys.stderr)
285
286 for root, _, files in os.walk('@efiSysMountPoint@/efi/nixos/.extra-files', topdown=False):
287 relative_root = root.removeprefix("@efiSysMountPoint@/efi/nixos/.extra-files").removeprefix("/")
288 actual_root = os.path.join("@efiSysMountPoint@", relative_root)
289
290 for file in files:
291 actual_file = os.path.join(actual_root, file)
292
293 if os.path.exists(actual_file):
294 os.unlink(actual_file)
295 os.unlink(os.path.join(root, file))
296
297 if not len(os.listdir(actual_root)):
298 os.rmdir(actual_root)
299 os.rmdir(root)
300
301 mkdir_p("@efiSysMountPoint@/efi/nixos/.extra-files")
302
303 subprocess.check_call("@copyExtraFiles@")
304
305 # Since fat32 provides little recovery facilities after a crash,
306 # it can leave the system in an unbootable state, when a crash/outage
307 # happens shortly after an update. To decrease the likelihood of this
308 # event sync the efi filesystem after each update.
309 rc = libc.syncfs(os.open("@efiSysMountPoint@", os.O_RDONLY))
310 if rc != 0:
311 print("could not sync @efiSysMountPoint@: {}".format(os.strerror(rc)), file=sys.stderr)
312
313
314if __name__ == '__main__':
315 main()