# nixpkgs update driver (snapshot at 25.11-pre, ~18 kB)
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Any, Final, Generator, Literal
import argparse
import asyncio
import contextlib
import json
import os
import re
import subprocess
import sys
import tempfile


Order = Literal["arbitrary", "reverse-topological", "topological"]


# Sentinel graph node that collects packages with no dependency relation to
# any other package in the updated set (see reverse_edges).
FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES: Final[str] = (
    "::fake_dependency_for_independent_packages"
)


class CalledProcessError(Exception):
    """Raised by check_subprocess_output when a subprocess exits non-zero."""

    process: asyncio.subprocess.Process
    stderr: bytes | None


class UpdateFailedException(Exception):
    """Raised when an update script fails and --keep-going is not set."""


def eprint(*args: Any, **kwargs: Any) -> None:
    """print(), but to standard error."""
    print(*args, file=sys.stderr, **kwargs)


async def check_subprocess_output(*args: str, **kwargs: Any) -> bytes:
    """
    Emulate check and capture_output arguments of subprocess.run function.

    Returns the captured stdout; raises CalledProcessError (carrying the
    process object and captured stderr) on a non-zero exit code.
    """
    process = await asyncio.create_subprocess_exec(*args, **kwargs)
    # We need to use communicate() instead of wait(), as the OS pipe buffers
    # can fill up and cause a deadlock.
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        error = CalledProcessError()
        error.process = process
        error.stderr = stderr

        raise error

    return stdout


async def nix_instantiate(attr_path: str) -> Path:
    """Instantiate an attribute path and return its `.drv` store path."""
    out = await check_subprocess_output(
        "nix-instantiate",
        "-A",
        attr_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # nix-instantiate may print `/nix/store/….drv!out`; keep only the path.
    drv = out.decode("utf-8").strip().split("!", 1)[0]

    return Path(drv)


async def nix_query_requisites(drv: Path) -> list[Path]:
    """Return the closure of derivations the given `.drv` depends on."""
    requisites = await check_subprocess_output(
        "nix-store",
        "--query",
        "--requisites",
        str(drv),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    drv_str = str(drv)

    return [
        Path(requisite)
        for requisite in requisites.decode("utf-8").splitlines()
        # Avoid self-loops.
        if requisite != drv_str
    ]


async def attr_instantiation_worker(
    semaphore: asyncio.Semaphore,
    attr_path: str,
) -> tuple[Path, str]:
    """Instantiate one attribute path, bounded by the given semaphore."""
    async with semaphore:
        eprint(f"Instantiating {attr_path}")
        return (await nix_instantiate(attr_path), attr_path)


async def requisites_worker(
    semaphore: asyncio.Semaphore,
    drv: Path,
) -> tuple[Path, list[Path]]:
    """Query requisites of one derivation, bounded by the given semaphore."""
    async with semaphore:
        eprint(f"Obtaining requisites for {drv}")
        return (drv, await nix_query_requisites(drv))


def requisites_to_attrs(
    drv_attr_paths: dict[Path, str],
    requisites: list[Path],
) -> set[str]:
    """
    Converts a set of requisite `.drv`s to a set of attribute paths.
    Derivations that do not correspond to any of the packages we want to update will be discarded.
    """
    return {
        drv_attr_paths[requisite]
        for requisite in requisites
        if requisite in drv_attr_paths
    }


def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """
    Flips the edges of a directed graph.

    Packages without any dependency relation in the updated set
    will be added to `FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES` node.
    """

    reversed_graph: dict[str, set[str]] = {}
    for dependent, dependencies in graph.items():
        dependencies = dependencies or {FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES}
        for dependency in dependencies:
            reversed_graph.setdefault(dependency, set()).add(dependent)

    return reversed_graph


def get_independent_sorter(
    packages: list[dict],
) -> TopologicalSorter[str]:
    """
    Returns a sorter which treats all packages as independent,
    which will allow them to be updated in parallel.
    """

    attr_deps: dict[str, set[str]] = {
        package["attrPath"]: set() for package in packages
    }
    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter


async def get_topological_sorter(
    max_workers: int,
    packages: list[dict],
    reverse_order: bool,
) -> tuple[TopologicalSorter[str], list[dict]]:
    """
    Returns a sorter which returns packages in topological or reverse topological order,
    which will ensure a package is updated before or after its dependencies, respectively.

    Also returns the packages re-sorted to match the static topological order.
    """

    semaphore = asyncio.Semaphore(max_workers)

    # Instantiate all attribute paths concurrently (bounded by the semaphore)
    # so we can relate `.drv` paths back to attribute paths.
    drv_attr_paths = dict(
        await asyncio.gather(
            *(
                attr_instantiation_worker(semaphore, package["attrPath"])
                for package in packages
            )
        )
    )

    drv_requisites = await asyncio.gather(
        *(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
    )

    # Dependency graph restricted to the packages being updated.
    attr_deps = {
        drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
        for drv, requisites in drv_requisites
    }

    if reverse_order:
        attr_deps = reverse_edges(attr_deps)

    # Adjust packages order based on the topological one
    ordered = list(TopologicalSorter(attr_deps).static_order())
    packages = sorted(packages, key=lambda package: ordered.index(package["attrPath"]))

    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter, packages


async def run_update_script(
    nixpkgs_root: str,
    merge_lock: asyncio.Lock,
    temp_dir: tuple[str, str] | None,
    package: dict,
    keep_going: bool,
) -> None:
    """
    Run a single package's update script and merge any resulting changes.

    When `temp_dir` is a `(worktree, branch)` pair, the script runs inside
    that git worktree so its changes can be committed and cherry-picked back.
    Raises UpdateFailedException on script failure unless `keep_going` is set.
    """
    worktree: str | None = None

    update_script_command = package["updateScript"]

    if temp_dir is not None:
        worktree, _branch = temp_dir

        # Ensure the worktree is clean before update.
        await check_subprocess_output(
            "git",
            "reset",
            "--hard",
            "--quiet",
            "HEAD",
            cwd=worktree,
        )

        # Update scripts can use $(dirname $0) to get their location but we want to run
        # their clones in the git worktree, not in the main nixpkgs repo.
        update_script_command = [
            re.sub(r"^{0}".format(re.escape(nixpkgs_root)), worktree, arg)
            for arg in update_script_command
        ]

    eprint(f" - {package['name']}: UPDATING ...")

    try:
        update_info = await check_subprocess_output(
            "env",
            f"UPDATE_NIX_NAME={package['name']}",
            f"UPDATE_NIX_PNAME={package['pname']}",
            f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}",
            f"UPDATE_NIX_ATTR_PATH={package['attrPath']}",
            *update_script_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=worktree,
        )
        await merge_changes(merge_lock, package, update_info, temp_dir)
    except KeyboardInterrupt:
        eprint("Cancelling…")
        raise asyncio.exceptions.CancelledError()
    except CalledProcessError as e:
        eprint(f" - {package['name']}: ERROR")
        if e.stderr is not None:
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )
            eprint()
            eprint(e.stderr.decode("utf-8"))
            # Persist the log so it can be inspected after the run.
            with open(f"{package['pname']}.log", "wb") as logfile:
                logfile.write(e.stderr)
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )

        if not keep_going:
            raise UpdateFailedException(
                f"The update script for {package['name']} failed with exit code {e.process.returncode}"
            )


@contextlib.contextmanager
def make_worktree() -> Generator[tuple[str, str], None, None]:
    """
    Create a temporary git worktree on a fresh branch and clean it up on exit.

    Yields a `(worktree_path, branch_name)` pair.
    """
    with tempfile.TemporaryDirectory() as wt:
        branch_name = f"update-{os.path.basename(wt)}"
        target_directory = f"{wt}/nixpkgs"

        # Creation must succeed, otherwise the yielded directory is unusable;
        # cleanup below remains best-effort.
        subprocess.run(
            ["git", "worktree", "add", "-b", branch_name, target_directory],
            check=True,
        )
        try:
            yield (target_directory, branch_name)
        finally:
            subprocess.run(["git", "worktree", "remove", "--force", target_directory])
            subprocess.run(["git", "branch", "-D", branch_name])


async def commit_changes(
    name: str,
    merge_lock: asyncio.Lock,
    worktree: str,
    branch: str,
    changes: list[dict],
) -> None:
    """
    Commit each change in the worktree's branch and cherry-pick it onto the
    current branch of the main repository.
    """
    for change in changes:
        # Git can only handle a single index operation at a time
        async with merge_lock:
            await check_subprocess_output("git", "add", *change["files"], cwd=worktree)
            commit_message = "{attrPath}: {oldVersion} -> {newVersion}".format(**change)
            if "commitMessage" in change:
                commit_message = change["commitMessage"]
            elif "commitBody" in change:
                commit_message = commit_message + "\n\n" + change["commitBody"]
            await check_subprocess_output(
                "git",
                "commit",
                "--quiet",
                "-m",
                commit_message,
                cwd=worktree,
            )
            await check_subprocess_output("git", "cherry-pick", branch)


async def check_changes(
    package: dict,
    worktree: str,
    update_info: bytes,
) -> list[dict]:
    """
    Normalize the update script's output into a list of change dicts,
    filling in attrPath/oldVersion/newVersion/files when missing.
    Returns an empty list when the script changed nothing.
    """
    if "commit" in package["supportedFeatures"]:
        # Scripts supporting the `commit` feature emit JSON change records.
        changes = json.loads(update_info)
    else:
        changes = [{}]

    # Try to fill in missing attributes when there is just a single change.
    if len(changes) == 1:
        # Dynamic data from updater take precedence over static data from passthru.updateScript.
        if "attrPath" not in changes[0]:
            # update.nix is always passing attrPath
            changes[0]["attrPath"] = package["attrPath"]

        if "oldVersion" not in changes[0]:
            # update.nix is always passing oldVersion
            changes[0]["oldVersion"] = package["oldVersion"]

        if "newVersion" not in changes[0]:
            attr_path = changes[0]["attrPath"]
            obtain_new_version_output = await check_subprocess_output(
                "nix-instantiate",
                "--expr",
                f"with import ./. {{}}; lib.getVersion {attr_path}",
                "--eval",
                "--strict",
                "--json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            changes[0]["newVersion"] = json.loads(
                obtain_new_version_output.decode("utf-8")
            )

        if "files" not in changes[0]:
            changed_files_output = await check_subprocess_output(
                "git",
                "diff",
                "--name-only",
                "HEAD",
                stdout=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            # Decode so `files` holds str paths, consistent with the
            # JSON-provided file lists from commit-capable scripts.
            changed_files = changed_files_output.decode("utf-8").splitlines()
            changes[0]["files"] = changed_files

            if len(changed_files) == 0:
                return []

    return changes


async def merge_changes(
    merge_lock: asyncio.Lock,
    package: dict,
    update_info: bytes,
    temp_dir: tuple[str, str] | None,
) -> None:
    """Commit and merge the update's changes, or just report completion."""
    if temp_dir is not None:
        worktree, branch = temp_dir
        changes = await check_changes(package, worktree, update_info)

        if len(changes) > 0:
            await commit_changes(package["name"], merge_lock, worktree, branch, changes)
        else:
            eprint(f" - {package['name']}: DONE, no changes.")
    else:
        eprint(f" - {package['name']}: DONE.")


async def updater(
    nixpkgs_root: str,
    temp_dir: tuple[str, str] | None,
    merge_lock: asyncio.Lock,
    packages_to_update: asyncio.Queue[dict | None],
    keep_going: bool,
    commit: bool,
) -> None:
    """Worker loop: consume packages from the queue until a sentinel arrives."""
    while True:
        package = await packages_to_update.get()
        if package is None:
            # A sentinel received, we are done.
            return

        # Decide per package whether the worktree can be used; a plain
        # assignment to temp_dir here would permanently disable committing
        # for every later package handled by this worker.
        if not ("commit" in package["supportedFeatures"] or "attrPath" in package):
            package_temp_dir = None
        else:
            package_temp_dir = temp_dir

        await run_update_script(
            nixpkgs_root, merge_lock, package_temp_dir, package, keep_going
        )

        packages_to_update.task_done()


async def populate_queue(
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
    packages_to_update: asyncio.Queue[dict | None],
    num_workers: int,
) -> None:
    """
    Keeps populating the queue with packages that can be updated
    according to ordering requirements. If topological order
    is used, the packages will appear in waves, as packages with
    no dependencies are processed and removed from the sorter.
    With `order="arbitrary"`, all packages will be enqueued simultaneously.
    """

    # Fill up an update queue,
    while sorter.is_active():
        ready_packages = list(sorter.get_ready())
        eprint(f"Enqueuing group of {len(ready_packages)} packages")
        for package in ready_packages:
            if package == FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES:
                continue
            await packages_to_update.put(attr_packages[package])
        # Wait for the whole wave to finish before releasing the next one.
        await packages_to_update.join()
        sorter.done(*ready_packages)

    # Add sentinels, one for each worker.
    # A worker will terminate when it gets a sentinel from the queue.
    for i in range(num_workers):
        await packages_to_update.put(None)


async def start_updates(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
) -> None:
    """Spawn the queue feeder plus one updater worker per worktree and await them."""
    merge_lock = asyncio.Lock()
    packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()

    with contextlib.ExitStack() as stack:
        temp_dirs: list[tuple[str, str] | None] = []

        # Do not create more workers than there are packages.
        num_workers = min(max_workers, len(attr_packages))

        nixpkgs_root_output = await check_subprocess_output(
            "git",
            "rev-parse",
            "--show-toplevel",
            stdout=asyncio.subprocess.PIPE,
        )
        nixpkgs_root = nixpkgs_root_output.decode("utf-8").strip()

        # Set up temporary directories when using auto-commit.
        for i in range(num_workers):
            temp_dir = stack.enter_context(make_worktree()) if commit else None
            temp_dirs.append(temp_dir)

        queue_task = populate_queue(
            attr_packages,
            sorter,
            packages_to_update,
            num_workers,
        )

        # Prepare updater workers for each temp_dir directory.
        # At most `num_workers` instances of `run_update_script` will be running at one time.
        updater_tasks = [
            updater(
                nixpkgs_root,
                temp_dir,
                merge_lock,
                packages_to_update,
                keep_going,
                commit,
            )
            for temp_dir in temp_dirs
        ]

        tasks = asyncio.gather(
            *updater_tasks,
            queue_task,
        )

        try:
            # Start updater workers.
            await tasks
        except asyncio.exceptions.CancelledError:
            # When one worker is cancelled, cancel the others too.
            tasks.cancel()
        except UpdateFailedException as e:
            # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false.
            tasks.cancel()
            eprint(e)
            sys.exit(1)


async def main(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    packages_path: str,
    skip_prompt: bool,
    order: Order,
) -> None:
    """Load the package list, sort it as requested, confirm, and run updates."""
    with open(packages_path) as f:
        packages = json.load(f)

    if order != "arbitrary":
        eprint("Sorting packages…")
        reverse_order = order == "reverse-topological"
        sorter, packages = await get_topological_sorter(
            max_workers,
            packages,
            reverse_order,
        )
    else:
        sorter = get_independent_sorter(packages)

    attr_packages = {package["attrPath"]: package for package in packages}

    eprint()
    eprint("Going to be running update for following packages:")
    for package in packages:
        eprint(f" - {package['name']}")
    eprint()

    confirm = "" if skip_prompt else input("Press Enter key to continue...")

    if confirm == "":
        eprint()
        eprint("Running update for:")

        await start_updates(max_workers, keep_going, commit, attr_packages, sorter)

        eprint()
        eprint("Packages updated!")
        sys.exit()
    else:
        eprint("Aborting!")
        # 130 conventionally signals interruption by the user.
        sys.exit(130)


parser = argparse.ArgumentParser(description="Update packages")
parser.add_argument(
    "--max-workers",
    "-j",
    dest="max_workers",
    type=int,
    help="Number of updates to run concurrently",
    nargs="?",
    default=4,
)
parser.add_argument(
    "--keep-going",
    "-k",
    dest="keep_going",
    action="store_true",
    help="Do not stop after first failure",
)
parser.add_argument(
    "--commit",
    "-c",
    dest="commit",
    action="store_true",
    help="Commit the changes",
)
parser.add_argument(
    "--order",
    dest="order",
    default="arbitrary",
    choices=["arbitrary", "reverse-topological", "topological"],
    help="Sort the packages based on dependency relation",
)
parser.add_argument(
    "packages",
    help="JSON file containing the list of package names and their update scripts",
)
parser.add_argument(
    "--skip-prompt",
    "-s",
    dest="skip_prompt",
    action="store_true",
    help="Do not stop for prompts",
)

if __name__ == "__main__":
    args = parser.parse_args()

    try:
        asyncio.run(
            main(
                args.max_workers,
                args.keep_going,
                args.commit,
                args.packages,
                args.skip_prompt,
                args.order,
            )
        )
    except KeyboardInterrupt:
        # Let’s cancel outside of the main loop too.
        sys.exit(130)