pluginupdate.py: use newer syntax for types
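The change is mechanical: builtin generics from PEP 585 (Python 3.9+) replace the typing-module aliases, and PEP 604 unions (Python 3.10+ wherever annotations are actually evaluated) replace Optional and Union. A minimal sketch of the mapping; the alias names below are illustrative and do not appear in the files:

    from datetime import datetime

    # Old style: container and union types imported from typing.
    from typing import Dict, List, Optional, Tuple, Union

    OldCommit = Tuple[str, datetime]   # fixed-shape tuple
    OldCache = Dict[str, List[str]]    # nested container generics
    OldRef = Optional[str]             # shorthand for Union[str, None]
    OldResult = Union[Exception, str]

    # New style: builtin generics (PEP 585) and the | operator (PEP 604).
    NewCommit = tuple[str, datetime]
    NewCache = dict[str, list[str]]
    NewRef = str | None
    NewResult = Exception | str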

Changed files: +49 -58

maintainers/scripts/pluginupdate-py/pluginupdate.py (+37 -39)
···
from multiprocessing.dummy import Pool
from pathlib import Path
from tempfile import NamedTemporaryFile
-from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urljoin, urlparse
import git
···
# a dictionary of plugins and their new repositories
-Redirects = Dict["PluginDesc", "Repo"]
class Repo:
···
"""Url to the repo"""
self._branch = branch
# Redirect is the new Repo to use
-self.redirect: Optional["Repo"] = None
self.token = "dummy_token"
@property
···
return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
-def latest_commit(self) -> Tuple[str, datetime]:
log.debug("Latest commit")
loaded = self._prefetch(None)
updated = datetime.strptime(loaded["date"], "%Y-%m-%dT%H:%M:%S%z")
return loaded["rev"], updated
-def _prefetch(self, ref: Optional[str]):
cmd = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri]
if ref is not None:
cmd.append(ref)
···
loaded = json.loads(data)
return loaded
-def prefetch(self, ref: Optional[str]) -> str:
log.info("Prefetching %s", self.uri)
loaded = self._prefetch(ref)
return loaded["sha256"]
···
return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
-def latest_commit(self) -> Tuple[str, datetime]:
commit_url = self.url(f"commits/{self.branch}.atom")
log.debug("Sending request to %s", commit_url)
commit_req = make_request(commit_url, self.token)
···
class PluginDesc:
repo: Repo
branch: str
-alias: Optional[str]
@property
def name(self):
return self.alias or self.repo.name
@staticmethod
-def load_from_csv(config: FetchConfig, row: Dict[str, str]) -> "PluginDesc":
log.debug("Loading row %s", row)
branch = row["branch"]
repo = make_repo(row["repo"], branch.strip())
···
commit: str
has_submodules: bool
sha256: str
-date: Optional[datetime] = None
@property
def normalized_name(self) -> str:
···
assert self.date is not None
return self.date.strftime("%Y-%m-%d")
-def as_json(self) -> Dict[str, str]:
copy = self.__dict__.copy()
del copy["date"]
return copy
···
def load_plugins_from_csv(
config: FetchConfig,
input_file: Path,
-) -> List[PluginDesc]:
log.debug("Load plugins from csv %s", input_file)
plugins = []
with open(input_file, newline="") as csvfile:
···
name: str,
root: Path,
get_plugins: str,
-default_in: Optional[Path] = None,
-default_out: Optional[Path] = None,
-deprecated: Optional[Path] = None,
-cache_file: Optional[str] = None,
):
log.debug("get_plugins:", get_plugins)
self.name = name
···
def get_current_plugins(
self, config: FetchConfig, nixpkgs: str
-) -> List[Tuple[PluginDesc, Plugin]]:
"""To fill the cache"""
data = run_nix_expr(self.get_plugins, nixpkgs)
plugins = []
···
plugins.append((pdesc, p))
return plugins
-def load_plugin_spec(self, config: FetchConfig, plugin_file) -> List[PluginDesc]:
"""CSV spec"""
return load_plugins_from_csv(config, plugin_file)
···
raise NotImplementedError()
def filter_plugins_to_update(
-self, plugin: PluginDesc, to_update: List[str]
) -> bool:
"""Function for filtering out plugins, that user doesn't want to update.
···
input_file: str,
output_file: str,
config: FetchConfig,
-to_update: Optional[List[str]],
):
if to_update is None:
to_update = []
···
def merge_results(
self,
-current: list[Tuple[PluginDesc, Plugin]],
-fetched: List[Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]],
-) -> List[Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]]:
# transforming this to dict, so lookup is O(1) instead of O(n) (n is len(current))
-result: Dict[
-str, Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]
-] = {
# also adding redirect (third item in the result tuple)
pl.normalized_name: (pdesc, pl, None)
for pdesc, pl in current
···
def prefetch_plugin(
p: PluginDesc,
-cache: "Optional[Cache]" = None,
-) -> Tuple[Plugin, Optional[Repo]]:
commit = None
log.info(f"Fetching last commit for plugin {p.name} from {p.repo.uri}@{p.branch}")
commit, date = p.repo.latest_commit()
···
def check_results(
-results: List[Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]],
-) -> Tuple[List[Tuple[PluginDesc, Plugin]], Redirects]:
""" """
-failures: List[Tuple[PluginDesc, Exception]] = []
plugins = []
redirects: Redirects = {}
for pdesc, result, redirect in results:
···
return repo
-def get_cache_path(cache_file_name: str) -> Optional[Path]:
xdg_cache = os.environ.get("XDG_CACHE_HOME", None)
if xdg_cache is None:
home = os.environ.get("HOME", None)
···
class Cache:
-def __init__(self, initial_plugins: List[Plugin], cache_file_name: str) -> None:
self.cache_file = get_cache_path(cache_file_name)
downloads = {}
···
downloads.update(self.load())
self.downloads = downloads
-def load(self) -> Dict[str, Plugin]:
if self.cache_file is None or not self.cache_file.exists():
return {}
-downloads: Dict[str, Plugin] = {}
with open(self.cache_file) as f:
data = json.load(f)
for attr in data.values():
···
data[name] = attr.as_json()
json.dump(data, f, indent=4, sort_keys=True)
-def __getitem__(self, key: str) -> Optional[Plugin]:
return self.downloads.get(key, None)
def __setitem__(self, key: str, value: Plugin) -> None:
···
def prefetch(
pluginDesc: PluginDesc, cache: Cache
-) -> Tuple[PluginDesc, Union[Exception, Plugin], Optional[Repo]]:
try:
plugin, redirect = prefetch_plugin(pluginDesc, cache)
cache[plugin.commit] = plugin
···
deprecated: Path,
# old pluginDesc and the new
redirects: Redirects = {},
-append: List[PluginDesc] = [],
):
log.info("Rewriting input file %s", input_file)
plugins = load_plugins_from_csv(config, input_file)
···
writer.writerow(asdict(plugin))
-def commit(repo: git.Repo, message: str, files: List[Path]) -> None:
repo.index.add([str(f.resolve()) for f in files])
if repo.index.diff("HEAD"):
···
from multiprocessing.dummy import Pool
from pathlib import Path
from tempfile import NamedTemporaryFile
+from typing import Any, Callable
from urllib.parse import urljoin, urlparse
import git
···
# a dictionary of plugins and their new repositories
+Redirects = dict["PluginDesc", "Repo"]
class Repo:
···
"""Url to the repo"""
self._branch = branch
# Redirect is the new Repo to use
+self.redirect: "Repo | None" = None
self.token = "dummy_token"
@property
···
return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
+def latest_commit(self) -> tuple[str, datetime]:
log.debug("Latest commit")
loaded = self._prefetch(None)
updated = datetime.strptime(loaded["date"], "%Y-%m-%dT%H:%M:%S%z")
return loaded["rev"], updated
+def _prefetch(self, ref: str | None):
cmd = ["nix-prefetch-git", "--quiet", "--fetch-submodules", self.uri]
if ref is not None:
cmd.append(ref)
···
loaded = json.loads(data)
return loaded
+def prefetch(self, ref: str | None) -> str:
log.info("Prefetching %s", self.uri)
loaded = self._prefetch(ref)
return loaded["sha256"]
···
return True
@retry(urllib.error.URLError, tries=4, delay=3, backoff=2)
+def latest_commit(self) -> tuple[str, datetime]:
commit_url = self.url(f"commits/{self.branch}.atom")
log.debug("Sending request to %s", commit_url)
commit_req = make_request(commit_url, self.token)
···
class PluginDesc:
repo: Repo
branch: str
+alias: str | None
@property
def name(self):
return self.alias or self.repo.name
@staticmethod
+def load_from_csv(config: FetchConfig, row: dict[str, str]) -> "PluginDesc":
log.debug("Loading row %s", row)
branch = row["branch"]
repo = make_repo(row["repo"], branch.strip())
···
commit: str
has_submodules: bool
sha256: str
+date: datetime | None = None
@property
def normalized_name(self) -> str:
···
assert self.date is not None
return self.date.strftime("%Y-%m-%d")
+def as_json(self) -> dict[str, str]:
copy = self.__dict__.copy()
del copy["date"]
return copy
···
def load_plugins_from_csv(
config: FetchConfig,
input_file: Path,
+) -> list[PluginDesc]:
log.debug("Load plugins from csv %s", input_file)
plugins = []
with open(input_file, newline="") as csvfile:
···
name: str,
root: Path,
get_plugins: str,
+default_in: Path | None = None,
+default_out: Path | None = None,
+deprecated: Path | None = None,
+cache_file: str | None = None,
):
log.debug("get_plugins:", get_plugins)
self.name = name
···
def get_current_plugins(
self, config: FetchConfig, nixpkgs: str
+) -> list[tuple[PluginDesc, Plugin]]:
"""To fill the cache"""
data = run_nix_expr(self.get_plugins, nixpkgs)
plugins = []
···
plugins.append((pdesc, p))
return plugins
+def load_plugin_spec(self, config: FetchConfig, plugin_file) -> list[PluginDesc]:
"""CSV spec"""
return load_plugins_from_csv(config, plugin_file)
···
raise NotImplementedError()
def filter_plugins_to_update(
+self, plugin: PluginDesc, to_update: list[str]
) -> bool:
"""Function for filtering out plugins, that user doesn't want to update.
···
input_file: str,
output_file: str,
config: FetchConfig,
+to_update: list[str] | None,
):
if to_update is None:
to_update = []
···
def merge_results(
self,
+current: list[tuple[PluginDesc, Plugin]],
+fetched: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
+) -> list[tuple[PluginDesc, Exception | Plugin, Repo | None]]:
# transforming this to dict, so lookup is O(1) instead of O(n) (n is len(current))
+result: dict[str, tuple[PluginDesc, Exception | Plugin, Repo | None]] = {
# also adding redirect (third item in the result tuple)
pl.normalized_name: (pdesc, pl, None)
for pdesc, pl in current
···
def prefetch_plugin(
p: PluginDesc,
+cache: "Cache | None" = None,
+) -> tuple[Plugin, Repo | None]:
commit = None
log.info(f"Fetching last commit for plugin {p.name} from {p.repo.uri}@{p.branch}")
commit, date = p.repo.latest_commit()
···
def check_results(
+results: list[tuple[PluginDesc, Exception | Plugin, Repo | None]],
+) -> tuple[list[tuple[PluginDesc, Plugin]], Redirects]:
""" """
+failures: list[tuple[PluginDesc, Exception]] = []
plugins = []
redirects: Redirects = {}
for pdesc, result, redirect in results:
···
return repo
+def get_cache_path(cache_file_name: str) -> Path | None:
xdg_cache = os.environ.get("XDG_CACHE_HOME", None)
if xdg_cache is None:
home = os.environ.get("HOME", None)
···
class Cache:
+def __init__(self, initial_plugins: list[Plugin], cache_file_name: str) -> None:
self.cache_file = get_cache_path(cache_file_name)
downloads = {}
···
downloads.update(self.load())
self.downloads = downloads
+def load(self) -> dict[str, Plugin]:
if self.cache_file is None or not self.cache_file.exists():
return {}
+downloads: dict[str, Plugin] = {}
with open(self.cache_file) as f:
data = json.load(f)
for attr in data.values():
···
data[name] = attr.as_json()
json.dump(data, f, indent=4, sort_keys=True)
+def __getitem__(self, key: str) -> Plugin | None:
return self.downloads.get(key, None)
def __setitem__(self, key: str, value: Plugin) -> None:
···
def prefetch(
pluginDesc: PluginDesc, cache: Cache
+) -> tuple[PluginDesc, Exception | Plugin, Repo | None]:
try:
plugin, redirect = prefetch_plugin(pluginDesc, cache)
cache[plugin.commit] = plugin
···
deprecated: Path,
# old pluginDesc and the new
redirects: Redirects = {},
+append: list[PluginDesc] = [],
):
log.info("Rewriting input file %s", input_file)
plugins = load_plugins_from_csv(config, input_file)
···
writer.writerow(asdict(plugin))
+def commit(repo: git.Repo, message: str, files: list[Path]) -> None:
repo.index.add([str(f.resolve()) for f in files])
if repo.index.diff("HEAD"):
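One detail worth noting on the new side: two annotations stay quoted ("Repo | None" and "Cache | None") instead of becoming bare unions. A sketch of why, under CPython's usual evaluation rules; the signature mirrors the file, while the Cache.next attribute is hypothetical:

    def prefetch_plugin(cache: "Cache | None" = None) -> None:
        # Parameter annotations are evaluated when the def statement runs,
        # and Cache is defined further down the module, so this forward
        # reference has to stay a string.
        ...

    class Cache:
        def __init__(self) -> None:
            # Annotations on attribute targets such as self.x are evaluated
            # at runtime (PEP 526), and the X | Y operator on classes only
            # exists from Python 3.10; quoting sidesteps both concerns.
            self.next: "Cache | None" = None  # hypothetical attribute

Alternatively, from __future__ import annotations (PEP 563) defers evaluation of all annotations, which would make the quoting unnecessary.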
pkgs/applications/editors/kakoune/plugins/update.py (+1 -3)
···
import os
import sys
from pathlib import Path
-from typing import List, Tuple
# Import plugin update library from maintainers/scripts/pluginupdate.py
ROOT = Path(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))) # type: ignore
···
HEADER = "# This file has been @generated by ./pkgs/applications/editors/kakoune/plugins/update.py. Do not edit!"
-
class KakouneEditor(pluginupdate.Editor):
def generate_nix(
self,
-plugins: List[Tuple[pluginupdate.PluginDesc, pluginupdate.Plugin]],
outfile: str,
):
with open(outfile, "w+") as f:
···
import os
import sys
from pathlib import Path
# Import plugin update library from maintainers/scripts/pluginupdate.py
ROOT = Path(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))) # type: ignore
···
HEADER = "# This file has been @generated by ./pkgs/applications/editors/kakoune/plugins/update.py. Do not edit!"
class KakouneEditor(pluginupdate.Editor):
def generate_nix(
self,
+plugins: list[tuple[pluginupdate.PluginDesc, pluginupdate.Plugin]],
outfile: str,
):
with open(outfile, "w+") as f:
pkgs/by-name/lu/luarocks-packages-updater/updater.py (+11 -16)
···
from dataclasses import dataclass
from multiprocessing.dummy import Pool
from pathlib import Path
-from typing import List, Optional, Tuple
import pluginupdate
from pluginupdate import FetchConfig, update_plugins
···
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
-ROOT = Path(
-os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
-).parent.parent # type: ignore
PKG_LIST = "maintainers/scripts/luarocks-packages.csv"
TMP_FILE = "$(mktemp)"
···
"""Name of the plugin, as seen on luarocks.org"""
rockspec: str
"""Full URI towards the rockspec"""
-ref: Optional[str]
"""git reference (branch name/tag)"""
-version: Optional[str]
"""Set it to pin a package """
-server: Optional[str]
"""luarocks.org registers packages under different manifests.
Its value can be 'http://luarocks.org/dev'
"""
-luaversion: Optional[str]
"""lua version if a package is available only for a specific lua version"""
-maintainers: Optional[str]
-""" Optional string listing maintainers separated by spaces"""
@property
def normalized_name(self) -> str:
···
def get_current_plugins(self):
return []
-def load_plugin_spec(self, input_file) -> List[LuaPlugin]:
luaPackages = []
csvfilename = input_file
log.info("Loading package descriptions from %s", csvfilename)
···
def update(self, args):
update_plugins(self, args)
-def generate_nix(self, results: List[Tuple[LuaPlugin, str]], outfilename: str):
with tempfile.NamedTemporaryFile("w+") as f:
f.write(HEADER)
header2 = textwrap.dedent(
···
outfile: str,
config: FetchConfig,
# TODO: implement support for adding/updating individual plugins
-to_update: Optional[List[str]],
):
if to_update is not None:
-raise NotImplementedError(
-"For now, lua updater doesn't support updating individual packages."
-)
_prefetch = generate_pkg_nix
def update() -> dict:
···
from dataclasses import dataclass
from multiprocessing.dummy import Pool
from pathlib import Path
import pluginupdate
from pluginupdate import FetchConfig, update_plugins
···
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
+ROOT = Path(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))).parent.parent # type: ignore
PKG_LIST = "maintainers/scripts/luarocks-packages.csv"
TMP_FILE = "$(mktemp)"
···
"""Name of the plugin, as seen on luarocks.org"""
rockspec: str
"""Full URI towards the rockspec"""
+ref: str | None
"""git reference (branch name/tag)"""
+version: str | None
"""Set it to pin a package """
+server: str | None
"""luarocks.org registers packages under different manifests.
Its value can be 'http://luarocks.org/dev'
"""
+luaversion: str | None
"""lua version if a package is available only for a specific lua version"""
+maintainers: str | None
+"""Optional string listing maintainers separated by spaces"""
@property
def normalized_name(self) -> str:
···
def get_current_plugins(self):
return []
+def load_plugin_spec(self, input_file) -> list[LuaPlugin]:
luaPackages = []
csvfilename = input_file
log.info("Loading package descriptions from %s", csvfilename)
···
def update(self, args):
update_plugins(self, args)
+def generate_nix(self, results: list[tuple[LuaPlugin, str]], outfilename: str):
with tempfile.NamedTemporaryFile("w+") as f:
f.write(HEADER)
header2 = textwrap.dedent(
···
outfile: str,
config: FetchConfig,
# TODO: implement support for adding/updating individual plugins
+to_update: list[str] | None,
):
if to_update is not None:
+raise NotImplementedError("For now, lua updater doesn't support updating individual packages.")
_prefetch = generate_pkg_nix
def update() -> dict:
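After this commit, pluginupdate.py still imports Any and Callable from typing. Any has no builtin replacement; Callable could be taken one step further under the same PEP 585 scheme by importing it from collections.abc, where it has been subscriptable since Python 3.9. A hypothetical alias to illustrate; Fetcher is not a name from the files:

    from collections.abc import Callable  # PEP 585 location of Callable
    from typing import Any                # no builtin equivalent exists

    # Illustrative only:
    Fetcher = Callable[[str], Any]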