1# python library used to update plugins:
2# - pkgs/applications/editors/vim/plugins/update.py
3# - pkgs/applications/editors/kakoune/plugins/update.py
4# - pkgs/development/lua-modules/updater/updater.py
5
6# format:
7# $ nix run nixpkgs#ruff maintainers/scripts/pluginupdate.py
8# type-check:
9# $ nix run nixpkgs#python3.pkgs.mypy maintainers/scripts/pluginupdate.py
10# linted:
11# $ nix run nixpkgs#python3.pkgs.flake8 -- --ignore E501,E265 maintainers/scripts/pluginupdate.py
12
13import argparse
14import csv
15import functools
16import http
17import json
18import logging
19import os
20import re
21import subprocess
22import sys
23import time
24import traceback
25import urllib.error
26import urllib.parse
27import urllib.request
28import xml.etree.ElementTree as ET
29from dataclasses import asdict, dataclass
30from datetime import UTC, datetime
31from functools import wraps
32from multiprocessing.dummy import Pool
33from pathlib import Path
34from tempfile import NamedTemporaryFile
35from typing import Any, Callable
36from urllib.parse import urljoin, urlparse
37
38import git
39
40ATOM_ENTRY = "{http://www.w3.org/2005/Atom}entry" # " vim gets confused here
41ATOM_LINK = "{http://www.w3.org/2005/Atom}link" # "
42ATOM_UPDATED = "{http://www.w3.org/2005/Atom}updated" # "
43
44LOG_LEVELS = {
45 logging.getLevelName(level): level
46 for level in [logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR]
47}
48
49log = logging.getLogger()
50
51
52def retry(ExceptionToCheck: Any, tries: int = 4, delay: float = 3, backoff: float = 2):
53 """Retry calling the decorated function using an exponential backoff.
54 http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
55 original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
56 (BSD licensed)
57 :param ExceptionToCheck: the exception on which to retry
58 :param tries: number of times to try (not retry) before giving up
59 :param delay: initial delay between retries in seconds
60 :param backoff: backoff multiplier e.g. value of 2 will double the delay
61 each retry
62 """
63
64 def deco_retry(f: Callable) -> Callable:
65 @wraps(f)
66 def f_retry(*args: Any, **kwargs: Any) -> Any:
67 mtries, mdelay = tries, delay
68 while mtries > 1:
69 try:
70 return f(*args, **kwargs)
71 except ExceptionToCheck as e:
72 print(f"{str(e)}, Retrying in {mdelay} seconds...")
73 time.sleep(mdelay)
74 mtries -= 1
75 mdelay *= backoff
76 return f(*args, **kwargs)
77
78 return f_retry # true decorator
79
80 return deco_retry
81
82
83@dataclass
84class FetchConfig:
85 proc: int
86 github_token: str
87
88
89def make_request(url: str, token=None) -> urllib.request.Request:
90 headers = {}
91 if token is not None:
92 headers["Authorization"] = f"token {token}"
93 return urllib.request.Request(url, headers=headers)
94
95
96# a dictionary of plugins and their new repositories
97Redirects = dict["PluginDesc", "Repo"]
98
99
100class Repo:
101 def __init__(self, uri: str, branch: str) -> None:
102 self.uri = uri
103 """Url to the repo"""
104 self._branch = branch
105 # Redirect is the new Repo to use
106 self.redirect: "Repo | None" = None
107 self.token = "dummy_token"
108
109 @property
110 def name(self):
111 return self.uri.strip("/").split("/")[-1]
112
113 @property
114 def branch(self):
115 return self._branch or "HEAD"
116
117 def __str__(self) -> str:
118 return f"{self.uri}"
119
120 def __repr__(self) -> str:
121 return f"Repo({self.name}, {self.uri})"
122
123 @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
124 def has_submodules(self) -> bool:
125 return True
126
127 @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
128 def latest_commit(self) -> tuple[str, datetime]:
129 log.debug("Latest commit")
130 loaded = self._prefetch(None)
131 updated = datetime.strptime(loaded["date"], "%Y-%m-%dT%H:%M:%S%z")
132
133 return loaded["rev"], updated
134
135 def _prefetch(self, ref: str | None):
136 cmd = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri]
137 if ref is not None:
138 cmd.append(ref)
139 log.debug(cmd)
140 data = subprocess.check_output(cmd)
141 loaded = json.loads(data)
142 return loaded
143
144 def prefetch(self, ref: str | None) -> str:
145 log.info("Prefetching %s", self.uri)
146 loaded = self._prefetch(ref)
147 return loaded["sha256"]
148
149 def as_nix(self, plugin: "Plugin") -> str:
150 return f"""fetchgit {{
151 url = "{self.uri}";
152 rev = "{plugin.commit}";
153 sha256 = "{plugin.sha256}";
154 }}"""
155
156
157class RepoGitHub(Repo):
158 def __init__(self, owner: str, repo: str, branch: str) -> None:
159 self.owner = owner
160 self.repo = repo
161 self.token = None
162 """Url to the repo"""
163 super().__init__(self.url(""), branch)
164 log.debug(
165 "Instantiating github repo owner=%s and repo=%s", self.owner, self.repo
166 )
167
168 @property
169 def name(self):
170 return self.repo
171
172 def url(self, path: str) -> str:
173 res = urljoin(f"https://github.com/{self.owner}/{self.repo}/", path)
174 return res
175
176 @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
177 def has_submodules(self) -> bool:
178 try:
179 req = make_request(self.url(f"blob/{self.branch}/.gitmodules"), self.token)
180 urllib.request.urlopen(req, timeout=10).close()
181 except urllib.error.HTTPError as e:
182 if e.code == 404:
183 return False
184 else:
185 raise
186 return True
187
188 @retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
189 def latest_commit(self) -> tuple[str, datetime]:
190 commit_url = self.url(f"commits/{self.branch}.atom")
191 log.debug("Sending request to %s", commit_url)
192 commit_req = make_request(commit_url, self.token)
193 with urllib.request.urlopen(commit_req, timeout=10) as req:
194 self._check_for_redirect(commit_url, req)
195 xml = req.read()
196
197 # Filter out illegal XML characters
198 illegal_xml_regex = re.compile(b"[\x00-\x08\x0b-\x0c\x0e-\x1f\x7f]")
199 xml = illegal_xml_regex.sub(b"", xml)
200
201 root = ET.fromstring(xml)
202 latest_entry = root.find(ATOM_ENTRY)
203 assert latest_entry is not None, f"No commits found in repository {self}"
204 commit_link = latest_entry.find(ATOM_LINK)
205 assert commit_link is not None, f"No link tag found feed entry {xml}"
206 url = urlparse(commit_link.get("href"))
207 updated_tag = latest_entry.find(ATOM_UPDATED)
208 assert (
209 updated_tag is not None and updated_tag.text is not None
210 ), f"No updated tag found feed entry {xml}"
211 updated = datetime.strptime(updated_tag.text, "%Y-%m-%dT%H:%M:%SZ")
212 return Path(str(url.path)).name, updated
213
214 def _check_for_redirect(self, url: str, req: http.client.HTTPResponse):
215 response_url = req.geturl()
216 if url != response_url:
217 new_owner, new_name = (
218 urllib.parse.urlsplit(response_url).path.strip("/").split("/")[:2]
219 )
220
221 new_repo = RepoGitHub(owner=new_owner, repo=new_name, branch=self.branch)
222 self.redirect = new_repo
223
224 def prefetch(self, commit: str) -> str:
225 if self.has_submodules():
226 sha256 = super().prefetch(commit)
227 else:
228 sha256 = self.prefetch_github(commit)
229 return sha256
230
231 def prefetch_github(self, ref: str) -> str:
232 cmd = ["nix-prefetch-url", "--unpack", self.url(f"archive/{ref}.tar.gz")]
233 log.debug("Running %s", cmd)
234 data = subprocess.check_output(cmd)
235 return data.strip().decode("utf-8")
236
237 def as_nix(self, plugin: "Plugin") -> str:
238 if plugin.has_submodules:
239 submodule_attr = "\n fetchSubmodules = true;"
240 else:
241 submodule_attr = ""
242
243 return f"""fetchFromGitHub {{
244 owner = "{self.owner}";
245 repo = "{self.repo}";
246 rev = "{plugin.commit}";
247 sha256 = "{plugin.sha256}";{submodule_attr}
248 }}"""
249
250
251@dataclass(frozen=True)
252class PluginDesc:
253 repo: Repo
254 branch: str
255 alias: str | None
256
257 @property
258 def name(self):
259 return self.alias or self.repo.name
260
261 @staticmethod
262 def load_from_csv(config: FetchConfig, row: dict[str, str]) -> "PluginDesc":
263 log.debug("Loading row %s", row)
264 branch = row["branch"]
265 repo = make_repo(row["repo"], branch.strip())
266 repo.token = config.github_token
267 return PluginDesc(
268 repo,
269 branch.strip(),
270 # alias is usually an empty string
271 row["alias"] if row["alias"] else None,
272 )
273
274 @staticmethod
275 def load_from_string(config: FetchConfig, line: str) -> "PluginDesc":
276 branch = "HEAD"
277 alias = None
278 uri = line
279 if " as " in uri:
280 uri, alias = uri.split(" as ")
281 alias = alias.strip()
282 if "@" in uri:
283 uri, branch = uri.split("@")
284 repo = make_repo(uri.strip(), branch.strip())
285 repo.token = config.github_token
286 return PluginDesc(repo, branch.strip(), alias)
287
288
289@dataclass
290class Plugin:
291 name: str
292 commit: str
293 has_submodules: bool
294 sha256: str
295 date: datetime | None = None
296
297 @property
298 def normalized_name(self) -> str:
299 return self.name.replace(".", "-")
300
301 @property
302 def version(self) -> str:
303 assert self.date is not None
304 return self.date.strftime("%Y-%m-%d")
305
306 def as_json(self) -> dict[str, str]:
307 copy = self.__dict__.copy()
308 del copy["date"]
309 return copy
310
311
312def load_plugins_from_csv(
313 config: FetchConfig,
314 input_file: Path,
315) -> list[PluginDesc]:
316 log.debug("Load plugins from csv %s", input_file)
317 plugins = []
318 with open(input_file, newline="") as csvfile:
319 log.debug("Writing into %s", input_file)
320 reader = csv.DictReader(
321 csvfile,
322 )
323 for line in reader:
324 plugin = PluginDesc.load_from_csv(config, line)
325 plugins.append(plugin)
326
327 return plugins
328
329
330def run_nix_expr(expr, nixpkgs: str, **args):
331 """
332 :param expr nix expression to fetch current plugins
333 :param nixpkgs Path towards a nixpkgs checkout
334 """
335 with CleanEnvironment(nixpkgs) as nix_path:
336 cmd = [
337 "nix",
338 "eval",
339 "--extra-experimental-features",
340 "nix-command",
341 "--impure",
342 "--json",
343 "--expr",
344 expr,
345 "--nix-path",
346 nix_path,
347 ]
348 log.debug("Running command: %s", " ".join(cmd))
349 out = subprocess.check_output(cmd, **args)
350 data = json.loads(out)
351 return data
352
353
354class Editor:
355 """The configuration of the update script."""
356
357 def __init__(
358 self,
359 name: str,
360 root: Path,
361 get_plugins: str,
362 default_in: Path | None = None,
363 default_out: Path | None = None,
364 deprecated: Path | None = None,
365 cache_file: str | None = None,
366 ):
367 log.debug("get_plugins:", get_plugins)
368 self.name = name
369 self.root = root
370 self.get_plugins = get_plugins
371 self.default_in = default_in or root.joinpath(f"{name}-plugin-names")
372 self.default_out = default_out or root.joinpath("generated.nix")
373 self.deprecated = deprecated or root.joinpath("deprecated.json")
374 self.cache_file = cache_file or f"{name}-plugin-cache.json"
375 self.nixpkgs_repo = None
376
377 def add(self, args):
378 """CSV spec"""
379 log.debug("called the 'add' command")
380 fetch_config = FetchConfig(args.proc, args.github_token)
381 editor = self
382 for plugin_line in args.add_plugins:
383 log.debug("using plugin_line %s", plugin_line)
384 pdesc = PluginDesc.load_from_string(fetch_config, plugin_line)
385 log.debug("loaded as pdesc %s", pdesc)
386 append = [pdesc]
387 editor.rewrite_input(
388 fetch_config, args.input_file, editor.deprecated, append=append
389 )
390 plugin, _ = prefetch_plugin(pdesc)
391
392 if ( # lua updater doesn't support updating individual plugin
393 self.name != "lua"
394 ):
395 # update generated.nix
396 update = self.get_update(
397 args.input_file,
398 args.outfile,
399 fetch_config,
400 [plugin.normalized_name],
401 )
402 update()
403
404 autocommit = not args.no_commit
405 if autocommit:
406 commit(
407 editor.nixpkgs_repo,
408 "{drv_name}: init at {version}".format(
409 drv_name=editor.get_drv_name(plugin.normalized_name),
410 version=plugin.version,
411 ),
412 [args.outfile, args.input_file],
413 )
414
415 # Expects arguments generated by 'update' subparser
416 def update(self, args):
417 """CSV spec"""
418 print("the update member function should be overridden in subclasses")
419
420 def get_current_plugins(
421 self, config: FetchConfig, nixpkgs: str
422 ) -> list[tuple[PluginDesc, Plugin]]:
423 """To fill the cache"""
424 data = run_nix_expr(self.get_plugins, nixpkgs)
425 plugins = []
426 for name, attr in data.items():
427 checksum = attr["checksum"]
428
429 # https://github.com/NixOS/nixpkgs/blob/8a335419/pkgs/applications/editors/neovim/build-neovim-plugin.nix#L36
430 # https://github.com/NixOS/nixpkgs/pull/344478#discussion_r1786646055
431 version = re.search(r"\d\d\d\d-\d\d?-\d\d?", attr["version"])
432 if version is None:
433 raise ValueError(f"Cannot parse version: {attr['version']}")
434 date = datetime.strptime(version.group(), "%Y-%m-%d")
435
436 pdesc = PluginDesc.load_from_string(config, f'{attr["homePage"]} as {name}')
437 p = Plugin(
438 attr["pname"],
439 checksum["rev"],
440 checksum["submodules"],
441 checksum["sha256"],
442 date,
443 )
444
445 plugins.append((pdesc, p))
446 return plugins
447
448 def load_plugin_spec(self, config: FetchConfig, plugin_file) -> list[PluginDesc]:
449 """CSV spec"""
450 return load_plugins_from_csv(config, plugin_file)
451
452 def generate_nix(self, _plugins, _outfile: str):
453 """Returns nothing for now, writes directly to outfile"""
454 raise NotImplementedError()
455
456 def filter_plugins_to_update(
457 self, plugin: PluginDesc, to_update: list[str]
458 ) -> bool:
459 """Function for filtering out plugins, that user doesn't want to update.
460
461 It is mainly used for updating only specific plugins, not all of them.
462 By default it filters out plugins not present in `to_update`,
463 assuming `to_update` is a list of plugin names (the same as in the
464 result expression).
465
466 This function is never called if `to_update` is empty.
467 Feel free to override this function in derived classes.
468
469 Note:
470 Known bug: you have to use a deprecated name, instead of new one.
471 This is because we resolve deprecations later and can't get new
472 plugin URL before we request info about it.
473
474 Although, we could parse deprecated.json, but it's a whole bunch
475 of spaghetti code, which I don't want to write.
476
477 Arguments:
478 plugin: Plugin on which you decide whether to ignore or not.
479 to_update:
480 List of strings passed to via the `--update` command line parameter.
481 By default, we assume it is a list of URIs identical to what
482 is in the input file.
483
484 Returns:
485 True if we should update plugin and False if not.
486 """
487 return plugin.name.replace(".", "-") in to_update
488
489 def get_update(
490 self,
491 input_file: str,
492 output_file: str,
493 config: FetchConfig,
494 to_update: list[str] | None,
495 ):
496 if to_update is None:
497 to_update = []
498
499 current_plugins = self.get_current_plugins(config, self.nixpkgs)
500 current_plugin_specs = self.load_plugin_spec(config, input_file)
501
502 cache: Cache = Cache(
503 [plugin for _description, plugin in current_plugins], self.cache_file
504 )
505 _prefetch = functools.partial(prefetch, cache=cache)
506
507 plugins_to_update = (
508 current_plugin_specs
509 if len(to_update) == 0
510 else [
511 description
512 for description in current_plugin_specs
513 if self.filter_plugins_to_update(description, to_update)
514 ]
515 )
516
517 def update() -> Redirects:
518 if len(plugins_to_update) == 0:
519 log.error(
520 "\n\n\n\nIt seems like you provided some arguments to `--update`:\n"
521 + ", ".join(to_update)
522 + "\nBut after filtering, the result list of plugins is empty\n"
523 "\n"
524 "Are you sure you provided the same URIs as in your input file?\n"
525 "(" + str(input_file) + ")\n\n"
526 )
527 return {}
528
529 try:
530 pool = Pool(processes=config.proc)
531 results = pool.map(_prefetch, plugins_to_update)
532 finally:
533 cache.store()
534
535 print(f"{len(results)} of {len(current_plugins)} were checked")
536 # Do only partial update of out file
537 if len(results) != len(current_plugins):
538 results = self.merge_results(current_plugins, results)
539 plugins, redirects = check_results(results)
540
541 plugins = sorted(plugins, key=lambda v: v[1].normalized_name)
542 self.generate_nix(plugins, output_file)
543
544 return redirects
545
546 return update
547
548 def merge_results(
549 self,
550 current: list[tuple[PluginDesc, Plugin]],
551 fetched: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
552 ) -> list[tuple[PluginDesc, Exception | Plugin, Repo | None]]:
553 # transforming this to dict, so lookup is O(1) instead of O(n) (n is len(current))
554 result: dict[str, tuple[PluginDesc, Exception | Plugin, Repo | None]] = {
555 # also adding redirect (third item in the result tuple)
556 pl.normalized_name: (pdesc, pl, None)
557 for pdesc, pl in current
558 }
559
560 for plugin_desc, plugin, redirect in fetched:
561 result[plugin.normalized_name] = (plugin_desc, plugin, redirect)
562
563 return list(result.values())
564
565 @property
566 def attr_path(self):
567 return self.name + "Plugins"
568
569 def get_drv_name(self, name: str):
570 return self.attr_path + "." + name
571
572 def rewrite_input(self, *args, **kwargs):
573 return rewrite_input(*args, **kwargs)
574
575 def create_parser(self):
576 common = argparse.ArgumentParser(
577 add_help=False,
578 description=(
579 f"""
580 Updates nix derivations for {self.name} plugins.\n
581 By default from {self.default_in} to {self.default_out}"""
582 ),
583 )
584 common.add_argument(
585 "--nixpkgs",
586 type=str,
587 default=os.getcwd(),
588 help="Adjust log level",
589 )
590 common.add_argument(
591 "--input-names",
592 "-i",
593 dest="input_file",
594 type=Path,
595 default=self.default_in,
596 help="A list of plugins in the form owner/repo",
597 )
598 common.add_argument(
599 "--out",
600 "-o",
601 dest="outfile",
602 default=self.default_out,
603 type=Path,
604 help="Filename to save generated nix code",
605 )
606 common.add_argument(
607 "--proc",
608 "-p",
609 dest="proc",
610 type=int,
611 default=30,
612 help="Number of concurrent processes to spawn. Setting --github-token allows higher values.",
613 )
614 common.add_argument(
615 "--github-token",
616 "-t",
617 type=str,
618 default=os.getenv("GITHUB_API_TOKEN"),
619 help="""Allows to set --proc to higher values.
620 Uses GITHUB_API_TOKEN environment variables as the default value.""",
621 )
622 common.add_argument(
623 "--no-commit",
624 "-n",
625 action="store_true",
626 default=False,
627 help="Whether to autocommit changes",
628 )
629 common.add_argument(
630 "--debug",
631 "-d",
632 choices=LOG_LEVELS.keys(),
633 default=logging.getLevelName(logging.WARN),
634 help="Adjust log level",
635 )
636
637 main = argparse.ArgumentParser(
638 parents=[common],
639 description=(
640 f"""
641 Updates nix derivations for {self.name} plugins.\n
642 By default from {self.default_in} to {self.default_out}"""
643 ),
644 )
645
646 subparsers = main.add_subparsers(dest="command", required=False)
647 padd = subparsers.add_parser(
648 "add",
649 parents=[],
650 description="Add new plugin",
651 add_help=False,
652 )
653 padd.set_defaults(func=self.add)
654 padd.add_argument(
655 "add_plugins",
656 default=None,
657 nargs="+",
658 help=f"Plugin to add to {self.attr_path} from Github in the form owner/repo",
659 )
660
661 pupdate = subparsers.add_parser(
662 "update",
663 description="Update all or a subset of existing plugins",
664 add_help=False,
665 )
666 pupdate.add_argument(
667 "update_only",
668 default=None,
669 nargs="*",
670 help="Plugin URLs to update (must be the same as in the input file)",
671 )
672 pupdate.set_defaults(func=self.update)
673 return main
674
675 def run(
676 self,
677 ):
678 """
679 Convenience function
680 """
681 parser = self.create_parser()
682 args = parser.parse_args()
683 command = args.command or "update"
684 logging.basicConfig()
685 log.setLevel(LOG_LEVELS[args.debug])
686 log.info("Chose to run command: %s", command)
687 self.nixpkgs = args.nixpkgs
688
689 self.nixpkgs_repo = git.Repo(args.nixpkgs, search_parent_directories=True)
690
691 getattr(self, command)(args)
692
693
694class CleanEnvironment(object):
695 def __init__(self, nixpkgs):
696 self.local_pkgs = nixpkgs
697
698 def __enter__(self) -> str:
699 """
700 local_pkgs = str(Path(__file__).parent.parent.parent)
701 """
702 self.old_environ = os.environ.copy()
703 self.empty_config = NamedTemporaryFile()
704 self.empty_config.write(b"{}")
705 self.empty_config.flush()
706 return f"localpkgs={self.local_pkgs}"
707
708 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
709 os.environ.update(self.old_environ)
710 self.empty_config.close()
711
712
713def prefetch_plugin(
714 p: PluginDesc,
715 cache: "Cache | None" = None,
716) -> tuple[Plugin, Repo | None]:
717 commit = None
718 log.info(f"Fetching last commit for plugin {p.name} from {p.repo.uri}@{p.branch}")
719 commit, date = p.repo.latest_commit()
720
721 cached_plugin = cache[commit] if cache else None
722 if cached_plugin is not None:
723 log.debug(f"Cache hit for {p.name}!")
724 cached_plugin.name = p.name
725 cached_plugin.date = date
726 return cached_plugin, p.repo.redirect
727
728 has_submodules = p.repo.has_submodules()
729 log.debug(f"prefetch {p.name}")
730 sha256 = p.repo.prefetch(commit)
731
732 return (
733 Plugin(p.name, commit, has_submodules, sha256, date=date),
734 p.repo.redirect,
735 )
736
737
738def print_download_error(plugin: PluginDesc, ex: Exception):
739 print(f"{plugin}: {ex}", file=sys.stderr)
740 ex_traceback = ex.__traceback__
741 tb_lines = [
742 line.rstrip("\n")
743 for line in traceback.format_exception(ex.__class__, ex, ex_traceback)
744 ]
745 print("\n".join(tb_lines))
746
747
748def check_results(
749 results: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
750) -> tuple[list[tuple[PluginDesc, Plugin]], Redirects]:
751 """ """
752 failures: list[tuple[PluginDesc, Exception]] = []
753 plugins = []
754 redirects: Redirects = {}
755 for pdesc, result, redirect in results:
756 if isinstance(result, Exception):
757 failures.append((pdesc, result))
758 else:
759 new_pdesc = pdesc
760 if redirect is not None:
761 redirects.update({pdesc: redirect})
762 new_pdesc = PluginDesc(redirect, pdesc.branch, pdesc.alias)
763 plugins.append((new_pdesc, result))
764
765 if len(failures) == 0:
766 return plugins, redirects
767 else:
768 log.error(f"{len(failures)} plugin(s) could not be downloaded:\n")
769
770 for plugin, exception in failures:
771 print_download_error(plugin, exception)
772
773 sys.exit(1)
774
775
776def make_repo(uri: str, branch) -> Repo:
777 """Instantiate a Repo with the correct specialization depending on server (gitub spec)"""
778 # dumb check to see if it's of the form owner/repo (=> github) or https://...
779 res = urlparse(uri)
780 if res.netloc in ["github.com", ""]:
781 res = res.path.strip("/").split("/")
782 repo = RepoGitHub(res[0], res[1], branch)
783 else:
784 repo = Repo(uri.strip(), branch)
785 return repo
786
787
788def get_cache_path(cache_file_name: str) -> Path | None:
789 xdg_cache = os.environ.get("XDG_CACHE_HOME", None)
790 if xdg_cache is None:
791 home = os.environ.get("HOME", None)
792 if home is None:
793 return None
794 xdg_cache = str(Path(home, ".cache"))
795
796 return Path(xdg_cache, cache_file_name)
797
798
799class Cache:
800 def __init__(self, initial_plugins: list[Plugin], cache_file_name: str) -> None:
801 self.cache_file = get_cache_path(cache_file_name)
802
803 downloads = {}
804 for plugin in initial_plugins:
805 downloads[plugin.commit] = plugin
806 downloads.update(self.load())
807 self.downloads = downloads
808
809 def load(self) -> dict[str, Plugin]:
810 if self.cache_file is None or not self.cache_file.exists():
811 return {}
812
813 downloads: dict[str, Plugin] = {}
814 with open(self.cache_file) as f:
815 data = json.load(f)
816 for attr in data.values():
817 p = Plugin(
818 attr["name"], attr["commit"], attr["has_submodules"], attr["sha256"]
819 )
820 downloads[attr["commit"]] = p
821 return downloads
822
823 def store(self) -> None:
824 if self.cache_file is None:
825 return
826
827 os.makedirs(self.cache_file.parent, exist_ok=True)
828 with open(self.cache_file, "w+") as f:
829 data = {}
830 for name, attr in self.downloads.items():
831 data[name] = attr.as_json()
832 json.dump(data, f, indent=4, sort_keys=True)
833
834 def __getitem__(self, key: str) -> Plugin | None:
835 return self.downloads.get(key, None)
836
837 def __setitem__(self, key: str, value: Plugin) -> None:
838 self.downloads[key] = value
839
840
841def prefetch(
842 pluginDesc: PluginDesc, cache: Cache
843) -> tuple[PluginDesc, Exception | Plugin, Repo | None]:
844 try:
845 plugin, redirect = prefetch_plugin(pluginDesc, cache)
846 cache[plugin.commit] = plugin
847 return (pluginDesc, plugin, redirect)
848 except Exception as e:
849 return (pluginDesc, e, None)
850
851
852def rewrite_input(
853 config: FetchConfig,
854 input_file: Path,
855 deprecated: Path,
856 # old pluginDesc and the new
857 redirects: Redirects = {},
858 append: list[PluginDesc] = [],
859):
860 log.info("Rewriting input file %s", input_file)
861 plugins = load_plugins_from_csv(config, input_file)
862
863 plugins.extend(append)
864
865 if redirects:
866 log.debug("Dealing with deprecated plugins listed in %s", deprecated)
867
868 cur_date_iso = datetime.now().strftime("%Y-%m-%d")
869 with open(deprecated, "r") as f:
870 deprecations = json.load(f)
871 # TODO parallelize this step
872 for pdesc, new_repo in redirects.items():
873 log.info("Resolving deprecated plugin %s -> %s", pdesc.name, new_repo.name)
874 new_pdesc = PluginDesc(new_repo, pdesc.branch, pdesc.alias)
875
876 old_plugin, _ = prefetch_plugin(pdesc)
877 new_plugin, _ = prefetch_plugin(new_pdesc)
878
879 if old_plugin.normalized_name != new_plugin.normalized_name:
880 deprecations[old_plugin.normalized_name] = {
881 "new": new_plugin.normalized_name,
882 "date": cur_date_iso,
883 }
884
885 # remove plugin from index file, so we won't add it to deprecations again
886 for i, plugin in enumerate(plugins):
887 if plugin.name == pdesc.name:
888 plugins.pop(i)
889 break
890 plugins.append(new_pdesc)
891
892 with open(deprecated, "w") as f:
893 json.dump(deprecations, f, indent=4, sort_keys=True)
894 f.write("\n")
895
896 with open(input_file, "w") as f:
897 log.debug("Writing into %s", input_file)
898 # fields = dataclasses.fields(PluginDesc)
899 fieldnames = ["repo", "branch", "alias"]
900 writer = csv.DictWriter(f, fieldnames, dialect="unix", quoting=csv.QUOTE_NONE)
901 writer.writeheader()
902 for plugin in sorted(plugins, key=lambda x: x.name):
903 writer.writerow(asdict(plugin))
904
905
906def commit(repo: git.Repo, message: str, files: list[Path]) -> None:
907 repo.index.add([str(f.resolve()) for f in files])
908
909 if repo.index.diff("HEAD"):
910 print(f'committing to nixpkgs "{message}"')
911 repo.index.commit(message)
912 else:
913 print("no changes in working tree to commit")
914
915
916def update_plugins(editor: Editor, args):
917 """The main entry function of this module.
918 All input arguments are grouped in the `Editor`."""
919
920 log.info("Start updating plugins")
921 if args.proc > 1 and args.github_token == None:
922 log.warning(
923 "You have enabled parallel updates but haven't set a github token.\n"
924 "You may be hit with `HTTP Error 429: too many requests` as a consequence."
925 "Either set --proc=1 or --github-token=YOUR_TOKEN. "
926 )
927
928 fetch_config = FetchConfig(args.proc, args.github_token)
929 update = editor.get_update(
930 input_file=args.input_file,
931 output_file=args.outfile,
932 config=fetch_config,
933 to_update=getattr( # if script was called without arguments
934 args, "update_only", None
935 ),
936 )
937
938 start_time = time.time()
939 redirects = update()
940 duration = time.time() - start_time
941 print(f"The plugin update took {duration:.2f}s.")
942 editor.rewrite_input(fetch_config, args.input_file, editor.deprecated, redirects)
943
944 autocommit = not args.no_commit
945
946 if autocommit:
947 try:
948 repo = git.Repo(os.getcwd())
949 updated = datetime.now(tz=UTC).strftime("%Y-%m-%d")
950 print(args.outfile)
951 commit(repo, f"{editor.attr_path}: update on {updated}", [args.outfile])
952 except git.InvalidGitRepositoryError as e:
953 print(f"Not in a git repository: {e}", file=sys.stderr)
954 sys.exit(1)
955
956 if redirects:
957 update()
958 if autocommit:
959 commit(
960 editor.nixpkgs_repo,
961 f"{editor.attr_path}: resolve github repository redirects",
962 [args.outfile, args.input_file, editor.deprecated],
963 )