at master 1.7 kB view raw
# NixOS VM test for the NATS service: one server node with user/password
# authorization and two client nodes that exchange a single message.
let
  # Values shared between the server configuration and the test script.
  port = 4222;
  username = "client";
  password = "password";
  topic = "foo.bar";
in
{ pkgs, lib, ... }:
{
  name = "nats";
  meta = with pkgs.lib; {
    maintainers = with maintainers; [ c0deaddict ];
  };

  nodes =
    let
      # Common client configuration: only installs the `natscli` package.
      client =
        { pkgs, ... }:
        {
          environment.systemPackages = with pkgs; [ natscli ];
        };
    in
    {
      server =
        { pkgs, ... }:
        {
          # Open the NATS port so the client nodes can connect.
          networking.firewall.allowedTCPPorts = [ port ];
          services.nats = {
            inherit port;
            enable = true;
            settings = {
              authorization = {
                users = [
                  {
                    user = username;
                    inherit password;
                  }
                ];
              };
            };
          };
        };

      client1 = client;
      client2 = client;
    };

  testScript = ''
    def nats_cmd(*args):
        """Build a `nats` CLI command line targeting the server node.

        The server URL and credentials are prepended; `args` are joined
        with spaces and appended verbatim (no shell quoting is applied).
        """
        return (
            "nats "
            "--server=nats://server:${toString port} "
            "--user=${username} "
            "--password=${password} "
            "{}"
        ).format(" ".join(args))

    def parallel(*fns):
        """Run every callable in its own thread and wait for all of them.

        An exception raised inside a thread does not propagate through
        `Thread.join()`, so failures are collected here and the first one
        is re-raised in the calling thread. Otherwise a failing command
        would go unnoticed and the subtest would pass.
        """
        from threading import Thread

        errors = []

        def guarded(fn):
            def run():
                try:
                    fn()
                except Exception as e:
                    errors.append(e)
            return run

        threads = [Thread(target=guarded(fn)) for fn in fns]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        if errors:
            raise errors[0]

    start_all()
    server.wait_for_unit("nats.service")

    with subtest("pub sub"):
        # The subscriber exits after one message (`--count 1`). The publisher
        # is delayed by two seconds, presumably to give the subscriber time
        # to connect before the message is sent.
        parallel(
            lambda: client1.succeed(nats_cmd("sub", "--count", "1", "${topic}")),
            lambda: client2.succeed("sleep 2 && {}".format(nats_cmd("pub", "${topic}", "hello"))),
        )
  '';
}