Assorted shell and Python scripts
at main 11 kB view raw
1#!/usr/bin/env -S uv run --script 2# /// script 3# dependencies = [ 4# "docopt", 5# "rich", 6# ] 7# /// 8 9"""OCI Registry Helper 10 11Usage: 12 ocirh <subcommand> [<args>...] 13 14Subcommands: 15 repos Lists repositories in the registry. Repos correspond to images 16 pushed to the registry. 17 tags Lists tags of the given repository. 18 manifests Lists manifests of the given repository for the given tag. 19 rmi Removes a tag from an image. If given tag is the only tag, 20 removes the image. 21 gc Runs garbage collection on the registry. Requires SSH public key 22 access to registry server. 23 rmr Removes given repository from the registry. Requires SSH public 24 key access to registry server. 25 26Examples: 27 Suppose we have an image called 'fedora-toolbox' tagged with 'latest'. 28 29 ocirh repos 30 ocirh tags fedora-toolbox 31 ocirh manifests fedora-toolbox latest 32 ocirh rmi fedora-toolbox latest 33 ocirh gc 34 ocirh rmr fedora-toolbox 35""" 36import http.client 37import json 38import logging 39import math 40import subprocess 41 42from docopt import docopt 43from rich import print 44from rich.console import Group 45from rich.logging import RichHandler 46from rich.panel import Panel 47from rich.table import Table 48from rich.text import Text 49from rich.traceback import install 50from rich.tree import Tree 51 52install(show_locals=True) 53 54# Rich logging handler 55FORMAT = "%(message)s" 56logging.basicConfig( 57 level="NOTSET", 58 format=FORMAT, 59 datefmt="[%X]", 60 handlers=[RichHandler(rich_tracebacks=True)], 61) 62log = logging.getLogger("rich") 63 64 65# Taken from https://stackoverflow.com/a/14822210 66# 67# How this function works: 68# If size_bytes == 0, returns 0 B. 69# size_name is a tuple containing binary prefixes for bytes. 70# 71# math.log takes the logarithm of size_bytes to base 1024. 72# math.floor rounds down the result of math.log to the nearest integer. 73# int ensures the result of math.floor is of type int, and stores it in i. 74# The value of i is used to determine which binary prefix to use from 75# size_name. 76# 77# math.pow returns the value of 1024 raised to the power of i, stores it in p. 78# 79# round takes the value of size_bytes, divides it by p, and stores the result 80# in s at precision of 2 decimal places. 81# 82# A formatted string with size s and binary prefix size_name[i] is returned. 83def convert_size(size_bytes: int) -> str: 84 """ 85 Converts a decimal integer of bytes to its respective binary-prefixed size. 86 87 Parameters: 88 size_bytes (int): A decimal integer. 89 90 Returns: 91 (str): Binary-prefixed size of size_bytes formatted as a string. 92 """ 93 if size_bytes == 0: 94 return "0 B" 95 size_name = ("B", "KiB", "MiB", "GiB") 96 i = int(math.floor(math.log(size_bytes, 1024))) 97 p = math.pow(1024, i) 98 s = round(size_bytes / p, 2) 99 return "%s %s" % (s, size_name[i]) 100 101 102REGISTRY_URL = "registry.hyperreal.coffee" 103 104 105def get_auth() -> str: 106 """ 107 Get the base64 encoded password for registry autentication. 108 109 Returns: 110 auth (str): A string containing the base64 encoded password. 111 """ 112 try: 113 with open("/run/user/1000/containers/auth.json", "r") as authfile: 114 json_data = json.loads(authfile.read()) 115 except Exception as ex: 116 log.exception(ex) 117 118 auth = json_data["auths"][REGISTRY_URL]["auth"] 119 return auth 120 121 122def get_headers() -> dict: 123 """ 124 Returns headers for HTTP request authentication to the registry server. 125 126 Returns: 127 headers (dict): A dict of HTTP headers 128 """ 129 return { 130 "Accept": "application/vnd.oci.image.manifest.v1+json", 131 "Authorization": "Basic " + get_auth(), 132 } 133 134 135def get_json_response(request: str, url: str) -> dict: 136 """ 137 Connects to registry and returns response data as JSON. 138 139 Parameters: 140 request (str): A string like "GET" or "DELETE" 141 url (str) : A string containing the URL of the requested data 142 143 Returns: 144 json_data (dict): JSON data as a dict object 145 """ 146 conn = http.client.HTTPSConnection(REGISTRY_URL) 147 headers = get_headers() 148 try: 149 conn.request(request, url, "", headers) 150 res = conn.getresponse() 151 data = res.read() 152 json_data = json.loads(data.decode("utf-8")) 153 except Exception as ex: 154 log.exception(ex) 155 156 return json_data 157 158 159def get_repositories(): 160 """ 161 Prints a Rich Tree that lists the repositories of the registry. 162 """ 163 164 json_data = get_json_response("GET", "/v2/_catalog") 165 repo_tree = Tree("[green]Repositories") 166 for repo in json_data["repositories"]: 167 repo_tree.add("[blue]%s" % repo) 168 169 print(repo_tree) 170 171 172def get_tags(repo: str): 173 """ 174 Prints a Rich Tree that lists the tags for the given repository. 175 176 Parameters: 177 repo (str): A string containing the name of the repo 178 """ 179 json_data = get_json_response("GET", "/v2/" + repo + "/tags/list") 180 tags_tree = Tree("[green]%s tags" % repo) 181 for tag in json_data["tags"]: 182 tags_tree.add("[cyan]:%s" % tag) 183 184 print(tags_tree) 185 186 187def get_manifests(repo: str, tag: str): 188 """ 189 Prints a Rich grid table that displays the manifests and metadata of the 190 image repository. 191 192 Parameters: 193 repo (str): A string containing the name of the repo 194 tag (str) : A string containing the tag of the desired image 195 """ 196 json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag) 197 198 grid_meta = Table.grid(expand=True) 199 grid_meta.add_column() 200 grid_meta.add_column() 201 meta_schema_version_key = Text("Schema version") 202 meta_schema_version_key.stylize("bold green", 0) 203 meta_schema_version_value = Text(str(json_data["schemaVersion"])) 204 meta_media_type_key = Text("Media type") 205 meta_media_type_key.stylize("bold green", 0) 206 meta_media_type_value = Text(json_data["mediaType"]) 207 grid_meta.add_row(meta_schema_version_key, meta_schema_version_value) 208 grid_meta.add_row(meta_media_type_key, meta_media_type_value) 209 210 grid_config = Table.grid(expand=True) 211 grid_config.add_column() 212 grid_config.add_column() 213 config_media_type_key = Text("Media type") 214 config_media_type_key.stylize("bold green", 0) 215 config_media_type_value = Text(json_data["config"]["mediaType"]) 216 config_digest_key = Text("Digest") 217 config_digest_key.stylize("bold green", 0) 218 config_digest_value = Text(json_data["config"]["digest"]) 219 config_size_key = Text("Size") 220 config_size_key.stylize("bold green", 0) 221 config_size_value = Text(convert_size(json_data["config"]["size"])) 222 grid_config.add_row(config_media_type_key, config_media_type_value) 223 grid_config.add_row(config_digest_key, config_digest_value) 224 grid_config.add_row(config_size_key, config_size_value) 225 226 grid_annotations = Table.grid(expand=True) 227 grid_annotations.add_column() 228 grid_annotations.add_column() 229 for item in json_data["annotations"].items(): 230 annotations_item_key = Text(item[0]) 231 annotations_item_key.stylize("bold green", 0) 232 annotations_item_value = Text(item[1]) 233 grid_annotations.add_row(annotations_item_key, annotations_item_value) 234 235 total_size = sum(layer.get("size") for layer in json_data["layers"]) 236 table_layers = Table(box=None, show_footer=True) 237 table_layers.add_column( 238 "Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:" 239 ) 240 table_layers.add_column( 241 "Size", 242 justify="left", 243 style="cyan", 244 no_wrap=True, 245 footer=convert_size(total_size), 246 ) 247 for layer in json_data["layers"]: 248 table_layers.add_row(layer.get("digest"), convert_size(layer.get("size"))) 249 250 panel_group = Group( 251 Panel(grid_meta, title="[bold blue]Metadata"), 252 Panel(grid_config, title="[bold blue]Config"), 253 Panel(grid_annotations, title="Annotations"), 254 Panel( 255 table_layers, 256 title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"), 257 ), 258 ) 259 print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag))) 260 261 262def delete_image(repo: str, tag: str): 263 """ 264 Removes the given tag from the image. If the given tag is the only tag, 265 removes the image. 266 267 Parameters: 268 repo (str): A string containing the name of the repo 269 tag (str) : A string containing the tag to be removed 270 """ 271 try: 272 conn = http.client.HTTPSConnection(REGISTRY_URL) 273 headers = get_headers() 274 conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers) 275 res = conn.getresponse() 276 docker_content_digest = res.getheader("Docker-Content-Digest") 277 except Exception as ex: 278 log.exception(ex) 279 280 try: 281 conn.request( 282 "DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers 283 ) 284 except Exception as ex: 285 log.exception(ex) 286 287 print("Untagged %s:%s successfully" % (repo, tag)) 288 289 290def garbage_collection(): 291 """ 292 Runs garbage collection command on the remote registry server. Requires 293 SSH public key access. 294 """ 295 command = "/usr/local/bin/registry-gc" 296 297 try: 298 ssh = subprocess.Popen( 299 ["ssh", "%s" % REGISTRY_URL, command], 300 shell=False, 301 stdout=subprocess.PIPE, 302 stderr=subprocess.PIPE, 303 text=True, 304 ) 305 result = ssh.stdout.readlines() 306 if result == []: 307 log.error(ssh.stderr.readlines()) 308 else: 309 print(result) 310 except Exception as ex: 311 log.exception(ex) 312 313 314def remove_repo(repo: str): 315 """ 316 Runs command on remote registry server to remove the given repo. 317 318 Parameters: 319 repo (str): A string containing the name of the repo. 320 """ 321 command = "/usr/local/bin/registry-rm-repo " + repo 322 323 try: 324 ssh = subprocess.Popen( 325 ["ssh", "%s" % REGISTRY_URL, command], 326 shell=False, 327 stdout=subprocess.PIPE, 328 stderr=subprocess.PIPE, 329 text=True, 330 ) 331 result = ssh.stdout.readlines() 332 if result == []: 333 log.error(ssh.stderr.readlines()) 334 else: 335 print(result) 336 except Exception as ex: 337 log.exception(ex) 338 339 340if __name__ == "__main__": 341 args = docopt(__doc__, options_first=True) 342 match args["<subcommand>"]: 343 case "repos": 344 get_repositories() 345 case "tags": 346 get_tags(args["<args>"][0]) 347 case "manifests": 348 get_manifests(args["<args>"][0], args["<args>"][1]) 349 case "rmi": 350 delete_image(args["<args>"][0], args["<args>"][1]) 351 case "gc": 352 garbage_collection() 353 case "rmr": 354 remove_repo(args["<args>"]) 355 case _: 356 if args["<subcommand>"] in ["help", None]: 357 exit(subprocess.call(["python3", "ocirh", "--help"])) 358 else: 359 exit( 360 "%r is not a ocirh subcommand. See 'ocirh --help." 361 % args["<subcommand>"] 362 )