at master 5.0 kB view raw
1#!/usr/bin/env python3 2import json 3import sys 4import time 5import os 6import hashlib 7import base64 8 9from http.server import BaseHTTPRequestHandler, HTTPServer 10from urllib.parse import urlparse, parse_qs 11from typing import Dict 12 13SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY'] 14MOCKUSER="mockuser_nixos_org" 15MOCKADMIN="mockadmin_nixos_org" 16 17 18def w(msg: bytes): 19 sys.stderr.write(f"{msg}\n") 20 sys.stderr.flush() 21 22 23def gen_fingerprint(pubkey: str): 24 decoded_key = base64.b64decode(pubkey.encode("ascii").split()[1]) 25 return hashlib.sha256(decoded_key).hexdigest() 26 27 28def gen_email(username: str): 29 """username seems to be a 21 characters long number string, so mimic that in a reproducible way""" 30 return str(int(hashlib.sha256(username.encode()).hexdigest(), 16))[0:21] 31 32 33def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict: 34 snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey) 35 # seems to be a 21 characters long numberstring, so mimic that in a reproducible way 36 email = gen_email(username) 37 return { 38 "loginProfiles": [ 39 { 40 "name": email, 41 "posixAccounts": [ 42 { 43 "primary": True, 44 "username": username, 45 "uid": uid, 46 "gid": gid, 47 "homeDirectory": home_directory, 48 "operatingSystemType": "LINUX" 49 } 50 ], 51 "sshPublicKeys": { 52 snakeoil_pubkey_fingerprint: { 53 "key": snakeoil_pubkey, 54 "expirationTimeUsec": str((time.time() + 600) * 1000000), # 10 minutes in the future 55 "fingerprint": snakeoil_pubkey_fingerprint 56 } 57 } 58 } 59 ] 60 } 61 62 63class ReqHandler(BaseHTTPRequestHandler): 64 65 def _send_json_ok(self, data: dict): 66 self.send_response(200) 67 self.send_header('Content-type', 'application/json') 68 self.end_headers() 69 out = json.dumps(data).encode() 70 w(out) 71 self.wfile.write(out) 72 73 def _send_json_success(self, success=True): 74 self.send_response(200) 75 self.send_header('Content-type', 'application/json') 76 self.end_headers() 77 out = json.dumps({"success": success}).encode() 78 w(out) 79 self.wfile.write(out) 80 81 def _send_404(self): 82 self.send_response(404) 83 self.end_headers() 84 85 def do_GET(self): 86 p = str(self.path) 87 pu = urlparse(p) 88 params = parse_qs(pu.query) 89 90 # users endpoint 91 if pu.path == "/computeMetadata/v1/oslogin/users": 92 # mockuser and mockadmin are allowed to login, both use the same snakeoil public key 93 if params.get('username') == [MOCKUSER] or params.get('uid') == ["1009719690"]: 94 username = MOCKUSER 95 uid = "1009719690" 96 elif params.get('username') == [MOCKADMIN] or params.get('uid') == ["1009719691"]: 97 username = MOCKADMIN 98 uid = "1009719691" 99 else: 100 self._send_404() 101 return 102 103 self._send_json_ok(gen_mockuser(username=username, uid=uid, gid=uid, home_directory=f"/home/{username}", snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) 104 return 105 106 # we need to provide something at the groups endpoint. 107 # the nss module does segfault if we don't. 108 elif pu.path == "/computeMetadata/v1/oslogin/groups": 109 self._send_json_ok({ 110 "posixGroups": [ 111 {"name" : "demo", "gid" : 4294967295} 112 ], 113 }) 114 return 115 116 # authorize endpoint 117 elif pu.path == "/computeMetadata/v1/oslogin/authorize": 118 # is user allowed to login? 119 if params.get("policy") == ["login"]: 120 # mockuser and mockadmin are allowed to login 121 if params.get('email') == [gen_email(MOCKUSER)] or params.get('email') == [gen_email(MOCKADMIN)]: 122 self._send_json_success() 123 return 124 self._send_json_success(False) 125 return 126 # is user allowed to become root? 127 elif params.get("policy") == ["adminLogin"]: 128 # only mockadmin is allowed to become admin 129 self._send_json_success((params['email'] == [gen_email(MOCKADMIN)])) 130 return 131 # send 404 for other policies 132 else: 133 self._send_404() 134 return 135 else: 136 sys.stderr.write(f"Unhandled path: {p}\n") 137 sys.stderr.flush() 138 self.send_response(404) 139 self.end_headers() 140 self.wfile.write(b'') 141 142 143if __name__ == '__main__': 144 s = HTTPServer(('0.0.0.0', 80), ReqHandler) 145 s.serve_forever()