# at master 6.8 kB view raw
1from urllib.parse import quote 2import json 3import subprocess as sub 4import os 5import sys 6from typing import Iterator, Any, Literal, TypedDict, Optional 7from tempfile import NamedTemporaryFile 8 9debug: bool = True if os.environ.get("DEBUG", False) else False 10Bin = str 11args: dict[str, Any] = json.loads(os.environ["ARGS"]) 12bins: dict[str, Bin] = args["binaries"] 13 14mode: str = sys.argv[1] 15jsonArg: dict = json.loads(sys.argv[2]) 16 17Args = Iterator[str] 18 19 20def log(msg: str) -> None: 21 print(msg, file=sys.stderr) 22 23 24def atomically_write(file_path: str, content: bytes) -> None: 25 """atomically write the content into `file_path`""" 26 with NamedTemporaryFile( 27 # write to the parent dir, so that it’s guaranteed to be on the same filesystem 28 dir=os.path.dirname(file_path), 29 delete=False 30 ) as tmp: 31 try: 32 tmp.write(content) 33 os.rename( 34 src=tmp.name, 35 dst=file_path 36 ) 37 except Exception: 38 os.unlink(tmp.name) 39 40 41def curl_github_args(token: str | None, url: str) -> Args: 42 """Query the github API via curl""" 43 yield bins["curl"] 44 if not debug: 45 yield "--silent" 46 # follow redirects 47 yield "--location" 48 if token: 49 yield "-H" 50 yield f"Authorization: token {token}" 51 yield url 52 53 54def curl_result(output: bytes) -> Any | Literal["not found"]: 55 """Parse the curl result of the github API""" 56 res: Any = json.loads(output) 57 match res: 58 case dict(res): 59 message: str = res.get("message", "") 60 if "rate limit" in message: 61 sys.exit("Rate limited by the Github API") 62 if "Not Found" in message: 63 return "not found" 64 # if the result is another type, we can pass it on 65 return res 66 67 68def nix_prefetch_git_args(url: str, version_rev: str) -> Args: 69 """Prefetch a git repository""" 70 yield bins["nix-prefetch-git"] 71 if not debug: 72 yield "--quiet" 73 yield "--no-deepClone" 74 yield "--url" 75 yield url 76 yield "--rev" 77 yield version_rev 78 79 80def run_cmd(args: Args) -> bytes: 81 all = 
list(args) 82 if debug: 83 log(str(all)) 84 return sub.check_output(all) 85 86 87Dir = str 88 89 90def fetchRepo() -> None: 91 """fetch the given repo and write its nix-prefetch output to the corresponding grammar json file""" 92 match jsonArg: 93 case { 94 "orga": orga, 95 "repo": repo, 96 "outputDir": outputDir, 97 "nixRepoAttrName": nixRepoAttrName, 98 }: 99 if repo in args["pinnedGrammars"]: 100 log(f"Grammar {repo} is pinned, skipping upgrade...") 101 return 102 103 token: str | None = os.environ.get("GITHUB_TOKEN", None) 104 out = run_cmd( 105 curl_github_args( 106 token, 107 url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest" 108 ) 109 ) 110 release: str 111 match curl_result(out): 112 case "not found": 113 if "branch" in jsonArg: 114 branch = jsonArg.get("branch") 115 release = f"refs/heads/{branch}" 116 else: 117 # github sometimes returns an empty list even tough there are releases 118 log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD") 119 release = "HEAD" 120 case {"tag_name": tag_name}: 121 release = tag_name 122 case _: 123 sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field") 124 125 log(f"Fetching latest release ({release}) of {orga}/{repo}") 126 res = run_cmd( 127 nix_prefetch_git_args( 128 url=f"https://github.com/{quote(orga)}/{quote(repo)}", 129 version_rev=release 130 ) 131 ) 132 atomically_write( 133 file_path=os.path.join( 134 outputDir, 135 f"{nixRepoAttrName}.json" 136 ), 137 content=res 138 ) 139 case _: 140 sys.exit("input json must have `orga` and `repo` keys") 141 142 143def fetchOrgaLatestRepos(orga: str) -> set[str]: 144 """fetch the latest (100) repos from the given github organization""" 145 token: str | None = os.environ.get("GITHUB_TOKEN", None) 146 out = run_cmd( 147 curl_github_args( 148 token, 149 url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100" 150 ) 151 ) 152 match curl_result(out): 153 case "not found": 154 sys.exit(f"github organization {orga} 
not found") 155 case list(repos): 156 res: list[str] = [] 157 for repo in repos: 158 name = repo.get("name") 159 if name: 160 res.append(name) 161 return set(res) 162 case _: 163 sys.exit("github result was not a list of repos, but {other}") 164 165 166def checkTreeSitterRepos(latest_github_repos: set[str]) -> None: 167 """Make sure we know about all tree sitter repos on the tree sitter orga.""" 168 known: set[str] = set(args["knownTreeSitterOrgGrammarRepos"]) 169 ignored: set[str] = set(args["ignoredTreeSitterOrgRepos"]) 170 171 unknown = latest_github_repos - (known | ignored) 172 173 if unknown: 174 sys.exit(f"These repositories are neither known nor ignored:\n{unknown}") 175 176 177Grammar = TypedDict( 178 "Grammar", 179 { 180 "nixRepoAttrName": str, 181 "orga": str, 182 "repo": str, 183 "branch": Optional[str] 184 } 185) 186 187 188def printAllGrammarsNixFile() -> None: 189 """Print a .nix file that imports all grammars.""" 190 allGrammars: list[dict[str, Grammar]] = jsonArg["allGrammars"] 191 outputDir: Dir = jsonArg["outputDir"] 192 193 def file() -> Iterator[str]: 194 yield "{ lib }:" 195 yield "{" 196 for grammar in allGrammars: 197 n = grammar["nixRepoAttrName"] 198 yield f" {n} = lib.importJSON ./{n}.json;" 199 yield "}" 200 yield "" 201 202 atomically_write( 203 file_path=os.path.join( 204 outputDir, 205 "default.nix" 206 ), 207 content="\n".join(file()).encode() 208 ) 209 210 211def fetchAndCheckTreeSitterRepos() -> None: 212 log("fetching list of grammars") 213 latest_repos = fetchOrgaLatestRepos(orga="tree-sitter") 214 log("checking the tree-sitter repo list against the grammars we know") 215 checkTreeSitterRepos(latest_repos) 216 217 218match mode: 219 case "fetch-repo": 220 fetchRepo() 221 case "fetch-and-check-tree-sitter-repos": 222 fetchAndCheckTreeSitterRepos() 223 case "print-all-grammars-nix-file": 224 printAllGrammarsNixFile() 225 case _: 226 sys.exit(f"mode {mode} unknown")