Assorted shell and Python scripts
1#!/usr/bin/env -S uv run --script
2# /// script
3# dependencies = [
4# "docopt",
5# "rich",
6# ]
7# ///
8
9"""OCI Registry Helper
10
11Usage:
12 ocirh <subcommand> [<args>...]
13
14Subcommands:
15 repos Lists repositories in the registry. Repos correspond to images
16 pushed to the registry.
17 tags Lists tags of the given repository.
18 manifests Lists manifests of the given repository for the given tag.
19 rmi Removes a tag from an image. If given tag is the only tag,
20 removes the image.
21 gc Runs garbage collection on the registry. Requires SSH public key
22 access to registry server.
23 rmr Removes given repository from the registry. Requires SSH public
24 key access to registry server.
25
26Examples:
27 Suppose we have an image called 'fedora-toolbox' tagged with 'latest'.
28
29 ocirh repos
30 ocirh tags fedora-toolbox
31 ocirh manifests fedora-toolbox latest
32 ocirh rmi fedora-toolbox latest
33 ocirh gc
34 ocirh rmr fedora-toolbox
35"""
36import http.client
37import json
38import logging
39import math
40import subprocess
41
42from docopt import docopt
43from rich import print
44from rich.console import Group
45from rich.logging import RichHandler
46from rich.panel import Panel
47from rich.table import Table
48from rich.text import Text
49from rich.traceback import install
50from rich.tree import Tree
51
52install(show_locals=True)
53
54# Rich logging handler
55FORMAT = "%(message)s"
56logging.basicConfig(
57 level="NOTSET",
58 format=FORMAT,
59 datefmt="[%X]",
60 handlers=[RichHandler(rich_tracebacks=True)],
61)
62log = logging.getLogger("rich")
63
64
65# Taken from https://stackoverflow.com/a/14822210
66#
67# How this function works:
68# If size_bytes == 0, returns 0 B.
69# size_name is a tuple containing binary prefixes for bytes.
70#
71# math.log takes the logarithm of size_bytes to base 1024.
72# math.floor rounds down the result of math.log to the nearest integer.
73# int ensures the result of math.floor is of type int, and stores it in i.
74# The value of i is used to determine which binary prefix to use from
75# size_name.
76#
77# math.pow returns the value of 1024 raised to the power of i, stores it in p.
78#
79# round takes the value of size_bytes, divides it by p, and stores the result
80# in s at precision of 2 decimal places.
81#
82# A formatted string with size s and binary prefix size_name[i] is returned.
83def convert_size(size_bytes: int) -> str:
84 """
85 Converts a decimal integer of bytes to its respective binary-prefixed size.
86
87 Parameters:
88 size_bytes (int): A decimal integer.
89
90 Returns:
91 (str): Binary-prefixed size of size_bytes formatted as a string.
92 """
93 if size_bytes == 0:
94 return "0 B"
95 size_name = ("B", "KiB", "MiB", "GiB")
96 i = int(math.floor(math.log(size_bytes, 1024)))
97 p = math.pow(1024, i)
98 s = round(size_bytes / p, 2)
99 return "%s %s" % (s, size_name[i])
100
101
102REGISTRY_URL = "registry.hyperreal.coffee"
103
104
105def get_auth() -> str:
106 """
107 Get the base64 encoded password for registry autentication.
108
109 Returns:
110 auth (str): A string containing the base64 encoded password.
111 """
112 try:
113 with open("/run/user/1000/containers/auth.json", "r") as authfile:
114 json_data = json.loads(authfile.read())
115 except Exception as ex:
116 log.exception(ex)
117
118 auth = json_data["auths"][REGISTRY_URL]["auth"]
119 return auth
120
121
122def get_headers() -> dict:
123 """
124 Returns headers for HTTP request authentication to the registry server.
125
126 Returns:
127 headers (dict): A dict of HTTP headers
128 """
129 return {
130 "Accept": "application/vnd.oci.image.manifest.v1+json",
131 "Authorization": "Basic " + get_auth(),
132 }
133
134
135def get_json_response(request: str, url: str) -> dict:
136 """
137 Connects to registry and returns response data as JSON.
138
139 Parameters:
140 request (str): A string like "GET" or "DELETE"
141 url (str) : A string containing the URL of the requested data
142
143 Returns:
144 json_data (dict): JSON data as a dict object
145 """
146 conn = http.client.HTTPSConnection(REGISTRY_URL)
147 headers = get_headers()
148 try:
149 conn.request(request, url, "", headers)
150 res = conn.getresponse()
151 data = res.read()
152 json_data = json.loads(data.decode("utf-8"))
153 except Exception as ex:
154 log.exception(ex)
155
156 return json_data
157
158
159def get_repositories():
160 """
161 Prints a Rich Tree that lists the repositories of the registry.
162 """
163
164 json_data = get_json_response("GET", "/v2/_catalog")
165 repo_tree = Tree("[green]Repositories")
166 for repo in json_data["repositories"]:
167 repo_tree.add("[blue]%s" % repo)
168
169 print(repo_tree)
170
171
172def get_tags(repo: str):
173 """
174 Prints a Rich Tree that lists the tags for the given repository.
175
176 Parameters:
177 repo (str): A string containing the name of the repo
178 """
179 json_data = get_json_response("GET", "/v2/" + repo + "/tags/list")
180 tags_tree = Tree("[green]%s tags" % repo)
181 for tag in json_data["tags"]:
182 tags_tree.add("[cyan]:%s" % tag)
183
184 print(tags_tree)
185
186
187def get_manifests(repo: str, tag: str):
188 """
189 Prints a Rich grid table that displays the manifests and metadata of the
190 image repository.
191
192 Parameters:
193 repo (str): A string containing the name of the repo
194 tag (str) : A string containing the tag of the desired image
195 """
196 json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag)
197
198 grid_meta = Table.grid(expand=True)
199 grid_meta.add_column()
200 grid_meta.add_column()
201 meta_schema_version_key = Text("Schema version")
202 meta_schema_version_key.stylize("bold green", 0)
203 meta_schema_version_value = Text(str(json_data["schemaVersion"]))
204 meta_media_type_key = Text("Media type")
205 meta_media_type_key.stylize("bold green", 0)
206 meta_media_type_value = Text(json_data["mediaType"])
207 grid_meta.add_row(meta_schema_version_key, meta_schema_version_value)
208 grid_meta.add_row(meta_media_type_key, meta_media_type_value)
209
210 grid_config = Table.grid(expand=True)
211 grid_config.add_column()
212 grid_config.add_column()
213 config_media_type_key = Text("Media type")
214 config_media_type_key.stylize("bold green", 0)
215 config_media_type_value = Text(json_data["config"]["mediaType"])
216 config_digest_key = Text("Digest")
217 config_digest_key.stylize("bold green", 0)
218 config_digest_value = Text(json_data["config"]["digest"])
219 config_size_key = Text("Size")
220 config_size_key.stylize("bold green", 0)
221 config_size_value = Text(convert_size(json_data["config"]["size"]))
222 grid_config.add_row(config_media_type_key, config_media_type_value)
223 grid_config.add_row(config_digest_key, config_digest_value)
224 grid_config.add_row(config_size_key, config_size_value)
225
226 grid_annotations = Table.grid(expand=True)
227 grid_annotations.add_column()
228 grid_annotations.add_column()
229 for item in json_data["annotations"].items():
230 annotations_item_key = Text(item[0])
231 annotations_item_key.stylize("bold green", 0)
232 annotations_item_value = Text(item[1])
233 grid_annotations.add_row(annotations_item_key, annotations_item_value)
234
235 total_size = sum(layer.get("size") for layer in json_data["layers"])
236 table_layers = Table(box=None, show_footer=True)
237 table_layers.add_column(
238 "Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:"
239 )
240 table_layers.add_column(
241 "Size",
242 justify="left",
243 style="cyan",
244 no_wrap=True,
245 footer=convert_size(total_size),
246 )
247 for layer in json_data["layers"]:
248 table_layers.add_row(layer.get("digest"), convert_size(layer.get("size")))
249
250 panel_group = Group(
251 Panel(grid_meta, title="[bold blue]Metadata"),
252 Panel(grid_config, title="[bold blue]Config"),
253 Panel(grid_annotations, title="Annotations"),
254 Panel(
255 table_layers,
256 title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"),
257 ),
258 )
259 print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag)))
260
261
262def delete_image(repo: str, tag: str):
263 """
264 Removes the given tag from the image. If the given tag is the only tag,
265 removes the image.
266
267 Parameters:
268 repo (str): A string containing the name of the repo
269 tag (str) : A string containing the tag to be removed
270 """
271 try:
272 conn = http.client.HTTPSConnection(REGISTRY_URL)
273 headers = get_headers()
274 conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers)
275 res = conn.getresponse()
276 docker_content_digest = res.getheader("Docker-Content-Digest")
277 except Exception as ex:
278 log.exception(ex)
279
280 try:
281 conn.request(
282 "DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers
283 )
284 except Exception as ex:
285 log.exception(ex)
286
287 print("Untagged %s:%s successfully" % (repo, tag))
288
289
290def garbage_collection():
291 """
292 Runs garbage collection command on the remote registry server. Requires
293 SSH public key access.
294 """
295 command = "/usr/local/bin/registry-gc"
296
297 try:
298 ssh = subprocess.Popen(
299 ["ssh", "%s" % REGISTRY_URL, command],
300 shell=False,
301 stdout=subprocess.PIPE,
302 stderr=subprocess.PIPE,
303 text=True,
304 )
305 result = ssh.stdout.readlines()
306 if result == []:
307 log.error(ssh.stderr.readlines())
308 else:
309 print(result)
310 except Exception as ex:
311 log.exception(ex)
312
313
314def remove_repo(repo: str):
315 """
316 Runs command on remote registry server to remove the given repo.
317
318 Parameters:
319 repo (str): A string containing the name of the repo.
320 """
321 command = "/usr/local/bin/registry-rm-repo " + repo
322
323 try:
324 ssh = subprocess.Popen(
325 ["ssh", "%s" % REGISTRY_URL, command],
326 shell=False,
327 stdout=subprocess.PIPE,
328 stderr=subprocess.PIPE,
329 text=True,
330 )
331 result = ssh.stdout.readlines()
332 if result == []:
333 log.error(ssh.stderr.readlines())
334 else:
335 print(result)
336 except Exception as ex:
337 log.exception(ex)
338
339
340if __name__ == "__main__":
341 args = docopt(__doc__, options_first=True)
342 match args["<subcommand>"]:
343 case "repos":
344 get_repositories()
345 case "tags":
346 get_tags(args["<args>"][0])
347 case "manifests":
348 get_manifests(args["<args>"][0], args["<args>"][1])
349 case "rmi":
350 delete_image(args["<args>"][0], args["<args>"][1])
351 case "gc":
352 garbage_collection()
353 case "rmr":
354 remove_repo(args["<args>"])
355 case _:
356 if args["<subcommand>"] in ["help", None]:
357 exit(subprocess.call(["python3", "ocirh", "--help"]))
358 else:
359 exit(
360 "%r is not a ocirh subcommand. See 'ocirh --help."
361 % args["<subcommand>"]
362 )