#! /usr/bin/env nix-shell
#! nix-shell -i python3 -p "python3.withPackages(ps: with ps; [ packaging requests toolz ])" -p git

"""
Update a Python package expression by passing in the `.nix` file, or the directory containing it.
You can pass in multiple files or paths.

You'll likely want to use
``
  $ ./update-python-libraries ../../pkgs/development/python-modules/*
``
to update all libraries in that folder.
"""

import argparse
import collections
import collections.abc
import logging
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor as Pool

import requests
import toolz
from packaging.specifiers import SpecifierSet
from packaging.version import InvalidVersion
from packaging.version import Version as _Version

# URL of PyPI.
INDEX = "https://pypi.io/pypi"

# Permitted file extensions. These are evaluated from left to right and the
# first occurrence is returned.
EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl']

# Whether pre-releases are acceptable update candidates.
PRERELEASES = False

# Seconds to wait for PyPI before giving up on a package.
REQUEST_TIMEOUT = 30

logging.basicConfig(level=logging.INFO)


class Version(_Version, collections.abc.Sequence):
    """PEP 440 version that keeps its original spelling.

    The release segment (e.g. ``(1, 2, 3)``) is exposed through the sequence
    protocol, so ``version[0:2]`` yields the major and minor components.
    """

    def __init__(self, version):
        super().__init__(version)
        # We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
        # https://github.com/avian2/unidecode/issues/13#issuecomment-354538882
        self.raw_version = version

    def __getitem__(self, i):
        return self.release[i]

    def __len__(self):
        return len(self.release)

    def __iter__(self):
        yield from self.release


def _require_single(matches, attribute):
    """Return the only element of `matches`.

    :raises ValueError: if there is not exactly one match for `attribute`.
    """
    if len(matches) > 1:
        raise ValueError("found too many values for {}".format(attribute))
    if not matches:
        raise ValueError("no value found for {}".format(attribute))
    return matches[0]


def _get_values(attribute, text):
    """Match attribute in text and return all matches.

    :returns: List of matches.
    """
    regex = re.compile(r'{}\s+=\s+"(.*)";'.format(attribute))
    return regex.findall(text)


def _get_unique_value(attribute, text):
    """Match attribute in text and return unique match.

    :returns: Single match.
    :raises ValueError: if the attribute is missing or given more than once.
    """
    return _require_single(_get_values(attribute, text), attribute)


def _get_line_and_value(attribute, text):
    """Match attribute in text. Return the line and the value of the attribute.

    :returns: Tuple ``(matched assignment, value)``.
    :raises ValueError: if the attribute is missing or given more than once.
    """
    regex = re.compile(r'({}\s+=\s+"(.*)";)'.format(attribute))
    return _require_single(regex.findall(text), attribute)


def _replace_value(attribute, value, text):
    """Search and replace value of attribute in text.

    :raises ValueError: if the attribute is missing or given more than once.
    """
    old_line, old_value = _get_line_and_value(attribute, text)
    # Replace the quoted value only, and only once: a bare substring replace
    # would also rewrite the attribute name when the value occurs in it, and
    # would insert `value` between every character when the old value is empty.
    new_line = old_line.replace('"{}"'.format(old_value), '"{}"'.format(value), 1)
    return text.replace(old_line, new_line)


def _fetch_page(url):
    """Fetch `url` and return the decoded JSON document.

    :raises ValueError: if the request fails or does not return status 200.
    """
    try:
        r = requests.get(url, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as e:
        # Reported as ValueError so that a network problem for one package is
        # handled like any other per-package failure instead of aborting the run.
        raise ValueError("request for {} failed".format(url)) from e
    if r.status_code != requests.codes.ok:
        raise ValueError("request for {} failed".format(url))
    return r.json()


SEMVER = {
    'major': 0,
    'minor': 1,
    'patch': 2,
}


def _determine_latest_version(current_version, target, versions):
    """Determine latest version, given `target`.

    :param current_version: Version string currently in the expression.
    :param target: Key of `SEMVER`; the number of leading release components
        of the current version that bound the update from above.
    :param versions: Iterable of candidate version strings.
    :returns: The original spelling of the highest acceptable version.
    :raises ValueError: if no candidate is acceptable.
    """
    current_version = Version(current_version)

    def _parse_versions(versions):
        # Candidates that cannot be parsed as a version are skipped.
        for v in versions:
            try:
                yield Version(v)
            except InvalidVersion:
                pass

    candidates = _parse_versions(versions)

    index = SEMVER[target]

    # Exclusive upper bound: the first `index` release components of the
    # current version with the last of them incremented.
    ceiling = list(current_version[0:index])
    if not ceiling:
        ceiling = None
    else:
        ceiling[-1] += 1
        ceiling = Version(".".join(map(str, ceiling)))

    # We do not want prereleases
    candidates = SpecifierSet(prereleases=PRERELEASES).filter(candidates)

    if ceiling is not None:
        candidates = SpecifierSet(f"<{ceiling}").filter(candidates)

    candidates = list(candidates)
    if not candidates:
        raise ValueError("no suitable version found")

    return max(candidates).raw_version


def _get_latest_version_pypi(package, extension, current_version, target):
    """Get latest version and hash from PyPI.

    :returns: Tuple ``(version, sha256)``; ``sha256`` is ``None`` when the
        release has no file whose name ends with `extension`.
    """
    url = "{}/{}/json".format(INDEX, package)
    data = _fetch_page(url)

    versions = data['releases'].keys()
    version = _determine_latest_version(current_version, target, versions)

    try:
        releases = data['releases'][version]
    except KeyError as e:
        raise KeyError('Could not find version {} for {}'.format(version, package)) from e

    # TODO: In case of wheel we need to do further checks!
    sha256 = next(
        (release['digests']['sha256']
         for release in releases
         if release['filename'].endswith(extension)),
        None,
    )
    return version, sha256


def _get_latest_version_github(package, extension, current_version, target):
    """Placeholder fetcher for GitHub sources; always raises ValueError."""
    raise ValueError("updating from GitHub is not yet supported.")


FETCHERS = {
    'fetchFromGitHub': _get_latest_version_github,
    'fetchPypi': _get_latest_version_pypi,
    'fetchurl': _get_latest_version_pypi,
}


DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz'


FORMATS = {
    'setuptools': DEFAULT_SETUPTOOLS_EXTENSION,
    'wheel': 'whl',
}


def _determine_fetcher(text):
    """Return the name of the single fetcher assigned to `src` in `text`.

    :raises ValueError: if no fetcher or more than one fetcher is found.
    """
    # Count occurrences of fetchers.
    nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS)
    if nfetchers == 0:
        raise ValueError("no fetcher.")
    if nfetchers > 1:
        raise ValueError("multiple fetchers.")

    # Then we check which fetcher to use.
    for fetcher in FETCHERS:
        if 'src = {}'.format(fetcher) in text:
            return fetcher


def _determine_extension(text, fetcher):
    """Determine what extension is used in the expression.

    If we use:
    - fetchPypi, we check if format is specified.
    - fetchurl, we determine the extension from the url.
    - fetchFromGitHub, updating is not implemented and we raise.

    :raises ValueError: if the extension cannot be determined.
    """
    if fetcher == 'fetchPypi':
        try:
            src_format = _get_unique_value('format', text)
        except ValueError:
            src_format = None   # format was not given

        try:
            extension = _get_unique_value('extension', text)
        except ValueError:
            extension = None    # extension was not given

        if extension is None:
            if src_format is None:
                src_format = 'setuptools'
            elif src_format == 'flit':
                raise ValueError("Don't know how to update a Flit package.")
            if src_format not in FORMATS:
                # Raise ValueError rather than KeyError so the package is
                # skipped with a warning instead of aborting the whole run.
                raise ValueError("Don't know how to update a {} package.".format(src_format))
            extension = FORMATS[src_format]

    elif fetcher == 'fetchurl':
        url = _get_unique_value('url', text)
        if 'pypi' not in url:
            raise ValueError('url does not point to PyPI.')
        # Prefer a known extension so that e.g. `.tar.gz` is not reduced to
        # `.gz`; otherwise fall back to whatever follows the last dot.
        extension = next(
            (ext for ext in EXTENSIONS if url.endswith(ext)),
            os.path.splitext(url)[1],
        )

    elif fetcher == 'fetchFromGitHub':
        raise ValueError('updating from GitHub is not yet implemented.')

    return extension


def _update_package(path, target):
    """Update the expression at `path` in place.

    :returns: ``False`` if no update is available, otherwise a dict
        describing the update (path, target, pname, old and new version).
    :raises ValueError: if the expression cannot be parsed or updated.
    """
    # Read the expression
    with open(path, 'r') as f:
        text = f.read()

    # Determine pname.
    pname = _get_unique_value('pname', text)

    # Determine version.
    version = _get_unique_value('version', text)

    # First we check how many fetchers are mentioned.
    fetcher = _determine_fetcher(text)

    extension = _determine_extension(text, fetcher)

    new_version, new_sha256 = FETCHERS[fetcher](pname, extension, version, target)

    if new_version == version:
        logging.info("Path %s: no update available for %s.", path, pname)
        return False
    if Version(new_version) <= Version(version):
        raise ValueError("downgrade for {}.".format(pname))
    if not new_sha256:
        raise ValueError("no file available for {}.".format(pname))

    text = _replace_value('version', new_version, text)
    text = _replace_value('sha256', new_sha256, text)

    with open(path, 'w') as f:
        f.write(text)

    logging.info("Path %s: updated %s from %s to %s", path, pname, version, new_version)

    return {
        'path': path,
        'target': target,
        'pname': pname,
        'old_version': version,
        'new_version': new_version,
    }


def _update(path, target):
    """Update the package at `path`, which is a `.nix` file or a directory.

    Failures reported as ValueError are logged as warnings.

    :returns: The result of `_update_package`, or ``False`` if the path was
        skipped or the update failed.
    """
    # We need to read and modify a Nix expression.
    if os.path.isdir(path):
        path = os.path.join(path, 'default.nix')

    # If a default.nix does not exist, we quit.
    if not os.path.isfile(path):
        logging.info("Path %s: does not exist.", path)
        return False

    # If file is not a Nix expression, we quit.
    if not path.endswith(".nix"):
        logging.info("Path %s: does not end with `.nix`.", path)
        return False

    try:
        return _update_package(path, target)
    except ValueError as e:
        logging.warning("Path %s: %s", path, e)
        return False


def _commit(path, pname, old_version, new_version, **kwargs):
    """Commit result.

    Extra keyword arguments are accepted and ignored so that the dict
    returned by `_update_package` can be passed with ``**``.

    :raises subprocess.CalledProcessError: if a git command fails; in that
        case `git checkout` is run on `path` before re-raising.
    """
    msg = f'python: {pname}: {old_version} -> {new_version}'

    try:
        subprocess.check_call(['git', 'add', path])
        subprocess.check_call(['git', 'commit', '-m', msg])
    except subprocess.CalledProcessError:
        logging.error("Could not commit %s", path)
        subprocess.check_call(['git', 'checkout', path])
        # Re-raise the original error: CalledProcessError cannot be built
        # from a message alone, it requires the return code and the command.
        raise

    return True


def main():
    """Parse the command line, update all given packages and optionally commit."""
    parser = argparse.ArgumentParser()
    parser.add_argument('package', type=str, nargs='+')
    parser.add_argument('--target', type=str, choices=SEMVER.keys(), default='major')
    parser.add_argument('--commit', action='store_true', help='Create a commit for each package update')

    args = parser.parse_args()
    target = args.target

    packages = list(map(os.path.abspath, args.package))

    logging.info("Updating packages...")

    # Use threads to update packages concurrently
    with Pool() as p:
        results = list(p.map(lambda pkg: _update(pkg, target), packages))

    logging.info("Finished updating packages.")

    # Commits are created sequentially.
    if args.commit:
        logging.info("Committing updates...")
        for result in results:
            if result:
                _commit(**result)
        logging.info("Finished committing updates")

    count = sum(map(bool, results))
    logging.info("{} package(s) updated".format(count))


if __name__ == '__main__':
    main()