On a quest for agency in Bellairs
1import asyncio
2import socket
3
4import capnp
5
6capnp.remove_import_hook()
7
8bellairs_capnp = capnp.load("bellairs.capnp")
9
10"""
11Mock file server so I can test this.
12"""
13
14
15class TestServer(bellairs_capnp.File.Server):
16 def __init__(self):
17 self.data = "Voop"
18
19 async def size(self, **kwargs):
20 return len(self.data)
21
22 async def read(self, offset, len, **kwargs):
23 return self.data
24
25
26async def client(sock):
27 # See https://capnproto.github.io/pycapnp/capnp.html?highlight=twopartyclient#capnp.TwoPartyClient
28 # AP: I am a bit unsure why the only thing available here
29 # is a two-party client, but this works as a hack.
30 client = capnp.TwoPartyClient(sock)
31 cap = client.bootstrap()
32 cap = cap.cast_as(bellairs_capnp.File)
33 result = await cap.size()
34 size = result.size
35 print("Got %d" % size)
36 assert size == 4
37
38
39async def main():
40 # Create a UNIX socket pair, because the network is
41 # more complex.
42 client_end, server_end = socket.socketpair(socket.AF_UNIX)
43 # Create AsyncIoStreams, which is really a wrapper around
44 # the C++ Capnproto connection. See https://github.com/capnproto/pycapnp/blob/59a639fa977e4a2e19c6cc60b44cbc9926418710/capnp/lib/capnp.pyx#L1314
45 client_end = await capnp.AsyncIoStream.create_connection(sock=client_end)
46 server_end = await capnp.AsyncIoStream.create_connection(sock=server_end)
47 # Create a TwoPartyServer. Better options
48 # are available (e.g., see https://github.com/capnproto/pycapnp/blob/master/examples/async_ssl_server.py#L60), but
49 # I did this for now. I think we will need to use
50 # `AsyncIoStream.create_server` and do some amount
51 # of indirection in there
52 _ = capnp.TwoPartyServer(server_end, bootstrap=TestServer())
53 print("Started file server")
54 await client(client_end)
55
56
57if __name__ == "__main__":
58 asyncio.run(capnp.run(main()))