1from urllib.parse import quote
2import json
3import subprocess as sub
4import os
5import sys
6from typing import Iterator, Any, Literal, TypedDict, Optional
7from tempfile import NamedTemporaryFile
8
# Verbose mode: any non-empty DEBUG env var enables it.
# (bool(...) replaces the redundant `True if ... else False` form; truthiness is identical.)
debug: bool = bool(os.environ.get("DEBUG"))

# type alias: path to an executable
Bin = str

# static configuration handed in by the wrapper (JSON in the ARGS env var)
args: dict[str, Any] = json.loads(os.environ["ARGS"])
# mapping from tool name (e.g. "curl", "nix-prefetch-git") to its binary path
bins: dict[str, Bin] = args["binaries"]

# subcommand name and its JSON payload, passed on the command line
mode: str = sys.argv[1]
jsonArg: dict = json.loads(sys.argv[2])

# type alias: a lazily-built argv list
Args = Iterator[str]
18
19
def log(msg: str) -> None:
    """Write a diagnostic line to stderr (stdout is reserved for results)."""
    sys.stderr.write(msg + "\n")
22
23
def atomically_write(file_path: str, content: bytes) -> None:
    """Atomically write `content` into `file_path`.

    The content is first written to a temporary file in the same directory
    (so the final rename is guaranteed to stay on one filesystem, which is
    what makes it atomic on POSIX), then renamed over the target.

    Bug fixes vs. the previous version:
    * the temp file is closed (and thus flushed) *before* the rename,
      instead of renaming a still-open, possibly unflushed file;
    * failures are re-raised instead of being silently swallowed after
      cleanup, so callers no longer believe a failed write succeeded.
    """
    tmp = NamedTemporaryFile(
        # write to the parent dir, so that it’s guaranteed to be on the same filesystem
        dir=os.path.dirname(file_path),
        delete=False
    )
    try:
        with tmp:
            tmp.write(content)
        # atomic on POSIX when src and dst are on the same filesystem
        os.rename(
            src=tmp.name,
            dst=file_path
        )
    except Exception:
        os.unlink(tmp.name)
        raise
39
40
def curl_github_args(token: str | None, url: str) -> Args:
    """Build the curl argv for querying the github API at `url`."""
    yield bins["curl"]
    if not debug:
        # keep curl quiet unless we are debugging
        yield "--silent"
    yield "--location"  # follow redirects
    if token:
        # authenticated requests get a much higher rate limit
        yield "-H"
        yield f"Authorization: token {token}"
    yield url
52
53
54def curl_result(output: bytes) -> Any | Literal["not found"]:
55 """Parse the curl result of the github API"""
56 res: Any = json.loads(output)
57 match res:
58 case dict(res):
59 message: str = res.get("message", "")
60 if "rate limit" in message:
61 sys.exit("Rate limited by the Github API")
62 if "Not Found" in message:
63 return "not found"
64 # if the result is another type, we can pass it on
65 return res
66
67
def nix_prefetch_git_args(url: str, version_rev: str) -> Args:
    """Build the nix-prefetch-git argv for `url` at revision `version_rev`."""
    argv = [bins["nix-prefetch-git"]]
    if not debug:
        # suppress progress chatter unless debugging
        argv.append("--quiet")
    argv += ["--no-deepClone", "--url", url, "--rev", version_rev]
    yield from argv
78
79
def run_cmd(args: Args) -> bytes:
    """Run the given argv (materialized from the iterator) and return its stdout.

    Raises `subprocess.CalledProcessError` on a non-zero exit status.
    Fix: the local was named `all`, shadowing the builtin.
    """
    argv = list(args)
    if debug:
        log(str(argv))
    # list-argv form (shell=False) — no shell-injection surface
    return sub.check_output(argv)
85
86
Dir = str  # type alias for readability: a filesystem directory path
88
89
def fetchRepo() -> None:
    """Fetch the given repo and write its nix-prefetch output to the corresponding grammar json file.

    Reads `orga`, `repo`, `outputDir`, `nixRepoAttrName` (and optionally
    `branch`) from the module-level `jsonArg`. Pinned grammars are skipped.
    Exits the process on malformed input or unexpected API results.
    """
    match jsonArg:
        case {
            "orga": orga,
            "repo": repo,
            "outputDir": outputDir,
            "nixRepoAttrName": nixRepoAttrName,
        }:
            # pinned grammars are managed manually; never overwrite them
            if repo in args["pinnedGrammars"]:
                log(f"Grammar {repo} is pinned, skipping upgrade...")
                return

            # optional token raises the github API rate limit
            token: str | None = os.environ.get("GITHUB_TOKEN", None)
            out = run_cmd(
                curl_github_args(
                    token,
                    url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest"
                )
            )
            # the git revision (tag, branch ref, or HEAD) to prefetch
            release: str
            match curl_result(out):
                case "not found":
                    if "branch" in jsonArg:
                        # an explicitly configured branch takes precedence
                        branch = jsonArg.get("branch")
                        release = f"refs/heads/{branch}"
                    else:
                        # github sometimes returns an empty list even though there are releases
                        log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD")
                        release = "HEAD"
                case {"tag_name": tag_name}:
                    release = tag_name
                case _:
                    sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field")

            log(f"Fetching latest release ({release}) of {orga}/{repo} …")
            res = run_cmd(
                nix_prefetch_git_args(
                    url=f"https://github.com/{quote(orga)}/{quote(repo)}",
                    version_rev=release
                )
            )
            # write atomically so a concurrent reader never sees a partial file
            atomically_write(
                file_path=os.path.join(
                    outputDir,
                    f"{nixRepoAttrName}.json"
                ),
                content=res
            )
        case _:
            sys.exit("input json must have `orga` and `repo` keys")
141
142
143def fetchOrgaLatestRepos(orga: str) -> set[str]:
144 """fetch the latest (100) repos from the given github organization"""
145 token: str | None = os.environ.get("GITHUB_TOKEN", None)
146 out = run_cmd(
147 curl_github_args(
148 token,
149 url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100"
150 )
151 )
152 match curl_result(out):
153 case "not found":
154 sys.exit(f"github organization {orga} not found")
155 case list(repos):
156 res: list[str] = []
157 for repo in repos:
158 name = repo.get("name")
159 if name:
160 res.append(name)
161 return set(res)
162 case _:
163 sys.exit("github result was not a list of repos, but {other}")
164
165
def checkTreeSitterRepos(latest_github_repos: set[str]) -> None:
    """Make sure we know about all tree sitter repos on the tree sitter orga.

    Exits the process listing any repo that is neither known nor ignored.
    """
    accounted_for: set[str] = set(args["knownTreeSitterOrgGrammarRepos"]) | set(
        args["ignoredTreeSitterOrgRepos"]
    )

    unknown = latest_github_repos - accounted_for

    if unknown:
        sys.exit(f"These repositories are neither known nor ignored:\n{unknown}")
175
176
# Shape of one grammar entry as handed in via the input json.
Grammar = TypedDict(
    "Grammar",
    {
        # attribute name used in the generated nix file
        "nixRepoAttrName": str,
        # github organization
        "orga": str,
        # github repository name
        "repo": str,
        # NOTE(review): callers test `"branch" in jsonArg`, which suggests the
        # key itself may be absent; that would need `NotRequired` (3.11+)
        # rather than `Optional` — confirm before changing
        "branch": Optional[str]
    }
)
186
187
def printAllGrammarsNixFile() -> None:
    """Write a `default.nix` into `outputDir` that imports all grammar json files.

    The generated file maps each grammar's nix attribute name to
    `lib.importJSON` of its prefetch output.
    """
    # fix: elements are `Grammar` dicts (the previous annotation
    # `list[dict[str, Grammar]]` contradicted the `grammar["nixRepoAttrName"]`
    # access below)
    allGrammars: list[Grammar] = jsonArg["allGrammars"]
    outputDir: Dir = jsonArg["outputDir"]

    def file() -> Iterator[str]:
        """Yield the lines of the generated nix file."""
        yield "{ lib }:"
        yield "{"
        for grammar in allGrammars:
            n = grammar["nixRepoAttrName"]
            yield f"  {n} = lib.importJSON ./{n}.json;"
        yield "}"
        # trailing newline
        yield ""

    atomically_write(
        file_path=os.path.join(
            outputDir,
            "default.nix"
        ),
        content="\n".join(file()).encode()
    )
209
210
def fetchAndCheckTreeSitterRepos() -> None:
    """Cross-check the tree-sitter org's current repos against our grammar lists."""
    log("fetching list of grammars")
    repos = fetchOrgaLatestRepos(orga="tree-sitter")
    log("checking the tree-sitter repo list against the grammars we know")
    checkTreeSitterRepos(repos)
216
217
# dispatch on the subcommand given as argv[1]
if mode == "fetch-repo":
    fetchRepo()
elif mode == "fetch-and-check-tree-sitter-repos":
    fetchAndCheckTreeSitterRepos()
elif mode == "print-all-grammars-nix-file":
    printAllGrammarsNixFile()
else:
    sys.exit(f"mode {mode} unknown")