at 25.11-pre 1.9 kB view raw
1let 2 3 port = 4222; 4 username = "client"; 5 password = "password"; 6 topic = "foo.bar"; 7 8in 9import ./make-test-python.nix ( 10 { pkgs, lib, ... }: 11 { 12 name = "nats"; 13 meta = with pkgs.lib; { 14 maintainers = with maintainers; [ c0deaddict ]; 15 }; 16 17 nodes = 18 let 19 client = 20 { pkgs, ... }: 21 { 22 environment.systemPackages = with pkgs; [ natscli ]; 23 }; 24 in 25 { 26 server = 27 { pkgs, ... }: 28 { 29 networking.firewall.allowedTCPPorts = [ port ]; 30 services.nats = { 31 inherit port; 32 enable = true; 33 settings = { 34 authorization = { 35 users = [ 36 { 37 user = username; 38 inherit password; 39 } 40 ]; 41 }; 42 }; 43 }; 44 }; 45 46 client1 = client; 47 client2 = client; 48 }; 49 50 testScript = 51 let 52 file = "/tmp/msg"; 53 in 54 '' 55 def nats_cmd(*args): 56 return ( 57 "nats " 58 "--server=nats://server:${toString port} " 59 "--user=${username} " 60 "--password=${password} " 61 "{}" 62 ).format(" ".join(args)) 63 64 def parallel(*fns): 65 from threading import Thread 66 threads = [ Thread(target=fn) for fn in fns ] 67 for t in threads: t.start() 68 for t in threads: t.join() 69 70 start_all() 71 server.wait_for_unit("nats.service") 72 73 with subtest("pub sub"): 74 parallel( 75 lambda: client1.succeed(nats_cmd("sub", "--count", "1", "${topic}")), 76 lambda: client2.succeed("sleep 2 && {}".format(nats_cmd("pub", "${topic}", "hello"))), 77 ) 78 ''; 79 } 80)