1from __future__ import annotations
2from typing import Dict, Generator, List, Optional, Tuple
3import argparse
4import asyncio
5import contextlib
6import json
7import os
8import re
9import subprocess
10import sys
11import tempfile
12
13class CalledProcessError(Exception):
14 process: asyncio.subprocess.Process
15
16def eprint(*args, **kwargs):
17 print(*args, file=sys.stderr, **kwargs)
18
19async def check_subprocess(*args, **kwargs):
20 """
21 Emulate check argument of subprocess.run function.
22 """
23 process = await asyncio.create_subprocess_exec(*args, **kwargs)
24 returncode = await process.wait()
25
26 if returncode != 0:
27 error = CalledProcessError()
28 error.process = process
29
30 raise error
31
32 return process
33
34async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_dir: Optional[Tuple[str, str]], package: Dict, keep_going: bool):
35 worktree: Optional[str] = None
36
37 update_script_command = package['updateScript']
38
39 if temp_dir is not None:
40 worktree, _branch = temp_dir
41
42 # Ensure the worktree is clean before update.
43 await check_subprocess('git', 'reset', '--hard', '--quiet', 'HEAD', cwd=worktree)
44
45 # Update scripts can use $(dirname $0) to get their location but we want to run
46 # their clones in the git worktree, not in the main nixpkgs repo.
47 update_script_command = map(lambda arg: re.sub(r'^{0}'.format(re.escape(nixpkgs_root)), worktree, arg), update_script_command)
48
49 eprint(f" - {package['name']}: UPDATING ...")
50
51 try:
52 update_process = await check_subprocess('env', f"UPDATE_NIX_ATTR_PATH={package['attrPath']}", *update_script_command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=worktree)
53 update_info = await update_process.stdout.read()
54
55 await merge_changes(merge_lock, package, update_info, temp_dir)
56 except KeyboardInterrupt as e:
57 eprint('Cancelling…')
58 raise asyncio.exceptions.CancelledError()
59 except CalledProcessError as e:
60 eprint(f" - {package['name']}: ERROR")
61 eprint()
62 eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
63 eprint()
64 stderr = await e.process.stderr.read()
65 eprint(stderr.decode('utf-8'))
66 with open(f"{package['pname']}.log", 'wb') as logfile:
67 logfile.write(stderr)
68 eprint()
69 eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
70
71 if not keep_going:
72 raise asyncio.exceptions.CancelledError()
73
74@contextlib.contextmanager
75def make_worktree() -> Generator[Tuple[str, str], None, None]:
76 with tempfile.TemporaryDirectory() as wt:
77 branch_name = f'update-{os.path.basename(wt)}'
78 target_directory = f'{wt}/nixpkgs'
79
80 subprocess.run(['git', 'worktree', 'add', '-b', branch_name, target_directory])
81 yield (target_directory, branch_name)
82 subprocess.run(['git', 'worktree', 'remove', '--force', target_directory])
83 subprocess.run(['git', 'branch', '-D', branch_name])
84
85async def commit_changes(name: str, merge_lock: asyncio.Lock, worktree: str, branch: str, changes: List[Dict]) -> None:
86 for change in changes:
87 # Git can only handle a single index operation at a time
88 async with merge_lock:
89 await check_subprocess('git', 'add', *change['files'], cwd=worktree)
90 commit_message = '{attrPath}: {oldVersion} → {newVersion}'.format(**change)
91 await check_subprocess('git', 'commit', '--quiet', '-m', commit_message, cwd=worktree)
92 await check_subprocess('git', 'cherry-pick', branch)
93
94async def check_changes(package: Dict, worktree: str, update_info: str):
95 if 'commit' in package['supportedFeatures']:
96 changes = json.loads(update_info)
97 else:
98 changes = [{}]
99
100 # Try to fill in missing attributes when there is just a single change.
101 if len(changes) == 1:
102 # Dynamic data from updater take precedence over static data from passthru.updateScript.
103 if 'attrPath' not in changes[0]:
104 # update.nix is always passing attrPath
105 changes[0]['attrPath'] = package['attrPath']
106
107 if 'oldVersion' not in changes[0]:
108 # update.nix is always passing oldVersion
109 changes[0]['oldVersion'] = package['oldVersion']
110
111 if 'newVersion' not in changes[0]:
112 attr_path = changes[0]['attrPath']
113 obtain_new_version_process = await check_subprocess('nix-instantiate', '--expr', f'with import ./. {{}}; lib.getVersion {attr_path}', '--eval', '--strict', '--json', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=worktree)
114 changes[0]['newVersion'] = json.loads((await obtain_new_version_process.stdout.read()).decode('utf-8'))
115
116 if 'files' not in changes[0]:
117 changed_files_process = await check_subprocess('git', 'diff', '--name-only', 'HEAD', stdout=asyncio.subprocess.PIPE, cwd=worktree)
118 changed_files = (await changed_files_process.stdout.read()).splitlines()
119 changes[0]['files'] = changed_files
120
121 if len(changed_files) == 0:
122 return []
123
124 return changes
125
126async def merge_changes(merge_lock: asyncio.Lock, package: Dict, update_info: str, temp_dir: Optional[Tuple[str, str]]) -> None:
127 if temp_dir is not None:
128 worktree, branch = temp_dir
129 changes = await check_changes(package, worktree, update_info)
130
131 if len(changes) > 0:
132 await commit_changes(package['name'], merge_lock, worktree, branch, changes)
133 else:
134 eprint(f" - {package['name']}: DONE, no changes.")
135 else:
136 eprint(f" - {package['name']}: DONE.")
137
138async def updater(nixpkgs_root: str, temp_dir: Optional[Tuple[str, str]], merge_lock: asyncio.Lock, packages_to_update: asyncio.Queue[Optional[Dict]], keep_going: bool, commit: bool):
139 while True:
140 package = await packages_to_update.get()
141 if package is None:
142 # A sentinel received, we are done.
143 return
144
145 if not ('commit' in package['supportedFeatures'] or 'attrPath' in package):
146 temp_dir = None
147
148 await run_update_script(nixpkgs_root, merge_lock, temp_dir, package, keep_going)
149
150async def start_updates(max_workers: int, keep_going: bool, commit: bool, packages: List[Dict]):
151 merge_lock = asyncio.Lock()
152 packages_to_update: asyncio.Queue[Optional[Dict]] = asyncio.Queue()
153
154 with contextlib.ExitStack() as stack:
155 temp_dirs: List[Optional[Tuple[str, str]]] = []
156
157 # Do not create more workers than there are packages.
158 num_workers = min(max_workers, len(packages))
159
160 nixpkgs_root_process = await check_subprocess('git', 'rev-parse', '--show-toplevel', stdout=asyncio.subprocess.PIPE)
161 nixpkgs_root = (await nixpkgs_root_process.stdout.read()).decode('utf-8').strip()
162
163 # Set up temporary directories when using auto-commit.
164 for i in range(num_workers):
165 temp_dir = stack.enter_context(make_worktree()) if commit else None
166 temp_dirs.append(temp_dir)
167
168 # Fill up an update queue,
169 for package in packages:
170 await packages_to_update.put(package)
171
172 # Add sentinels, one for each worker.
173 # A workers will terminate when it gets sentinel from the queue.
174 for i in range(num_workers):
175 await packages_to_update.put(None)
176
177 # Prepare updater workers for each temp_dir directory.
178 # At most `num_workers` instances of `run_update_script` will be running at one time.
179 updaters = asyncio.gather(*[updater(nixpkgs_root, temp_dir, merge_lock, packages_to_update, keep_going, commit) for temp_dir in temp_dirs])
180
181 try:
182 # Start updater workers.
183 await updaters
184 except asyncio.exceptions.CancelledError as e:
185 # When one worker is cancelled, cancel the others too.
186 updaters.cancel()
187
188def main(max_workers: int, keep_going: bool, commit: bool, packages_path: str) -> None:
189 with open(packages_path) as f:
190 packages = json.load(f)
191
192 eprint()
193 eprint('Going to be running update for following packages:')
194 for package in packages:
195 eprint(f" - {package['name']}")
196 eprint()
197
198 confirm = input('Press Enter key to continue...')
199 if confirm == '':
200 eprint()
201 eprint('Running update for:')
202
203 asyncio.run(start_updates(max_workers, keep_going, commit, packages))
204
205 eprint()
206 eprint('Packages updated!')
207 sys.exit()
208 else:
209 eprint('Aborting!')
210 sys.exit(130)
211
212parser = argparse.ArgumentParser(description='Update packages')
213parser.add_argument('--max-workers', '-j', dest='max_workers', type=int, help='Number of updates to run concurrently', nargs='?', default=4)
214parser.add_argument('--keep-going', '-k', dest='keep_going', action='store_true', help='Do not stop after first failure')
215parser.add_argument('--commit', '-c', dest='commit', action='store_true', help='Commit the changes')
216parser.add_argument('packages', help='JSON file containing the list of package names and their update scripts')
217
218if __name__ == '__main__':
219 args = parser.parse_args()
220
221 try:
222 main(args.max_workers, args.keep_going, args.commit, args.packages)
223 except KeyboardInterrupt as e:
224 # Let’s cancel outside of the main loop too.
225 sys.exit(130)