1#! /usr/bin/env nix-shell
2#! nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ packaging requests toolz ])" -p git
3
4"""
5Update a Python package expression by passing in the `.nix` file, or the directory containing it.
6You can pass in multiple files or paths.
7
8You'll likely want to use
9``
10 $ ./update-python-libraries ../../pkgs/development/python-modules/*
11``
12to update all libraries in that folder.
13"""
14
15import argparse
16import logging
17import os
18import re
19import requests
20import toolz
21from concurrent.futures import ThreadPoolExecutor as Pool
22from packaging.version import Version as _Version
23from packaging.version import InvalidVersion
24from packaging.specifiers import SpecifierSet
25import collections
26import subprocess
27
28INDEX = "https://pypi.io/pypi"
29"""url of PyPI"""
30
31EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl']
32"""Permitted file extensions. These are evaluated from left to right and the first occurance is returned."""
33
34PRERELEASES = False
35
36import logging
37logging.basicConfig(level=logging.INFO)
38
39
40class Version(_Version, collections.abc.Sequence):
41
42 def __init__(self, version):
43 super().__init__(version)
44 # We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
45 # https://github.com/avian2/unidecode/issues/13#issuecomment-354538882
46 self.raw_version = version
47
48 def __getitem__(self, i):
49 return self._version.release[i]
50
51 def __len__(self):
52 return len(self._version.release)
53
54 def __iter__(self):
55 yield from self._version.release
56
57
58def _get_values(attribute, text):
59 """Match attribute in text and return all matches.
60
61 :returns: List of matches.
62 """
63 regex = '{}\s+=\s+"(.*)";'.format(attribute)
64 regex = re.compile(regex)
65 values = regex.findall(text)
66 return values
67
68def _get_unique_value(attribute, text):
69 """Match attribute in text and return unique match.
70
71 :returns: Single match.
72 """
73 values = _get_values(attribute, text)
74 n = len(values)
75 if n > 1:
76 raise ValueError("found too many values for {}".format(attribute))
77 elif n == 1:
78 return values[0]
79 else:
80 raise ValueError("no value found for {}".format(attribute))
81
82def _get_line_and_value(attribute, text):
83 """Match attribute in text. Return the line and the value of the attribute."""
84 regex = '({}\s+=\s+"(.*)";)'.format(attribute)
85 regex = re.compile(regex)
86 value = regex.findall(text)
87 n = len(value)
88 if n > 1:
89 raise ValueError("found too many values for {}".format(attribute))
90 elif n == 1:
91 return value[0]
92 else:
93 raise ValueError("no value found for {}".format(attribute))
94
95
96def _replace_value(attribute, value, text):
97 """Search and replace value of attribute in text."""
98 old_line, old_value = _get_line_and_value(attribute, text)
99 new_line = old_line.replace(old_value, value)
100 new_text = text.replace(old_line, new_line)
101 return new_text
102
103def _fetch_page(url):
104 r = requests.get(url)
105 if r.status_code == requests.codes.ok:
106 return r.json()
107 else:
108 raise ValueError("request for {} failed".format(url))
109
110
111SEMVER = {
112 'major' : 0,
113 'minor' : 1,
114 'patch' : 2,
115}
116
117
118def _determine_latest_version(current_version, target, versions):
119 """Determine latest version, given `target`.
120 """
121 current_version = Version(current_version)
122
123 def _parse_versions(versions):
124 for v in versions:
125 try:
126 yield Version(v)
127 except InvalidVersion:
128 pass
129
130 versions = _parse_versions(versions)
131
132 index = SEMVER[target]
133
134 ceiling = list(current_version[0:index])
135 if len(ceiling) == 0:
136 ceiling = None
137 else:
138 ceiling[-1]+=1
139 ceiling = Version(".".join(map(str, ceiling)))
140
141 # We do not want prereleases
142 versions = SpecifierSet(prereleases=PRERELEASES).filter(versions)
143
144 if ceiling is not None:
145 versions = SpecifierSet(f"<{ceiling}").filter(versions)
146
147 return (max(sorted(versions))).raw_version
148
149
150def _get_latest_version_pypi(package, extension, current_version, target):
151 """Get latest version and hash from PyPI."""
152 url = "{}/{}/json".format(INDEX, package)
153 json = _fetch_page(url)
154
155 versions = json['releases'].keys()
156 version = _determine_latest_version(current_version, target, versions)
157
158 try:
159 releases = json['releases'][version]
160 except KeyError as e:
161 raise KeyError('Could not find version {} for {}'.format(version, package)) from e
162 for release in releases:
163 if release['filename'].endswith(extension):
164 # TODO: In case of wheel we need to do further checks!
165 sha256 = release['digests']['sha256']
166 break
167 else:
168 sha256 = None
169 return version, sha256
170
171
172def _get_latest_version_github(package, extension, current_version, target):
173 raise ValueError("updating from GitHub is not yet supported.")
174
175
176FETCHERS = {
177 'fetchFromGitHub' : _get_latest_version_github,
178 'fetchPypi' : _get_latest_version_pypi,
179 'fetchurl' : _get_latest_version_pypi,
180}
181
182
183DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz'
184
185
186FORMATS = {
187 'setuptools' : DEFAULT_SETUPTOOLS_EXTENSION,
188 'wheel' : 'whl'
189}
190
191def _determine_fetcher(text):
192 # Count occurences of fetchers.
193 nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS.keys())
194 if nfetchers == 0:
195 raise ValueError("no fetcher.")
196 elif nfetchers > 1:
197 raise ValueError("multiple fetchers.")
198 else:
199 # Then we check which fetcher to use.
200 for fetcher in FETCHERS.keys():
201 if 'src = {}'.format(fetcher) in text:
202 return fetcher
203
204
205def _determine_extension(text, fetcher):
206 """Determine what extension is used in the expression.
207
208 If we use:
209 - fetchPypi, we check if format is specified.
210 - fetchurl, we determine the extension from the url.
211 - fetchFromGitHub we simply use `.tar.gz`.
212 """
213 if fetcher == 'fetchPypi':
214 try:
215 src_format = _get_unique_value('format', text)
216 except ValueError as e:
217 src_format = None # format was not given
218
219 try:
220 extension = _get_unique_value('extension', text)
221 except ValueError as e:
222 extension = None # extension was not given
223
224 if extension is None:
225 if src_format is None:
226 src_format = 'setuptools'
227 elif src_format == 'flit':
228 raise ValueError("Don't know how to update a Flit package.")
229 extension = FORMATS[src_format]
230
231 elif fetcher == 'fetchurl':
232 url = _get_unique_value('url', text)
233 extension = os.path.splitext(url)[1]
234 if 'pypi' not in url:
235 raise ValueError('url does not point to PyPI.')
236
237 elif fetcher == 'fetchFromGitHub':
238 raise ValueError('updating from GitHub is not yet implemented.')
239
240 return extension
241
242
243def _update_package(path, target):
244
245 # Read the expression
246 with open(path, 'r') as f:
247 text = f.read()
248
249 # Determine pname.
250 pname = _get_unique_value('pname', text)
251
252 # Determine version.
253 version = _get_unique_value('version', text)
254
255 # First we check how many fetchers are mentioned.
256 fetcher = _determine_fetcher(text)
257
258 extension = _determine_extension(text, fetcher)
259
260 new_version, new_sha256 = FETCHERS[fetcher](pname, extension, version, target)
261
262 if new_version == version:
263 logging.info("Path {}: no update available for {}.".format(path, pname))
264 return False
265 elif Version(new_version) <= Version(version):
266 raise ValueError("downgrade for {}.".format(pname))
267 if not new_sha256:
268 raise ValueError("no file available for {}.".format(pname))
269
270 text = _replace_value('version', new_version, text)
271 text = _replace_value('sha256', new_sha256, text)
272
273 with open(path, 'w') as f:
274 f.write(text)
275
276 logging.info("Path {}: updated {} from {} to {}".format(path, pname, version, new_version))
277
278 result = {
279 'path' : path,
280 'target': target,
281 'pname': pname,
282 'old_version' : version,
283 'new_version' : new_version,
284 #'fetcher' : fetcher,
285 }
286
287 return result
288
289
290def _update(path, target):
291
292 # We need to read and modify a Nix expression.
293 if os.path.isdir(path):
294 path = os.path.join(path, 'default.nix')
295
296 # If a default.nix does not exist, we quit.
297 if not os.path.isfile(path):
298 logging.info("Path {}: does not exist.".format(path))
299 return False
300
301 # If file is not a Nix expression, we quit.
302 if not path.endswith(".nix"):
303 logging.info("Path {}: does not end with `.nix`.".format(path))
304 return False
305
306 try:
307 return _update_package(path, target)
308 except ValueError as e:
309 logging.warning("Path {}: {}".format(path, e))
310 return False
311
312
313def _commit(path, pname, old_version, new_version, **kwargs):
314 """Commit result.
315 """
316
317 msg = f'python: {pname}: {old_version} -> {new_version}'
318
319 try:
320 subprocess.check_call(['git', 'add', path])
321 subprocess.check_call(['git', 'commit', '-m', msg])
322 except subprocess.CalledProcessError as e:
323 subprocess.check_call(['git', 'checkout', path])
324 raise subprocess.CalledProcessError(f'Could not commit {path}') from e
325
326 return True
327
328
329def main():
330
331 parser = argparse.ArgumentParser()
332 parser.add_argument('package', type=str, nargs='+')
333 parser.add_argument('--target', type=str, choices=SEMVER.keys(), default='major')
334 parser.add_argument('--commit', action='store_true', help='Create a commit for each package update')
335
336 args = parser.parse_args()
337 target = args.target
338
339 packages = list(map(os.path.abspath, args.package))
340
341 logging.info("Updating packages...")
342
343 # Use threads to update packages concurrently
344 with Pool() as p:
345 results = list(p.map(lambda pkg: _update(pkg, target), packages))
346
347 logging.info("Finished updating packages.")
348
349 # Commits are created sequentially.
350 if args.commit:
351 logging.info("Committing updates...")
352 list(map(lambda x: _commit(**x), filter(bool, results)))
353 logging.info("Finished committing updates")
354
355 count = sum(map(bool, results))
356 logging.info("{} package(s) updated".format(count))
357
358
359
360if __name__ == '__main__':
361 main()