{ ... }:
{
  # Python helper code exposed as the `testScriptSetup` module argument.
  # It calls start_all() and then defines helpers for driving the `garage`
  # CLI and checking bucket access through `mc` and `curl`.
  _module.args.testScriptSetup = # python
    ''
      from typing import List
      from dataclasses import dataclass
      import re

      # NOTE: start_all() and Machine are not defined in this snippet; they are
      # expected to be provided by the environment that executes this script.
      start_all()

      # Extracts the version number from the output of `garage layout show`.
      cur_version_regex = re.compile(r'Current cluster layout version: (?P<ver>\d*)')

      # Extracts the key name, key ID and secret key lines from the output of
      # `garage key create` / `garage key info`. Each match fills exactly one
      # of the three named groups.
      api_key_field_regex = re.compile(r'Key name: \s*(?P<key_name>.*)|' r'Key ID: \s*(?P<key_id>.*)|' r'Secret key: \s*(?P<secret_key>.*)', re.IGNORECASE)

      @dataclass
      class S3Key:
          # Fields parsed from the garage key output by parse_api_key_data().
          key_name: str
          key_id: str
          secret_key: str

      @dataclass
      class GarageNode:
          # The two halves of the `<node_id>@<host>` string printed by
          # `garage node id`.
          node_id: str
          host: str

      def get_node_fqn(machine: Machine) -> GarageNode:
          """Run `garage node id` on *machine* and split its output at the '@'.

          Raises ValueError (from tuple unpacking) if the output contains no '@'.
          """
          # Strip first so the trailing newline of the command output does not
          # end up in `host`; split once so only the first '@' separates fields.
          node_id, host = machine.succeed("garage node id").strip().split('@', 1)
          return GarageNode(node_id=node_id, host=host)

      def get_node_id(machine: Machine) -> str:
          """Return only the node ID part of `garage node id` for *machine*."""
          return get_node_fqn(machine).node_id

      def get_layout_version(machine: Machine) -> int:
          """Return the current cluster layout version plus one.

          The current version is parsed from `garage layout show`; the
          incremented value is what apply_garage_layout() passes to
          `garage layout apply --version`.

          Raises ValueError if no version number can be found in the output.
          """
          version_data = machine.succeed("garage layout show")
          m = cur_version_regex.search(version_data)
          # The pattern uses `\d*`, so the group may be an empty string; treat
          # that like a missing match instead of letting int() fail on it.
          if m and m.group('ver'):
              return int(m.group('ver')) + 1
          raise ValueError('Cannot find current layout version')

      def apply_garage_layout(machine: Machine, layouts: List[str]):
          """Stage every entry of *layouts* and apply the resulting layout.

          Each entry is appended verbatim to `garage layout assign`.
          """
          for layout in layouts:
              machine.succeed(f"garage layout assign {layout}")
          # Queried after staging; the value is already current version + 1.
          version = get_layout_version(machine)
          machine.succeed(f"garage layout apply --version {version}")

      def create_api_key(machine: Machine, key_name: str) -> S3Key:
          """Create a key named *key_name* on *machine* and parse the CLI output."""
          output = machine.succeed(f"garage key create {key_name}")
          return parse_api_key_data(output)

      def get_api_key(machine: Machine, key_pattern: str) -> S3Key:
          """Look up a key via `garage key info` and parse the CLI output."""
          output = machine.succeed(f"garage key info {key_pattern}")
          return parse_api_key_data(output)

      def parse_api_key_data(text: str) -> S3Key:
          """Build an S3Key from garage key output.

          Scans *text* for the "Key name:", "Key ID:" and "Secret key:" lines
          (case-insensitive). If a field occurs more than once, the last
          non-empty value wins.

          Raises ValueError if S3Key cannot be constructed from the fields found.
          """
          fields = {}
          for match in api_key_field_regex.finditer(text):
              for key, value in match.groupdict().items():
                  # Only the alternative that matched has a value; skip the
                  # other groups (None) and empty captures.
                  if value:
                      fields[key] = value.strip()
          try:
              return S3Key(**fields)
          except TypeError as e:
              # S3Key(**fields) raises TypeError when a required field is absent.
              raise ValueError(f"Cannot parse API key data. Missing required field(s): {e}") from e

      def test_bucket_writes(node):
          """Create a bucket and key on *node*, then write and read back an object.

          Registers the `test-garage` alias in `mc` pointing at
          http://[::1]:3900 using the freshly created key.
          """
          node.succeed("garage bucket create test-bucket")
          s3_key = create_api_key(node, "test-api-key")
          node.succeed("garage bucket allow --read --write test-bucket --key test-api-key")
          other_s3_key = get_api_key(node, 'test-api-key')
          # The previous assertion compared other_s3_key.secret_key with itself
          # and could never fail. Check that the looked-up key is the created one.
          # NOTE(review): the key ID is compared rather than the secret because
          # `garage key info` may redact the secret unless --show-secret is
          # passed -- confirm for the packaged Garage version before asserting
          # on secret_key.
          assert s3_key.key_id == other_s3_key.key_id
          node.succeed(
              f"mc alias set test-garage http://[::1]:3900 {s3_key.key_id} {s3_key.secret_key} --api S3v4"
          )
          node.succeed("echo test | mc pipe test-garage/test-bucket/test.txt")
          assert node.succeed("mc cat test-garage/test-bucket/test.txt").strip() == "test"

      def test_bucket_over_http(node, bucket='test-bucket', url=None):
          """Enable website access for *bucket* and fetch its index page via curl.

          *url* is sent as the Host header; it defaults to "<bucket>.web.garage".
          Uses the `test-garage` mc alias, which test_bucket_writes() sets up.
          """
          if url is None:
              url = f"{bucket}.web.garage"

          node.succeed(f'garage bucket website --allow {bucket}')
          node.succeed(f'echo hello world | mc pipe test-garage/{bucket}/index.html')
          assert (node.succeed(f"curl -H 'Host: {url}' http://localhost:3902")).strip() == 'hello world'
    '';
}