On a quest for agency in Bellairs
1""" 2A Python client for bellairs.quest to access directories, etc. 3""" 4 5import asyncio 6import socket 7from typing import Any 8 9import capnp # type: ignore 10 11capnp.remove_import_hook() 12 13cp_storage = capnp.load("schema/storage.capnp") 14 15 16class Directory: 17 """ 18 Wrapper class for the directory. Many of these methods are 19 async, unfortunately. 20 """ 21 22 def __init__(self, addr: str, port: str, family: int): 23 """Create a directory using the provided address and port""" 24 # family should be AF_UNIX, AF_INET, or AF_INET6. Not sure that capnp 25 # can handle any of the others. 26 # TODO: Hmm should `port` be int? 27 # TODO: If we want SSL, need to create a ssl.context, and pass it in 28 # using `ssl=ctx` 29 self._capnp_client = capnp.TwoPartyClient(addr, port).bootstrap() 30 self._capnp_dir = self._capnp_client.cast_as(cp_storage.Directory) 31 32 async def list(self): 33 result = await self._capnp_dir.list() 34 # Return names for now. Still need to write file 35 return [(e.name, File(e.file)) for e in result.entries] 36 37 38class File: 39 """ 40 Wrapper class for files 41 """ 42 43 def __init__(self, cobj: Any): 44 # Note: cobj is of type cp_storage.File, but MyPy 45 # cannot deal with types defined in this way. 46 self._capnp_file = cobj 47 48 async def size(self) -> int: 49 result = await self._capnp_file.size() 50 return result.size 51 52 # FIXME: Find a better type than Any 53 async def read(self, offset: int = 0, len: int = 0xFFFFFFFFFFFFFFFF) -> Any: 54 result = await self._capnp_file.read(offset, len) 55 return result.data