1from graphlib import TopologicalSorter
2from pathlib import Path
3from typing import Any, Final, Generator, Literal
4import argparse
5import asyncio
6import contextlib
7import json
8import os
9import re
10import subprocess
11import sys
12import tempfile
13
14
# Package-processing orders accepted by the `--order` CLI flag.
Order = Literal["arbitrary", "reverse-topological", "topological"]


# Synthetic graph node used by `reverse_edges` to collect packages that have
# no dependency relation with any other package in the updated set.
# The `::` prefix should not clash with a real Nix attribute path.
FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES: Final[str] = (
    "::fake_dependency_for_independent_packages"
)
21
22
class CalledProcessError(Exception):
    """Raised by `check_subprocess_output` when a subprocess exits non-zero.

    Mirrors the `check=True` behaviour of `subprocess.run` for asyncio
    subprocesses.
    """

    # The failed asyncio subprocess (exposes e.g. `returncode`).
    process: asyncio.subprocess.Process
    # Captured stderr, or None when stderr was not piped.
    stderr: bytes | None
26
27
class UpdateFailedException(Exception):
    """Raised when a package's update script fails and `keep_going` is off."""

    pass
30
31
def eprint(*args: Any, **kwargs: Any) -> None:
    """Like `print`, but writes to stderr so stdout stays free for data."""
    print(*args, **kwargs, file=sys.stderr)
34
35
async def check_subprocess_output(*args: str, **kwargs: Any) -> bytes:
    """
    Run a command and return its stdout, raising on non-zero exit.

    Emulates the check and capture_output arguments of the
    `subprocess.run` function for asyncio subprocesses.
    """
    proc = await asyncio.create_subprocess_exec(*args, **kwargs)
    # communicate() drains the pipes while waiting; a bare wait() could
    # deadlock once the OS pipe buffers fill up.
    stdout, stderr = await proc.communicate()

    if proc.returncode == 0:
        return stdout

    failure = CalledProcessError()
    failure.process = proc
    failure.stderr = stderr
    raise failure
53
54
async def nix_instantiate(attr_path: str) -> Path:
    """Instantiate `attr_path` and return the path of its `.drv` file."""
    raw = await check_subprocess_output(
        "nix-instantiate",
        "-A",
        attr_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # The output may look like `<drv>!<outputs>`; keep only the drv path.
    drv_path, _, _outputs = raw.decode("utf-8").strip().partition("!")

    return Path(drv_path)
66
67
async def nix_query_requisites(drv: Path) -> list[Path]:
    """Return all requisite `.drv` paths of `drv`, excluding `drv` itself."""
    output = await check_subprocess_output(
        "nix-store",
        "--query",
        "--requisites",
        str(drv),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    self_path = str(drv)
    requisite_paths: list[Path] = []
    for line in output.decode("utf-8").splitlines():
        # nix-store lists the queried derivation too; skip it to avoid
        # self-loops in the dependency graph.
        if line != self_path:
            requisite_paths.append(Path(line))

    return requisite_paths
86
87
async def attr_instantiation_worker(
    semaphore: asyncio.Semaphore,
    attr_path: str,
) -> tuple[Path, str]:
    """Instantiate `attr_path` while holding the concurrency `semaphore`.

    Returns a `(drv_path, attr_path)` pair, convenient for building a dict.
    """
    async with semaphore:
        eprint(f"Instantiating {attr_path}…")
        drv = await nix_instantiate(attr_path)
        return (drv, attr_path)
95
96
async def requisites_worker(
    semaphore: asyncio.Semaphore,
    drv: Path,
) -> tuple[Path, list[Path]]:
    """Query the requisites of `drv` while holding the concurrency `semaphore`."""
    async with semaphore:
        eprint(f"Obtaining requisites for {drv}…")
        requisites = await nix_query_requisites(drv)
        return (drv, requisites)
104
105
def requisites_to_attrs(
    drv_attr_paths: dict[Path, str],
    requisites: list[Path],
) -> set[str]:
    """
    Converts a set of requisite `.drv`s to a set of attribute paths.
    Derivations that do not correspond to any of the packages we want to update will be discarded.
    """
    attrs: set[str] = set()
    for requisite in requisites:
        attr_path = drv_attr_paths.get(requisite)
        if attr_path is not None:
            attrs.add(attr_path)
    return attrs
119
120
def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """
    Flips the edges of a directed graph.

    Packages without any dependency relation in the updated set
    will be added to `FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES` node.
    """

    flipped: dict[str, set[str]] = {}
    for dependent, dependencies in graph.items():
        if not dependencies:
            # Keep isolated nodes in the graph by attaching them to a
            # shared synthetic dependency.
            dependencies = {FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES}
        for dependency in dependencies:
            flipped.setdefault(dependency, set()).add(dependent)

    return flipped
136
137
138def get_independent_sorter(
139 packages: list[dict],
140) -> TopologicalSorter[str]:
141 """
142 Returns a sorter which treats all packages as independent,
143 which will allow them to be updated in parallel.
144 """
145
146 attr_deps: dict[str, set[str]] = {
147 package["attrPath"]: set() for package in packages
148 }
149 sorter = TopologicalSorter(attr_deps)
150 sorter.prepare()
151
152 return sorter
153
154
async def get_topological_sorter(
    max_workers: int,
    packages: list[dict],
    reverse_order: bool,
) -> tuple[TopologicalSorter[str], list[dict]]:
    """
    Returns a sorter which returns packages in topological or reverse topological order,
    which will ensure a package is updated before or after its dependencies, respectively.

    Also returns `packages` re-sorted to match that order. Instantiation and
    requisite queries run concurrently, capped at `max_workers` subprocesses.
    """

    semaphore = asyncio.Semaphore(max_workers)

    # Map each package's .drv store path to its attribute path.
    drv_attr_paths = dict(
        await asyncio.gather(
            *(
                attr_instantiation_worker(semaphore, package["attrPath"])
                for package in packages
            )
        )
    )

    drv_requisites = await asyncio.gather(
        *(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
    )

    # Dependency graph restricted to the set of packages being updated.
    attr_deps = {
        drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
        for drv, requisites in drv_requisites
    }

    if reverse_order:
        attr_deps = reverse_edges(attr_deps)

    # Adjust packages order based on the topological one.
    # Build a position lookup table once instead of calling `list.index`
    # inside the sort key, which would be quadratic in the number of packages.
    order_index = {
        attr_path: position
        for position, attr_path in enumerate(
            TopologicalSorter(attr_deps).static_order()
        )
    }
    packages = sorted(packages, key=lambda package: order_index[package["attrPath"]])

    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter, packages
196
197
async def run_update_script(
    nixpkgs_root: str,
    merge_lock: asyncio.Lock,
    temp_dir: tuple[str, str] | None,
    package: dict,
    keep_going: bool,
) -> None:
    """
    Run a single package's update script, then merge the resulting changes.

    When `temp_dir` (a `(worktree, branch)` pair) is given, the script runs
    inside that git worktree; otherwise it runs in the main repository.
    On script failure the captured stderr is echoed and written to
    `<pname>.log`, and `UpdateFailedException` is raised unless
    `keep_going` is set.
    """
    worktree: str | None = None

    update_script_command = package["updateScript"]

    if temp_dir is not None:
        worktree, _branch = temp_dir

        # Ensure the worktree is clean before update.
        await check_subprocess_output(
            "git",
            "reset",
            "--hard",
            "--quiet",
            "HEAD",
            cwd=worktree,
        )

        # Update scripts can use $(dirname $0) to get their location but we want to run
        # their clones in the git worktree, not in the main nixpkgs repo.
        update_script_command = map(
            lambda arg: re.sub(r"^{0}".format(re.escape(nixpkgs_root)), worktree, arg),
            update_script_command,
        )

    eprint(f" - {package['name']}: UPDATING ...")

    try:
        # Expose package metadata to the update script via the environment.
        update_info = await check_subprocess_output(
            "env",
            f"UPDATE_NIX_NAME={package['name']}",
            f"UPDATE_NIX_PNAME={package['pname']}",
            f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}",
            f"UPDATE_NIX_ATTR_PATH={package['attrPath']}",
            *update_script_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=worktree,
        )
        await merge_changes(merge_lock, package, update_info, temp_dir)
    except KeyboardInterrupt as e:
        eprint("Cancelling…")
        raise asyncio.exceptions.CancelledError()
    except CalledProcessError as e:
        eprint(f" - {package['name']}: ERROR")
        if e.stderr is not None:
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )
            eprint()
            eprint(e.stderr.decode("utf-8"))
            # Keep a copy of the failure log on disk for later inspection.
            with open(f"{package['pname']}.log", "wb") as logfile:
                logfile.write(e.stderr)
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )

        if not keep_going:
            raise UpdateFailedException(
                f"The update script for {package['name']} failed with exit code {e.process.returncode}"
            )
267
268
@contextlib.contextmanager
def make_worktree() -> Generator[tuple[str, str], None, None]:
    """
    Create a throwaway git worktree on a fresh branch.

    Yields a `(worktree_path, branch_name)` pair; the worktree and branch
    are removed on exit, even when the body raises.
    """
    with tempfile.TemporaryDirectory() as tmp:
        branch = f"update-{os.path.basename(tmp)}"
        worktree = f"{tmp}/nixpkgs"

        subprocess.run(["git", "worktree", "add", "-b", branch, worktree])
        try:
            yield (worktree, branch)
        finally:
            subprocess.run(["git", "worktree", "remove", "--force", worktree])
            subprocess.run(["git", "branch", "-D", branch])
281
282
async def commit_changes(
    name: str,
    merge_lock: asyncio.Lock,
    worktree: str,
    branch: str,
    changes: list[dict],
) -> None:
    """
    Commit each change on the worktree's branch and cherry-pick it into the
    currently checked-out branch of the main repository.

    `name` identifies the package being committed (kept for call-site context).
    """
    for change in changes:
        # Git can only handle a single index operation at a time
        async with merge_lock:
            await check_subprocess_output("git", "add", *change["files"], cwd=worktree)
            commit_message = "{attrPath}: {oldVersion} -> {newVersion}".format(**change)
            if "commitMessage" in change:
                commit_message = change["commitMessage"]
            elif "commitBody" in change:
                commit_message = commit_message + "\n\n" + change["commitBody"]
            await check_subprocess_output(
                "git",
                "commit",
                "--quiet",
                "-m",
                commit_message,
                cwd=worktree,
            )
            # The cherry-pick runs in the main repository (no cwd) and is
            # also an index operation, so it must stay under the merge lock;
            # running it outside the lock lets concurrent workers race on
            # the main repo's index.
            await check_subprocess_output("git", "cherry-pick", branch)
308
309
async def check_changes(
    package: dict,
    worktree: str,
    update_info: bytes,
) -> list[dict]:
    """
    Normalize an update script's output into a list of change dicts.

    Packages advertising the "commit" feature emit JSON describing their
    changes; for other packages a single empty change is synthesized.
    When there is exactly one change, missing fields are filled in from
    `package` metadata, a nix evaluation of the new version, and
    `git diff`. Returns an empty list when nothing changed.
    """
    if "commit" in package["supportedFeatures"]:
        changes = json.loads(update_info)
    else:
        changes = [{}]

    # Try to fill in missing attributes when there is just a single change.
    if len(changes) == 1:
        # Dynamic data from updater take precedence over static data from passthru.updateScript.
        if "attrPath" not in changes[0]:
            # update.nix is always passing attrPath
            changes[0]["attrPath"] = package["attrPath"]

        if "oldVersion" not in changes[0]:
            # update.nix is always passing oldVersion
            changes[0]["oldVersion"] = package["oldVersion"]

        if "newVersion" not in changes[0]:
            attr_path = changes[0]["attrPath"]
            # Evaluate the package's version inside the worktree to learn
            # what the update script changed it to.
            obtain_new_version_output = await check_subprocess_output(
                "nix-instantiate",
                "--expr",
                f"with import ./. {{}}; lib.getVersion {attr_path}",
                "--eval",
                "--strict",
                "--json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            changes[0]["newVersion"] = json.loads(
                obtain_new_version_output.decode("utf-8")
            )

        if "files" not in changes[0]:
            changed_files_output = await check_subprocess_output(
                "git",
                "diff",
                "--name-only",
                "HEAD",
                stdout=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            # NOTE(review): the output is not decoded, so these file names
            # are bytes objects that get passed straight to `git add` —
            # confirm that is intentional.
            changed_files = changed_files_output.splitlines()
            changes[0]["files"] = changed_files

            # No files changed — the update was a no-op.
            if len(changed_files) == 0:
                return []

    return changes
364
365
async def merge_changes(
    merge_lock: asyncio.Lock,
    package: dict,
    update_info: bytes,
    temp_dir: tuple[str, str] | None,
) -> None:
    """Inspect an update's result and, when auto-committing, commit it."""
    if temp_dir is None:
        # Not committing — the update already happened in place.
        eprint(f" - {package['name']}: DONE.")
        return

    worktree, branch = temp_dir
    changes = await check_changes(package, worktree, update_info)

    if changes:
        await commit_changes(package["name"], merge_lock, worktree, branch, changes)
    else:
        eprint(f" - {package['name']}: DONE, no changes.")
382
383
async def updater(
    nixpkgs_root: str,
    temp_dir: tuple[str, str] | None,
    merge_lock: asyncio.Lock,
    packages_to_update: asyncio.Queue[dict | None],
    keep_going: bool,
    commit: bool,
) -> None:
    """
    Worker loop: pull packages off the queue and run their update scripts.

    `temp_dir` is this worker's dedicated git worktree (or None when not
    committing). The loop terminates when a `None` sentinel is received.
    """
    while True:
        package = await packages_to_update.get()
        if package is None:
            # A sentinel received, we are done.
            return

        # Run without a worktree when the package cannot produce commits.
        # Use a per-package variable: reassigning `temp_dir` itself would
        # permanently disable the worktree for every later package this
        # worker processes.
        package_temp_dir = temp_dir
        if not ("commit" in package["supportedFeatures"] or "attrPath" in package):
            package_temp_dir = None

        await run_update_script(
            nixpkgs_root, merge_lock, package_temp_dir, package, keep_going
        )

        packages_to_update.task_done()
404
405
406async def populate_queue(
407 attr_packages: dict[str, dict],
408 sorter: TopologicalSorter[str],
409 packages_to_update: asyncio.Queue[dict | None],
410 num_workers: int,
411) -> None:
412 """
413 Keeps populating the queue with packages that can be updated
414 according to ordering requirements. If topological order
415 is used, the packages will appear in waves, as packages with
416 no dependencies are processed and removed from the sorter.
417 With `order="none"`, all packages will be enqueued simultaneously.
418 """
419
420 # Fill up an update queue,
421 while sorter.is_active():
422 ready_packages = list(sorter.get_ready())
423 eprint(f"Enqueuing group of {len(ready_packages)} packages")
424 for package in ready_packages:
425 if package == FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES:
426 continue
427 await packages_to_update.put(attr_packages[package])
428 await packages_to_update.join()
429 sorter.done(*ready_packages)
430
431 # Add sentinels, one for each worker.
432 # A worker will terminate when it gets a sentinel from the queue.
433 for i in range(num_workers):
434 await packages_to_update.put(None)
435
436
async def start_updates(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
) -> None:
    """
    Orchestrate the update run: spawn worker tasks, feed them through a
    queue in sorter order, and (when `commit` is set) give each worker its
    own git worktree to commit into.

    Exits the process with status 1 when a worker raises
    `UpdateFailedException` (an update failed with `keep_going` off).
    """
    merge_lock = asyncio.Lock()
    packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()

    with contextlib.ExitStack() as stack:
        temp_dirs: list[tuple[str, str] | None] = []

        # Do not create more workers than there are packages.
        num_workers = min(max_workers, len(attr_packages))

        # Resolve the repository root so update scripts can be redirected
        # into per-worker worktrees.
        nixpkgs_root_output = await check_subprocess_output(
            "git",
            "rev-parse",
            "--show-toplevel",
            stdout=asyncio.subprocess.PIPE,
        )
        nixpkgs_root = nixpkgs_root_output.decode("utf-8").strip()

        # Set up temporary directories when using auto-commit.
        for i in range(num_workers):
            temp_dir = stack.enter_context(make_worktree()) if commit else None
            temp_dirs.append(temp_dir)

        queue_task = populate_queue(
            attr_packages,
            sorter,
            packages_to_update,
            num_workers,
        )

        # Prepare updater workers for each temp_dir directory.
        # At most `num_workers` instances of `run_update_script` will be running at one time.
        updater_tasks = [
            updater(
                nixpkgs_root,
                temp_dir,
                merge_lock,
                packages_to_update,
                keep_going,
                commit,
            )
            for temp_dir in temp_dirs
        ]

        tasks = asyncio.gather(
            *updater_tasks,
            queue_task,
        )

        try:
            # Start updater workers.
            await tasks
        except asyncio.exceptions.CancelledError:
            # When one worker is cancelled, cancel the others too.
            tasks.cancel()
        except UpdateFailedException as e:
            # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false.
            tasks.cancel()
            eprint(e)
            sys.exit(1)
503
504
async def main(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    packages_path: str,
    skip_prompt: bool,
    order: Order,
) -> None:
    """
    Entry point: load the package list, build a sorter for the requested
    order, confirm with the user, then run the updates.
    """
    with open(packages_path) as f:
        packages = json.load(f)

    if order == "arbitrary":
        sorter = get_independent_sorter(packages)
    else:
        eprint("Sorting packages…")
        sorter, packages = await get_topological_sorter(
            max_workers,
            packages,
            order == "reverse-topological",
        )

    attr_packages = {package["attrPath"]: package for package in packages}

    eprint()
    eprint("Going to be running update for following packages:")
    for package in packages:
        eprint(f" - {package['name']}")
    eprint()

    confirm = "" if skip_prompt else input("Press Enter key to continue...")

    # Anything other than a bare Enter aborts the run.
    if confirm != "":
        eprint("Aborting!")
        sys.exit(130)

    eprint()
    eprint("Running update for:")

    await start_updates(max_workers, keep_going, commit, attr_packages, sorter)

    eprint()
    eprint("Packages updated!")
    sys.exit()
549
550
# Command-line interface definition (parsed in the __main__ guard below).
parser = argparse.ArgumentParser(description="Update packages")
parser.add_argument(
    "--max-workers",
    "-j",
    dest="max_workers",
    type=int,
    help="Number of updates to run concurrently",
    nargs="?",
    default=4,
)
parser.add_argument(
    "--keep-going",
    "-k",
    dest="keep_going",
    action="store_true",
    help="Do not stop after first failure",
)
parser.add_argument(
    "--commit",
    "-c",
    dest="commit",
    action="store_true",
    help="Commit the changes",
)
# Choices must match the `Order` Literal defined at the top of the file.
parser.add_argument(
    "--order",
    dest="order",
    default="arbitrary",
    choices=["arbitrary", "reverse-topological", "topological"],
    help="Sort the packages based on dependency relation",
)
parser.add_argument(
    "packages",
    help="JSON file containing the list of package names and their update scripts",
)
parser.add_argument(
    "--skip-prompt",
    "-s",
    dest="skip_prompt",
    action="store_true",
    help="Do not stop for prompts",
)
593
if __name__ == "__main__":
    # Parse CLI arguments and hand control to the asyncio entry point.
    cli_args = parser.parse_args()

    try:
        asyncio.run(
            main(
                cli_args.max_workers,
                cli_args.keep_going,
                cli_args.commit,
                cli_args.packages,
                cli_args.skip_prompt,
                cli_args.order,
            )
        )
    except KeyboardInterrupt:
        # Cancel outside of the main loop too, with the conventional
        # SIGINT exit status.
        sys.exit(130)