···
1
-
from typing import Any, Generator
1
+
from graphlib import TopologicalSorter
2
+
from pathlib import Path
3
+
from typing import Any, Generator, Literal
···
15
+
Order = Literal["arbitrary", "reverse-topological", "topological"]
class CalledProcessError(Exception):
···
50
+
async def nix_instantiate(attr_path: str) -> Path:
51
+
out = await check_subprocess_output(
55
+
stdout=asyncio.subprocess.PIPE,
56
+
stderr=asyncio.subprocess.PIPE,
58
+
drv = out.decode("utf-8").strip().split("!", 1)[0]
63
+
async def nix_query_requisites(drv: Path) -> list[Path]:
64
+
requisites = await check_subprocess_output(
69
+
stdout=asyncio.subprocess.PIPE,
70
+
stderr=asyncio.subprocess.PIPE,
77
+
for requisite in requisites.decode("utf-8").splitlines()
79
+
if requisite != drv_str
83
+
async def attr_instantiation_worker(
84
+
semaphore: asyncio.Semaphore,
86
+
) -> tuple[Path, str]:
87
+
async with semaphore:
88
+
eprint(f"Instantiating {attr_path}…")
89
+
return (await nix_instantiate(attr_path), attr_path)
92
+
async def requisites_worker(
93
+
semaphore: asyncio.Semaphore,
95
+
) -> tuple[Path, list[Path]]:
96
+
async with semaphore:
97
+
eprint(f"Obtaining requisites for {drv}…")
98
+
return (drv, await nix_query_requisites(drv))
101
+
def requisites_to_attrs(
102
+
drv_attr_paths: dict[Path, str],
103
+
requisites: list[Path],
106
+
Converts a set of requisite `.drv`s to a set of attribute paths.
107
+
Derivations that do not correspond to any of the packages we want to update will be discarded.
110
+
drv_attr_paths[requisite]
111
+
for requisite in requisites
112
+
if requisite in drv_attr_paths
116
+
def reverse_edges(graph: dict[str, set[str]]) -> dict[str, set[str]]:
118
+
Flips the edges of a directed graph.
121
+
reversed_graph: dict[str, set[str]] = {}
122
+
for dependent, dependencies in graph.items():
123
+
for dependency in dependencies:
124
+
reversed_graph.setdefault(dependency, set()).add(dependent)
126
+
return reversed_graph
129
+
def get_independent_sorter(
130
+
packages: list[dict],
131
+
) -> TopologicalSorter[str]:
133
+
Returns a sorter which treats all packages as independent,
134
+
which will allow them to be updated in parallel.
137
+
attr_deps: dict[str, set[str]] = {
138
+
package["attrPath"]: set() for package in packages
140
+
sorter = TopologicalSorter(attr_deps)
146
+
async def get_topological_sorter(
148
+
packages: list[dict],
149
+
reverse_order: bool,
150
+
) -> tuple[TopologicalSorter[str], list[dict]]:
152
+
Returns a sorter which returns packages in topological or reverse topological order,
153
+
which will ensure a package is updated before or after its dependencies, respectively.
156
+
semaphore = asyncio.Semaphore(max_workers)
158
+
drv_attr_paths = dict(
159
+
await asyncio.gather(
161
+
attr_instantiation_worker(semaphore, package["attrPath"])
162
+
for package in packages
167
+
drv_requisites = await asyncio.gather(
168
+
*(requisites_worker(semaphore, drv) for drv in drv_attr_paths.keys())
172
+
drv_attr_paths[drv]: requisites_to_attrs(drv_attr_paths, requisites)
173
+
for drv, requisites in drv_requisites
177
+
attr_deps = reverse_edges(attr_deps)
179
+
# Adjust packages order based on the topological one
180
+
ordered = list(TopologicalSorter(attr_deps).static_order())
181
+
packages = sorted(packages, key=lambda package: ordered.index(package["attrPath"]))
183
+
sorter = TopologicalSorter(attr_deps)
186
+
return sorter, packages
async def run_update_script(
merge_lock: asyncio.Lock,
···
packages_to_update.task_done()
397
+
async def populate_queue(
398
+
attr_packages: dict[str, dict],
399
+
sorter: TopologicalSorter[str],
400
+
packages_to_update: asyncio.Queue[dict | None],
404
+
Keeps populating the queue with packages that can be updated
405
+
according to ordering requirements. If topological order
406
+
is used, the packages will appear in waves, as packages with
407
+
no dependencies are processed and removed from the sorter.
408
+
With `order="none"`, all packages will be enqueued simultaneously.
411
+
# Fill up an update queue,
412
+
while sorter.is_active():
413
+
ready_packages = list(sorter.get_ready())
414
+
eprint(f"Enqueuing group of {len(ready_packages)} packages")
415
+
for package in ready_packages:
416
+
await packages_to_update.put(attr_packages[package])
417
+
await packages_to_update.join()
418
+
sorter.done(*ready_packages)
420
+
# Add sentinels, one for each worker.
421
+
# A worker will terminate when it gets a sentinel from the queue.
422
+
for i in range(num_workers):
423
+
await packages_to_update.put(None)
257
-
packages: list[dict],
430
+
attr_packages: dict[str, dict],
431
+
sorter: TopologicalSorter[str],
merge_lock = asyncio.Lock()
packages_to_update: asyncio.Queue[dict | None] = asyncio.Queue()
···
temp_dirs: list[tuple[str, str] | None] = []
# Do not create more workers than there are packages.
266
-
num_workers = min(max_workers, len(packages))
440
+
num_workers = min(max_workers, len(attr_packages))
nixpkgs_root_output = await check_subprocess_output(
···
temp_dir = stack.enter_context(make_worktree()) if commit else None
temp_dirs.append(temp_dir)
281
-
# Fill up an update queue,
282
-
for package in packages:
283
-
await packages_to_update.put(package)
285
-
# Add sentinels, one for each worker.
286
-
# A workers will terminate when it gets sentinel from the queue.
287
-
for i in range(num_workers):
288
-
await packages_to_update.put(None)
455
+
queue_task = populate_queue(
458
+
packages_to_update,
# Prepare updater workers for each temp_dir directory.
# At most `num_workers` instances of `run_update_script` will be running at one time.
···
···
with open(packages_path) as f:
505
+
if order != "arbitrary":
506
+
eprint("Sorting packages…")
507
+
reverse_order = order == "reverse-topological"
508
+
sorter, packages = await get_topological_sorter(
514
+
sorter = get_independent_sorter(packages)
516
+
attr_packages = {package["attrPath"]: package for package in packages}
eprint("Going to be running update for following packages:")
···
eprint("Running update for:")
343
-
await start_updates(max_workers, keep_going, commit, packages)
530
+
await start_updates(max_workers, keep_going, commit, attr_packages, sorter)
eprint("Packages updated!")
···
help="Commit the changes",
567
+
default="arbitrary",
568
+
choices=["arbitrary", "reverse-topological", "topological"],
569
+
help="Sort the packages based on dependency relation",
571
+
parser.add_argument(
help="JSON file containing the list of package names and their update scripts",
···
except KeyboardInterrupt as e: