1from graphlib import TopologicalSorter
2from pathlib import Path
3from typing import Any, Final, Generator, Literal
4import argparse
5import asyncio
6import contextlib
7import json
8import os
9import re
10import shlex
11import subprocess
12import sys
13import tempfile
14
15
# Update-ordering strategies; matches the choices of the `--order` CLI flag.
Order = Literal["arbitrary", "reverse-topological", "topological"]


# Synthetic graph node used by `reverse_edges` so that packages with no
# dependency relation to any other updated package still appear in the
# reversed dependency graph; it is filtered out again in `populate_queue`.
FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES: Final[str] = (
    "::fake_dependency_for_independent_packages"
)
22
23
class CalledProcessError(Exception):
    """
    Raised by `check_subprocess_output` when a subprocess exits with a
    non-zero return code.

    The attributes below are assigned by the raiser rather than via
    `__init__`.
    """

    # The finished subprocess (gives access to e.g. `returncode`).
    process: asyncio.subprocess.Process
    # Captured stderr, or None when stderr was not piped.
    stderr: bytes | None
27
28
class UpdateFailedException(Exception):
    """
    Raised when a package's update script fails and `--keep-going` is not set.
    """

    pass
31
32
def eprint(*args: Any, **kwargs: Any) -> None:
    """Like `print`, but writes to stderr (stdout is reserved for data)."""
    print(*args, file=sys.stderr, **kwargs)
35
36
async def check_subprocess_output(*args: str, **kwargs: Any) -> bytes:
    """
    Run a command and return its captured stdout, raising on failure.

    Emulates the `check` and `capture_output` arguments of the
    `subprocess.run` function; pipes must be requested via `kwargs`
    (e.g. `stdout=asyncio.subprocess.PIPE`).
    """
    process = await asyncio.create_subprocess_exec(*args, **kwargs)
    # communicate() drains the pipes while waiting; a plain wait() could
    # deadlock once the OS pipe buffers fill up.
    stdout, stderr = await process.communicate()

    if process.returncode == 0:
        return stdout

    failure = CalledProcessError()
    failure.process = process
    failure.stderr = stderr

    raise failure
54
55
async def nix_instantiate(attr_path: str) -> Path:
    """
    Instantiate the given attribute and return the path of its `.drv` file.
    """
    output = await check_subprocess_output(
        "nix-instantiate",
        "-A",
        attr_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # The output may carry an output selector after `!`; keep only the
    # store path in front of it.
    drv_path, _sep, _outputs = output.decode("utf-8").strip().partition("!")

    return Path(drv_path)
67
68
async def nix_query_requisites(drv: Path) -> list[Path]:
    """
    Return the requisite closure of a derivation, excluding the
    derivation itself.
    """
    output = await check_subprocess_output(
        "nix-store",
        "--query",
        "--requisites",
        str(drv),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    self_path = str(drv)

    requisites: list[Path] = []
    for line in output.decode("utf-8").splitlines():
        # Avoid self-loops.
        if line == self_path:
            continue
        requisites.append(Path(line))

    return requisites
87
88
async def attr_instantiation_worker(
    semaphore: asyncio.Semaphore,
    attr_path: str,
) -> tuple[Path, str]:
    """
    Instantiate one attribute while holding a slot of `semaphore`,
    returning the `.drv` path paired with the attribute path.
    """
    async with semaphore:
        eprint(f"Instantiating {attr_path}…")
        drv = await nix_instantiate(attr_path)
    return (drv, attr_path)
96
97
async def requisites_worker(
    semaphore: asyncio.Semaphore,
    drv: Path,
) -> tuple[Path, list[Path]]:
    """
    Query the requisites of one derivation while holding a slot of
    `semaphore`, returning the derivation paired with its requisites.
    """
    async with semaphore:
        eprint(f"Obtaining requisites for {drv}…")
        requisites = await nix_query_requisites(drv)
    return (drv, requisites)
105
106
def requisites_to_attrs(
    drv_attr_paths: dict[Path, str],
    requisites: list[Path],
) -> set[str]:
    """
    Converts a set of requisite `.drv`s to a set of attribute paths.
    Derivations that do not correspond to any of the packages we want to update will be discarded.
    """
    attr_paths: set[str] = set()
    for requisite in requisites:
        attr_path = drv_attr_paths.get(requisite)
        if attr_path is not None:
            attr_paths.add(attr_path)
    return attr_paths
120
121
def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """
    Flips the edges of a directed graph.

    Packages without any dependency relation in the updated set
    will be added to `FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES` node.
    """

    flipped: dict[str, set[str]] = {}
    for dependent, dependencies in graph.items():
        if not dependencies:
            # Attach isolated nodes to the synthetic node so they survive
            # the edge flip.
            dependencies = {FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES}
        for dependency in dependencies:
            flipped.setdefault(dependency, set()).add(dependent)

    return flipped
137
138
139def get_independent_sorter(
140 packages: list[dict],
141) -> TopologicalSorter[str]:
142 """
143 Returns a sorter which treats all packages as independent,
144 which will allow them to be updated in parallel.
145 """
146
147 attr_deps: dict[str, set[str]] = {
148 package["attrPath"]: set() for package in packages
149 }
150 sorter = TopologicalSorter(attr_deps)
151 sorter.prepare()
152
153 return sorter
154
155
async def get_topological_sorter(
    max_workers: int,
    packages: list[dict],
    reverse_order: bool,
) -> tuple[TopologicalSorter[str], list[dict]]:
    """
    Returns a sorter which returns packages in topological or reverse topological order,
    which will ensure a package is updated before or after its dependencies, respectively.

    Also returns `packages` re-sorted to match a static topological order.
    At most `max_workers` nix subprocesses run concurrently.
    """

    semaphore = asyncio.Semaphore(max_workers)

    # Map each package's `.drv` path to its attribute path.
    drv_attr_paths = dict(
        await asyncio.gather(
            *(
                attr_instantiation_worker(semaphore, package["attrPath"])
                for package in packages
            )
        )
    )

    drv_requisites = await asyncio.gather(
        *(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
    )

    # Dependency graph restricted to the packages being updated.
    attr_deps = {
        drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
        for drv, requisites in drv_requisites
    }

    if reverse_order:
        attr_deps = reverse_edges(attr_deps)

    # Adjust packages order based on the topological one.
    # Precompute each attribute's position once instead of calling
    # `list.index` (O(n)) inside the sort key, which made this O(n²).
    rank = {
        attr_path: position
        for position, attr_path in enumerate(TopologicalSorter(attr_deps).static_order())
    }
    packages = sorted(packages, key=lambda package: rank[package["attrPath"]])

    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter, packages
197
198
async def run_update_script(
    nixpkgs_root: str,
    merge_lock: asyncio.Lock,
    temp_dir: tuple[str, str] | None,
    package: dict,
    keep_going: bool,
) -> None:
    """
    Run a single package's update script and merge the resulting changes.

    When `temp_dir` is a (worktree, branch) pair, the script runs inside
    that git worktree (reset to a clean HEAD first); otherwise it runs in
    the main repo. On script failure its stderr is echoed, written to
    `<pname>.log` in the current directory, and `UpdateFailedException`
    is raised unless `keep_going` is set.
    """
    worktree: str | None = None

    update_script_command = package["updateScript"]

    if temp_dir is not None:
        worktree, _branch = temp_dir

        # Ensure the worktree is clean before update.
        await check_subprocess_output(
            "git",
            "reset",
            "--hard",
            "--quiet",
            "HEAD",
            cwd=worktree,
        )

        # Update scripts can use $(dirname $0) to get their location but we want to run
        # their clones in the git worktree, not in the main nixpkgs repo.
        update_script_command = map(
            lambda arg: re.sub(r"^{0}".format(re.escape(nixpkgs_root)), worktree, arg),
            update_script_command,
        )

    eprint(f" - {package['name']}: UPDATING ...")

    try:
        update_info = await check_subprocess_output(
            "env",
            f"UPDATE_NIX_NAME={package['name']}",
            f"UPDATE_NIX_PNAME={package['pname']}",
            f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}",
            f"UPDATE_NIX_ATTR_PATH={package['attrPath']}",
            # Run all update scripts in the Nixpkgs development shell to get access to formatters and co.
            "nix-shell",
            nixpkgs_root + "/shell.nix",
            "--run",
            " ".join(shlex.quote(s) for s in update_script_command),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=worktree,
        )
        await merge_changes(merge_lock, package, update_info, temp_dir)
    except KeyboardInterrupt:
        # The bound exception object was unused; translate the interrupt
        # into a task cancellation for the asyncio machinery.
        eprint("Cancelling…")
        raise asyncio.exceptions.CancelledError()
    except CalledProcessError as e:
        eprint(f" - {package['name']}: ERROR")
        if e.stderr is not None:
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )
            eprint()
            eprint(e.stderr.decode("utf-8"))
            # Keep a copy of the log on disk for later inspection.
            with open(f"{package['pname']}.log", "wb") as logfile:
                logfile.write(e.stderr)
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )

        if not keep_going:
            raise UpdateFailedException(
                f"The update script for {package['name']} failed with exit code {e.process.returncode}"
            )
272
273
@contextlib.contextmanager
def make_worktree() -> Generator[tuple[str, str], None, None]:
    """
    Create a git worktree on a fresh branch inside a temporary directory.

    Yields the worktree path and the branch name; both are removed again
    when the context exits, even on error.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        branch_name = f"update-{os.path.basename(tmp_dir)}"
        target_directory = f"{tmp_dir}/nixpkgs"

        subprocess.run(["git", "worktree", "add", "-b", branch_name, target_directory])
        try:
            yield (target_directory, branch_name)
        finally:
            # Best-effort cleanup: remove the worktree, then its branch.
            for cleanup_command in (
                ["git", "worktree", "remove", "--force", target_directory],
                ["git", "branch", "-D", branch_name],
            ):
                subprocess.run(cleanup_command)
286
287
async def commit_changes(
    name: str,
    merge_lock: asyncio.Lock,
    worktree: str,
    branch: str,
    changes: list[dict],
) -> None:
    """
    Commit each change on the worktree's branch and cherry-pick it into
    the currently checked-out branch of the main repo.

    Each change dict must carry `files`, `attrPath`, `oldVersion` and
    `newVersion`; `commitMessage` / `commitBody` optionally override or
    extend the generated commit message.
    """
    # NOTE(review): the `name` parameter is currently unused here.
    for change in changes:
        # Git can only handle a single index operation at a time
        async with merge_lock:
            await check_subprocess_output("git", "add", *change["files"], cwd=worktree)
            commit_message = "{attrPath}: {oldVersion} -> {newVersion}".format(**change)
            if "commitMessage" in change:
                commit_message = change["commitMessage"]
            elif "commitBody" in change:
                commit_message = commit_message + "\n\n" + change["commitBody"]
            await check_subprocess_output(
                "git",
                "commit",
                "--quiet",
                "-m",
                commit_message,
                cwd=worktree,
            )
            # No cwd: the cherry-pick lands in the main repo, not the worktree.
            await check_subprocess_output("git", "cherry-pick", branch)
313
314
async def check_changes(
    package: dict,
    worktree: str,
    update_info: bytes,
) -> list[dict]:
    """
    Normalize the update script output into a list of change dicts,
    filling in `attrPath`, `oldVersion`, `newVersion` and `files` when
    they are missing.

    Returns an empty list when the auto-detected set of changed files
    turns out to be empty, i.e. the update script was a no-op.
    """
    if "commit" in package["supportedFeatures"]:
        # Scripts with the `commit` feature print JSON change descriptions.
        changes = json.loads(update_info)
    else:
        # Otherwise assume a single change and reconstruct its metadata below.
        changes = [{}]

    # Try to fill in missing attributes when there is just a single change.
    if len(changes) == 1:
        # Dynamic data from updater take precedence over static data from passthru.updateScript.
        if "attrPath" not in changes[0]:
            # update.nix is always passing attrPath
            changes[0]["attrPath"] = package["attrPath"]

        if "oldVersion" not in changes[0]:
            # update.nix is always passing oldVersion
            changes[0]["oldVersion"] = package["oldVersion"]

        if "newVersion" not in changes[0]:
            attr_path = changes[0]["attrPath"]
            # Evaluate the package in the worktree to obtain the post-update version.
            obtain_new_version_output = await check_subprocess_output(
                "nix-instantiate",
                "--expr",
                f"with import ./. {{}}; lib.getVersion {attr_path}",
                "--eval",
                "--strict",
                "--json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            changes[0]["newVersion"] = json.loads(
                obtain_new_version_output.decode("utf-8")
            )

        if "files" not in changes[0]:
            # Fall back to whatever the script left modified in the worktree.
            changed_files_output = await check_subprocess_output(
                "git",
                "diff",
                "--name-only",
                "HEAD",
                stdout=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            # NOTE(review): splitlines() on bytes yields bytes paths, which
            # are later passed to `git add` — presumably accepted as-is;
            # confirm on non-UTF-8 filenames.
            changed_files = changed_files_output.splitlines()
            changes[0]["files"] = changed_files

            if len(changed_files) == 0:
                return []

    return changes
369
370
async def merge_changes(
    merge_lock: asyncio.Lock,
    package: dict,
    update_info: bytes,
    temp_dir: tuple[str, str] | None,
) -> None:
    """
    Inspect the update script output and, when auto-committing is enabled,
    commit any resulting changes from the worker's worktree.
    """
    if temp_dir is None:
        # Not committing; the update script already modified the repo in place.
        eprint(f" - {package['name']}: DONE.")
        return

    worktree, branch = temp_dir
    changes = await check_changes(package, worktree, update_info)

    if not changes:
        eprint(f" - {package['name']}: DONE, no changes.")
        return

    await commit_changes(package["name"], merge_lock, worktree, branch, changes)
387
388
389async def updater(
390 nixpkgs_root: str,
391 temp_dir: tuple[str, str] | None,
392 merge_lock: asyncio.Lock,
393 packages_to_update: asyncio.Queue[dict | None],
394 keep_going: bool,
395 commit: bool,
396) -> None:
397 while True:
398 package = await packages_to_update.get()
399 if package is None:
400 # A sentinel received, we are done.
401 return
402
403 if not ("commit" in package["supportedFeatures"] or "attrPath" in package):
404 temp_dir = None
405
406 await run_update_script(nixpkgs_root, merge_lock, temp_dir, package, keep_going)
407
408 packages_to_update.task_done()
409
410
async def populate_queue(
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
    packages_to_update: asyncio.Queue[dict | None],
    num_workers: int,
) -> None:
    """
    Keeps populating the queue with packages that can be updated
    according to ordering requirements. If topological order
    is used, the packages will appear in waves, as packages with
    no dependencies are processed and removed from the sorter.
    With `order="arbitrary"`, all packages will be enqueued simultaneously.
    """

    # Release the packages wave by wave.
    while sorter.is_active():
        wave = tuple(sorter.get_ready())
        eprint(f"Enqueuing group of {len(wave)} packages")
        for attr_path in wave:
            # The synthetic node only keeps isolated packages in the graph;
            # it is not a real package.
            if attr_path == FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES:
                continue
            await packages_to_update.put(attr_packages[attr_path])
        # Wait until the whole wave has been processed before releasing
        # the next one.
        await packages_to_update.join()
        sorter.done(*wave)

    # Add sentinels, one for each worker.
    # A worker will terminate when it gets a sentinel from the queue.
    for _ in range(num_workers):
        await packages_to_update.put(None)
440
441
async def start_updates(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
) -> None:
    """
    Spawn the queue-filling task plus one updater worker per slot and wait
    for all of them; on failure or cancellation, cancel the rest.

    When `commit` is set, each worker gets its own git worktree (cleaned
    up by the ExitStack on exit).
    """
    merge_lock = asyncio.Lock()
    packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()

    with contextlib.ExitStack() as stack:
        temp_dirs: list[tuple[str, str] | None] = []

        # Do not create more workers than there are packages.
        num_workers = min(max_workers, len(attr_packages))

        # Resolve the repository root; update scripts and shell.nix are
        # addressed relative to it.
        nixpkgs_root_output = await check_subprocess_output(
            "git",
            "rev-parse",
            "--show-toplevel",
            stdout=asyncio.subprocess.PIPE,
        )
        nixpkgs_root = nixpkgs_root_output.decode("utf-8").strip()

        # Set up temporary directories when using auto-commit.
        for i in range(num_workers):
            temp_dir = stack.enter_context(make_worktree()) if commit else None
            temp_dirs.append(temp_dir)

        queue_task = populate_queue(
            attr_packages,
            sorter,
            packages_to_update,
            num_workers,
        )

        # Prepare updater workers for each temp_dir directory.
        # At most `num_workers` instances of `run_update_script` will be running at one time.
        updater_tasks = [
            updater(
                nixpkgs_root,
                temp_dir,
                merge_lock,
                packages_to_update,
                keep_going,
                commit,
            )
            for temp_dir in temp_dirs
        ]

        tasks = asyncio.gather(
            *updater_tasks,
            queue_task,
        )

        try:
            # Start updater workers.
            await tasks
        except asyncio.exceptions.CancelledError:
            # When one worker is cancelled, cancel the others too.
            tasks.cancel()
        except UpdateFailedException as e:
            # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false.
            tasks.cancel()
            eprint(e)
            sys.exit(1)
508
509
async def main(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    packages_path: str,
    skip_prompt: bool,
    order: Order,
) -> None:
    """
    Load the package list from `packages_path` (JSON), order it according
    to `order`, confirm with the user unless `skip_prompt`, and run the
    updates.

    Exits the process: 0 on success, 130 on abort at the prompt.
    """
    with open(packages_path) as f:
        packages = json.load(f)

    if order != "arbitrary":
        eprint("Sorting packages…")
        reverse_order = order == "reverse-topological"
        sorter, packages = await get_topological_sorter(
            max_workers,
            packages,
            reverse_order,
        )
    else:
        sorter = get_independent_sorter(packages)

    # Index packages by attribute path for the queue-filling task.
    attr_packages = {package["attrPath"]: package for package in packages}

    eprint()
    eprint("Going to be running update for following packages:")
    for package in packages:
        eprint(f" - {package['name']}")
    eprint()

    # An empty string (plain Enter, or skipped prompt) means "go ahead".
    confirm = "" if skip_prompt else input("Press Enter key to continue...")

    if confirm == "":
        eprint()
        eprint("Running update for:")

        await start_updates(max_workers, keep_going, commit, attr_packages, sorter)

        eprint()
        eprint("Packages updated!")
        sys.exit()
    else:
        eprint("Aborting!")
        sys.exit(130)
554
555
# Command-line interface. The single positional argument is the JSON file
# produced by the surrounding tooling; everything else tweaks execution.
parser = argparse.ArgumentParser(description="Update packages")
parser.add_argument(
    "--max-workers",
    "-j",
    dest="max_workers",
    type=int,
    help="Number of updates to run concurrently",
    nargs="?",
    default=4,
)
parser.add_argument(
    "--keep-going",
    "-k",
    dest="keep_going",
    action="store_true",
    help="Do not stop after first failure",
)
parser.add_argument(
    "--commit",
    "-c",
    dest="commit",
    action="store_true",
    help="Commit the changes",
)
parser.add_argument(
    "--order",
    dest="order",
    default="arbitrary",
    choices=["arbitrary", "reverse-topological", "topological"],
    help="Sort the packages based on dependency relation",
)
parser.add_argument(
    "packages",
    help="JSON file containing the list of package names and their update scripts",
)
parser.add_argument(
    "--skip-prompt",
    "-s",
    dest="skip_prompt",
    action="store_true",
    help="Do not stop for prompts",
)
598
if __name__ == "__main__":
    args = parser.parse_args()

    try:
        asyncio.run(
            main(
                args.max_workers,
                args.keep_going,
                args.commit,
                args.packages,
                args.skip_prompt,
                args.order,
            )
        )
    except KeyboardInterrupt:
        # Let's cancel outside of the main loop too. The previously bound
        # exception object was unused. 130 = 128 + SIGINT, the conventional
        # interrupted-by-Ctrl-C exit status.
        sys.exit(130)