On a quest for agency in Bellairs
at main 1.9 kB view raw
import asyncio
import socket

import capnp

capnp.remove_import_hook()

# Schema is loaded explicitly by path (the import hook is removed above).
bellairs_capnp = capnp.load("bellairs.capnp")


class TestServer(bellairs_capnp.File.Server):
    """Mock file server so I can test this.

    Serves a fixed in-memory string through the ``File`` interface declared
    in ``bellairs.capnp``.
    """

    def __init__(self):
        # Fixed contents; the client below asserts that size() reports 4.
        self.data = "Voop"

    async def size(self, **kwargs):
        """Return the length of the stored contents."""
        return len(self.data)

    async def read(self, offset, len, **kwargs):
        """Return up to ``len`` items of the contents starting at ``offset``.

        The parameter is named ``len`` (shadowing the builtin inside this
        method only) so the signature stays as it was.
        NOTE(review): assumes ``offset`` and ``len`` are non-negative
        integers, as a file-style read would pass — confirm against the
        schema, which is not visible here.
        """
        # Previously the whole string was returned regardless of the
        # requested range; honor the range so the mock acts like a file.
        return self.data[offset : offset + len]


async def client(sock):
    """Bootstrap a ``File`` capability over ``sock`` and check its size.

    Args:
        sock: The connection handed to ``capnp.TwoPartyClient``; ``main``
            passes the client-side ``AsyncIoStream``.

    Raises:
        AssertionError: If the reported size is not 4.
    """
    # See https://capnproto.github.io/pycapnp/capnp.html?highlight=twopartyclient#capnp.TwoPartyClient
    # AP: I am a bit unsure why the only thing available here
    # is a two-party client, but this works as a hack.
    rpc_client = capnp.TwoPartyClient(sock)
    file_cap = rpc_client.bootstrap().cast_as(bellairs_capnp.File)
    result = await file_cap.size()
    size = result.size
    print("Got %d" % size)
    assert size == 4


async def main():
    """Wire a mock file server and a client together over a socket pair."""
    # Create a UNIX socket pair, because the network is
    # more complex.
    client_sock, server_sock = socket.socketpair(socket.AF_UNIX)
    # Create AsyncIoStreams, which is really a wrapper around
    # the C++ Capnproto connection. See https://github.com/capnproto/pycapnp/blob/59a639fa977e4a2e19c6cc60b44cbc9926418710/capnp/lib/capnp.pyx#L1314
    client_stream = await capnp.AsyncIoStream.create_connection(sock=client_sock)
    server_stream = await capnp.AsyncIoStream.create_connection(sock=server_sock)
    # Create a TwoPartyServer. Better options
    # are available (e.g., see https://github.com/capnproto/pycapnp/blob/master/examples/async_ssl_server.py#L60), but
    # I did this for now. I think we will need to use
    # `AsyncIoStream.create_server` and do some amount
    # of indirection in there
    # The server object is bound to a local for the duration of the client
    # call; presumably it must stay referenced to keep serving — confirm.
    _server = capnp.TwoPartyServer(server_stream, bootstrap=TestServer())
    print("Started file server")
    await client(client_stream)


if __name__ == "__main__":
    # main() is wrapped by capnp.run() before being handed to asyncio.run().
    asyncio.run(capnp.run(main()))