On a quest for agency in Bellairs
1"""
2A Python client for bellairs.quest to access directories, etc.
3"""
4
5import asyncio
6import socket
7from typing import Any
8
9import capnp # type: ignore
10
11capnp.remove_import_hook()
12
13cp_storage = capnp.load("schema/storage.capnp")
14
15
16class Directory:
17 """
18 Wrapper class for the directory. Many of these methods are
19 async, unfortunately.
20 """
21
22 def __init__(self, addr: str, port: str, family: int):
23 """Create a directory using the provided address and port"""
24 # family should be AF_UNIX, AF_INET, or AF_INET6. Not sure that capnp
25 # can handle any of the others.
26 # TODO: Hmm should `port` be int?
27 # TODO: If we want SSL, need to create a ssl.context, and pass it in
28 # using `ssl=ctx`
29 self._capnp_client = capnp.TwoPartyClient(addr, port).bootstrap()
30 self._capnp_dir = self._capnp_client.cast_as(cp_storage.Directory)
31
32 async def list(self):
33 result = await self._capnp_dir.list()
34 # Return names for now. Still need to write file
35 return [(e.name, File(e.file)) for e in result.entries]
36
37
38class File:
39 """
40 Wrapper class for files
41 """
42
43 def __init__(self, cobj: Any):
44 # Note: cobj is of type cp_storage.File, but MyPy
45 # cannot deal with types defined in this way.
46 self._capnp_file = cobj
47
48 async def size(self) -> int:
49 result = await self._capnp_file.size()
50 return result.size
51
52 # FIXME: Find a better type than Any
53 async def read(self, offset: int = 0, len: int = 0xFFFFFFFFFFFFFFFF) -> Any:
54 result = await self._capnp_file.read(offset, len)
55 return result.data