# Runs the per-package updateScript commands (optionally in git worktrees,
# committing the results) for a set of packages described by a JSON file.
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Any, Final, Generator, Literal
import argparse
import asyncio
import contextlib
import json
import os
import re
import shlex
import subprocess
import sys
import tempfile


# Supported values for the --order CLI flag.
Order = Literal["arbitrary", "reverse-topological", "topological"]


# Sentinel graph node collecting packages that have no dependency relation to
# any other package in the updated set (see reverse_edges).
FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES: Final[str] = (
    "::fake_dependency_for_independent_packages"
)


class CalledProcessError(Exception):
    """Raised by check_subprocess_output when a subprocess exits non-zero."""

    # The failed process handle (for returncode) and its captured stderr, if any.
    process: asyncio.subprocess.Process
    stderr: bytes | None


class UpdateFailedException(Exception):
    """Raised when an update script fails and --keep-going was not passed."""


def eprint(*args: Any, **kwargs: Any) -> None:
    """print() to stderr, keeping stdout free for data."""
    print(*args, file=sys.stderr, **kwargs)


async def check_subprocess_output(*args: str, **kwargs: Any) -> bytes:
    """
    Emulate check and capture_output arguments of subprocess.run function.

    Returns the process's stdout bytes (None-ish unless stdout=PIPE was
    passed); raises CalledProcessError on a non-zero exit status.
    """
    process = await asyncio.create_subprocess_exec(*args, **kwargs)
    # We need to use communicate() instead of wait(), as the OS pipe buffers
    # can fill up and cause a deadlock.
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        error = CalledProcessError()
        error.process = process
        error.stderr = stderr

        raise error

    return stdout


async def nix_instantiate(attr_path: str) -> Path:
    """Instantiate a Nixpkgs attribute and return its `.drv` store path."""
    out = await check_subprocess_output(
        "nix-instantiate",
        "-A",
        attr_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Strip an output selector suffix (e.g. "…drv!out") if present.
    drv = out.decode("utf-8").strip().split("!", 1)[0]

    return Path(drv)


async def nix_query_requisites(drv: Path) -> list[Path]:
    """Return the closure of derivations `drv` depends on (excluding itself)."""
    requisites = await check_subprocess_output(
        "nix-store",
        "--query",
        "--requisites",
        str(drv),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    drv_str = str(drv)

    return [
        Path(requisite)
        for requisite in requisites.decode("utf-8").splitlines()
        # Avoid self-loops.
        if requisite != drv_str
    ]


async def attr_instantiation_worker(
    semaphore: asyncio.Semaphore,
    attr_path: str,
) -> tuple[Path, str]:
    """Instantiate one attribute under a concurrency-limiting semaphore."""
    async with semaphore:
        eprint(f"Instantiating {attr_path}")
        return (await nix_instantiate(attr_path), attr_path)


async def requisites_worker(
    semaphore: asyncio.Semaphore,
    drv: Path,
) -> tuple[Path, list[Path]]:
    """Query one derivation's requisites under a concurrency-limiting semaphore."""
    async with semaphore:
        eprint(f"Obtaining requisites for {drv}")
        return (drv, await nix_query_requisites(drv))


def requisites_to_attrs(
    drv_attr_paths: dict[Path, str],
    requisites: list[Path],
) -> set[str]:
    """
    Converts a set of requisite `.drv`s to a set of attribute paths.
    Derivations that do not correspond to any of the packages we want to update will be discarded.
    """
    return {
        drv_attr_paths[requisite]
        for requisite in requisites
        if requisite in drv_attr_paths
    }


def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
    """
    Flips the edges of a directed graph.

    Packages without any dependency relation in the updated set
    will be added to `FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES` node.
    """

    reversed_graph: dict[str, set[str]] = {}
    for dependent, dependencies in graph.items():
        dependencies = dependencies or {FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES}
        for dependency in dependencies:
            reversed_graph.setdefault(dependency, set()).add(dependent)

    return reversed_graph


def get_independent_sorter(
    packages: list[dict],
) -> TopologicalSorter[str]:
    """
    Returns a sorter which treats all packages as independent,
    which will allow them to be updated in parallel.
    """

    attr_deps: dict[str, set[str]] = {
        package["attrPath"]: set() for package in packages
    }
    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter


async def get_topological_sorter(
    max_workers: int,
    packages: list[dict],
    reverse_order: bool,
) -> tuple[TopologicalSorter[str], list[dict]]:
    """
    Returns a sorter which returns packages in topological or reverse topological order,
    which will ensure a package is updated before or after its dependencies, respectively.
    """

    semaphore = asyncio.Semaphore(max_workers)

    drv_attr_paths = dict(
        await asyncio.gather(
            *(
                attr_instantiation_worker(semaphore, package["attrPath"])
                for package in packages
            )
        )
    )

    drv_requisites = await asyncio.gather(
        *(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
    )

    attr_deps = {
        drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
        for drv, requisites in drv_requisites
    }

    if reverse_order:
        attr_deps = reverse_edges(attr_deps)

    # Adjust packages order based on the topological one.
    # Precompute positions to avoid a quadratic list.index() sort key.
    position = {
        attr_path: index
        for index, attr_path in enumerate(TopologicalSorter(attr_deps).static_order())
    }
    packages = sorted(packages, key=lambda package: position[package["attrPath"]])

    sorter = TopologicalSorter(attr_deps)
    sorter.prepare()

    return sorter, packages


async def run_update_script(
    nixpkgs_root: str,
    merge_lock: asyncio.Lock,
    temp_dir: tuple[str, str] | None,
    package: dict,
    keep_going: bool,
) -> None:
    """
    Run a single package's update script and merge the resulting changes.

    When `temp_dir` (worktree, branch) is given, the script runs inside a
    clean git worktree; otherwise it runs in the current checkout. Raises
    UpdateFailedException on script failure unless `keep_going` is set.
    """
    worktree: str | None = None

    update_script_command = package["updateScript"]

    if temp_dir is not None:
        worktree, _branch = temp_dir

        # Ensure the worktree is clean before update.
        await check_subprocess_output(
            "git",
            "reset",
            "--hard",
            "--quiet",
            "HEAD",
            cwd=worktree,
        )

        # Update scripts can use $(dirname $0) to get their location but we want to run
        # their clones in the git worktree, not in the main nixpkgs repo.
        update_script_command = map(
            lambda arg: re.sub(r"^{0}".format(re.escape(nixpkgs_root)), worktree, arg),
            update_script_command,
        )

    eprint(f" - {package['name']}: UPDATING ...")

    try:
        update_info = await check_subprocess_output(
            "env",
            f"UPDATE_NIX_NAME={package['name']}",
            f"UPDATE_NIX_PNAME={package['pname']}",
            f"UPDATE_NIX_OLD_VERSION={package['oldVersion']}",
            f"UPDATE_NIX_ATTR_PATH={package['attrPath']}",
            # Run all update scripts in the Nixpkgs development shell to get access to formatters and co.
            "nix-shell",
            nixpkgs_root + "/shell.nix",
            "--run",
            " ".join(shlex.quote(s) for s in update_script_command),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=worktree,
        )
        await merge_changes(merge_lock, package, update_info, temp_dir)
    except KeyboardInterrupt:
        eprint("Cancelling…")
        raise asyncio.exceptions.CancelledError()
    except CalledProcessError as e:
        eprint(f" - {package['name']}: ERROR")
        if e.stderr is not None:
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )
            eprint()
            eprint(e.stderr.decode("utf-8"))
            # Keep a copy of the failure log next to the script for later inspection.
            with open(f"{package['pname']}.log", "wb") as logfile:
                logfile.write(e.stderr)
            eprint()
            eprint(
                f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------"
            )

        if not keep_going:
            raise UpdateFailedException(
                f"The update script for {package['name']} failed with exit code {e.process.returncode}"
            )


@contextlib.contextmanager
def make_worktree() -> Generator[tuple[str, str], None, None]:
    """
    Create a temporary git worktree on a fresh branch.

    Yields (worktree_directory, branch_name); the worktree and branch are
    removed on exit.
    """
    with tempfile.TemporaryDirectory() as wt:
        branch_name = f"update-{os.path.basename(wt)}"
        target_directory = f"{wt}/nixpkgs"

        # Fail early if the worktree cannot be created; proceeding with a
        # missing checkout would make every update run in it fail confusingly.
        subprocess.run(
            ["git", "worktree", "add", "-b", branch_name, target_directory],
            check=True,
        )
        try:
            yield (target_directory, branch_name)
        finally:
            # Best-effort cleanup — do not mask the original error on failure.
            subprocess.run(["git", "worktree", "remove", "--force", target_directory])
            subprocess.run(["git", "branch", "-D", branch_name])


async def commit_changes(
    name: str,
    merge_lock: asyncio.Lock,
    worktree: str,
    branch: str,
    changes: list[dict],
) -> None:
    """
    Commit each change inside the worktree's branch, then cherry-pick the
    branch tip into the current checkout. `name` is kept for interface
    compatibility but is currently unused.
    """
    for change in changes:
        # Git can only handle a single index operation at a time
        async with merge_lock:
            await check_subprocess_output("git", "add", *change["files"], cwd=worktree)
            commit_message = "{attrPath}: {oldVersion} -> {newVersion}".format(**change)
            if "commitMessage" in change:
                commit_message = change["commitMessage"]
            elif "commitBody" in change:
                commit_message = commit_message + "\n\n" + change["commitBody"]
            await check_subprocess_output(
                "git",
                "commit",
                "--quiet",
                "-m",
                commit_message,
                cwd=worktree,
            )
            # Runs in the main checkout (no cwd) to pull the commit over.
            await check_subprocess_output("git", "cherry-pick", branch)


async def check_changes(
    package: dict,
    worktree: str,
    update_info: bytes,
) -> list[dict]:
    """
    Normalize the update script's output into a list of change dicts,
    filling in attrPath/oldVersion/newVersion/files when missing.
    Returns [] when a single change produced no file modifications.
    """
    if "commit" in package["supportedFeatures"]:
        changes = json.loads(update_info)
    else:
        changes = [{}]

    # Try to fill in missing attributes when there is just a single change.
    if len(changes) == 1:
        # Dynamic data from updater take precedence over static data from passthru.updateScript.
        if "attrPath" not in changes[0]:
            # update.nix is always passing attrPath
            changes[0]["attrPath"] = package["attrPath"]

        if "oldVersion" not in changes[0]:
            # update.nix is always passing oldVersion
            changes[0]["oldVersion"] = package["oldVersion"]

        if "newVersion" not in changes[0]:
            attr_path = changes[0]["attrPath"]
            obtain_new_version_output = await check_subprocess_output(
                "nix-instantiate",
                "--expr",
                f"with import ./. {{}}; lib.getVersion {attr_path}",
                "--eval",
                "--strict",
                "--json",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            changes[0]["newVersion"] = json.loads(
                obtain_new_version_output.decode("utf-8")
            )

        if "files" not in changes[0]:
            changed_files_output = await check_subprocess_output(
                "git",
                "diff",
                "--name-only",
                "HEAD",
                stdout=asyncio.subprocess.PIPE,
                cwd=worktree,
            )
            # NOTE(review): entries are bytes (git output is not decoded);
            # subprocess args accept bytes, so `git add` downstream still works.
            changed_files = changed_files_output.splitlines()
            changes[0]["files"] = changed_files

            if len(changed_files) == 0:
                return []

    return changes


async def merge_changes(
    merge_lock: asyncio.Lock,
    package: dict,
    update_info: bytes,
    temp_dir: tuple[str, str] | None,
) -> None:
    """Commit the update's changes when running with worktrees, else just report."""
    if temp_dir is not None:
        worktree, branch = temp_dir
        changes = await check_changes(package, worktree, update_info)

        if len(changes) > 0:
            await commit_changes(package["name"], merge_lock, worktree, branch, changes)
        else:
            eprint(f" - {package['name']}: DONE, no changes.")
    else:
        eprint(f" - {package['name']}: DONE.")


async def updater(
    nixpkgs_root: str,
    temp_dir: tuple[str, str] | None,
    merge_lock: asyncio.Lock,
    packages_to_update: asyncio.Queue[dict | None],
    keep_going: bool,
    commit: bool,
) -> None:
    """
    Worker loop: take packages off the queue and run their update scripts
    until a sentinel (None) is received.
    """
    while True:
        package = await packages_to_update.get()
        if package is None:
            # A sentinel received, we are done.
            return

        # Decide per package whether to use the worktree; the original code
        # assigned `temp_dir = None` here, permanently disabling the worktree
        # for every later package handled by this worker.
        package_temp_dir = temp_dir
        if not ("commit" in package["supportedFeatures"] or "attrPath" in package):
            package_temp_dir = None

        await run_update_script(
            nixpkgs_root, merge_lock, package_temp_dir, package, keep_going
        )

        packages_to_update.task_done()


async def populate_queue(
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
    packages_to_update: asyncio.Queue[dict | None],
    num_workers: int,
) -> None:
    """
    Keeps populating the queue with packages that can be updated
    according to ordering requirements. If topological order
    is used, the packages will appear in waves, as packages with
    no dependencies are processed and removed from the sorter.
    With `order="arbitrary"`, all packages will be enqueued simultaneously.
    """

    # Fill up an update queue,
    while sorter.is_active():
        ready_packages = list(sorter.get_ready())
        eprint(f"Enqueuing group of {len(ready_packages)} packages")
        for package in ready_packages:
            if package == FAKE_DEPENDENCY_FOR_INDEPENDENT_PACKAGES:
                continue
            await packages_to_update.put(attr_packages[package])
        # Wait for the whole wave to finish before releasing its dependents.
        await packages_to_update.join()
        sorter.done(*ready_packages)

    # Add sentinels, one for each worker.
    # A worker will terminate when it gets a sentinel from the queue.
    for _ in range(num_workers):
        await packages_to_update.put(None)


async def start_updates(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    attr_packages: dict[str, dict],
    sorter: TopologicalSorter[str],
) -> None:
    """
    Spawn the queue producer and `num_workers` updater workers, each with
    its own git worktree when `commit` is enabled, and wait for completion.
    """
    merge_lock = asyncio.Lock()
    packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()

    with contextlib.ExitStack() as stack:
        temp_dirs: list[tuple[str, str] | None] = []

        # Do not create more workers than there are packages.
        num_workers = min(max_workers, len(attr_packages))

        nixpkgs_root_output = await check_subprocess_output(
            "git",
            "rev-parse",
            "--show-toplevel",
            stdout=asyncio.subprocess.PIPE,
        )
        nixpkgs_root = nixpkgs_root_output.decode("utf-8").strip()

        # Set up temporary directories when using auto-commit.
        for _ in range(num_workers):
            temp_dir = stack.enter_context(make_worktree()) if commit else None
            temp_dirs.append(temp_dir)

        queue_task = populate_queue(
            attr_packages,
            sorter,
            packages_to_update,
            num_workers,
        )

        # Prepare updater workers for each temp_dir directory.
        # At most `num_workers` instances of `run_update_script` will be running at one time.
        updater_tasks = [
            updater(
                nixpkgs_root,
                temp_dir,
                merge_lock,
                packages_to_update,
                keep_going,
                commit,
            )
            for temp_dir in temp_dirs
        ]

        tasks = asyncio.gather(
            *updater_tasks,
            queue_task,
        )

        try:
            # Start updater workers.
            await tasks
        except asyncio.exceptions.CancelledError:
            # When one worker is cancelled, cancel the others too.
            tasks.cancel()
        except UpdateFailedException as e:
            # When one worker fails, cancel the others, as this exception is only thrown when keep_going is false.
            tasks.cancel()
            eprint(e)
            sys.exit(1)


async def main(
    max_workers: int,
    keep_going: bool,
    commit: bool,
    packages_path: str,
    skip_prompt: bool,
    order: Order,
) -> None:
    """Load the package list, sort it as requested, and run the updates."""
    with open(packages_path) as f:
        packages = json.load(f)

    if order != "arbitrary":
        eprint("Sorting packages…")
        reverse_order = order == "reverse-topological"
        sorter, packages = await get_topological_sorter(
            max_workers,
            packages,
            reverse_order,
        )
    else:
        sorter = get_independent_sorter(packages)

    attr_packages = {package["attrPath"]: package for package in packages}

    eprint()
    eprint("Going to be running update for following packages:")
    for package in packages:
        eprint(f" - {package['name']}")
    eprint()

    confirm = "" if skip_prompt else input("Press Enter key to continue...")

    if confirm == "":
        eprint()
        eprint("Running update for:")

        await start_updates(max_workers, keep_going, commit, attr_packages, sorter)

        eprint()
        eprint("Packages updated!")
        sys.exit()
    else:
        eprint("Aborting!")
        sys.exit(130)


parser = argparse.ArgumentParser(description="Update packages")
parser.add_argument(
    "--max-workers",
    "-j",
    dest="max_workers",
    type=int,
    help="Number of updates to run concurrently",
    nargs="?",
    default=4,
)
parser.add_argument(
    "--keep-going",
    "-k",
    dest="keep_going",
    action="store_true",
    help="Do not stop after first failure",
)
parser.add_argument(
    "--commit",
    "-c",
    dest="commit",
    action="store_true",
    help="Commit the changes",
)
parser.add_argument(
    "--order",
    dest="order",
    default="arbitrary",
    choices=["arbitrary", "reverse-topological", "topological"],
    help="Sort the packages based on dependency relation",
)
parser.add_argument(
    "packages",
    help="JSON file containing the list of package names and their update scripts",
)
parser.add_argument(
    "--skip-prompt",
    "-s",
    dest="skip_prompt",
    action="store_true",
    help="Do not stop for prompts",
)

if __name__ == "__main__":
    args = parser.parse_args()

    try:
        asyncio.run(
            main(
                args.max_workers,
                args.keep_going,
                args.commit,
                args.packages,
                args.skip_prompt,
                args.order,
            )
        )
    except KeyboardInterrupt:
        # Let’s cancel outside of the main loop too.
        sys.exit(130)