let
  # Credentials the server accepts; the client commands in the test script
  # authenticate with the same pair.
  username = "client";
  password = "password";

  # TCP port the NATS server listens on; it is also opened in the server
  # firewall and used in the clients' server URL.
  port = 4222;

  # Subject used for the publish/subscribe round trip.
  topic = "foo.bar";
in
9import ./make-test-python.nix (
10 { pkgs, lib, ... }:
11 {
name = "nats";
# The maintainer is referenced by its full attribute path rather than
# through nested `with` scopes.
meta.maintainers = [ pkgs.lib.maintainers.c0deaddict ];
16
nodes =
  let
    # Module shared by both client machines: it only installs the NATS CLI.
    natsClient =
      { pkgs, ... }:
      {
        environment.systemPackages = [ pkgs.natscli ];
      };
  in
  {
    client1 = natsClient;
    client2 = natsClient;

    # NATS server with a single username/password account, reachable from
    # the other machines through the opened firewall port.
    server =
      { ... }:
      {
        networking.firewall.allowedTCPPorts = [ port ];

        services.nats.enable = true;
        services.nats.port = port;
        services.nats.settings.authorization.users = [
          {
            user = username;
            password = password;
          }
        ];
      };
  };
49
# Python test script: starts all machines, waits for the NATS server, then
# performs one publish/subscribe round trip between the two clients.
testScript = ''
  def nats_cmd(*args):
      """Return a natscli command line that authenticates against the test server."""
      return (
          "nats "
          "--server=nats://server:${toString port} "
          "--user=${username} "
          "--password=${password} "
          "{}"
      ).format(" ".join(args))

  def parallel(*fns):
      """Run all callables concurrently and re-raise the first failure.

      A bare threading.Thread does not propagate exceptions to the thread
      that joins it, so a failing command would go unnoticed and the test
      would still pass. Future.result() re-raises in the calling thread.
      """
      from concurrent.futures import ThreadPoolExecutor

      # One worker per callable so that all of them really run at the same time.
      with ThreadPoolExecutor(max_workers=max(1, len(fns))) as pool:
          futures = [pool.submit(fn) for fn in fns]
      # Leaving the with-block has waited for every worker to finish.
      for future in futures:
          future.result()

  start_all()
  server.wait_for_unit("nats.service")
  # An active unit does not guarantee that the listener is already bound.
  server.wait_for_open_port(${toString port})

  with subtest("pub sub"):
      parallel(
          # The subscriber exits after receiving exactly one message.
          lambda: client1.succeed(nats_cmd("sub", "--count", "1", "${topic}")),
          # The delay gives the subscriber time to connect before publishing.
          lambda: client2.succeed("sleep 2 && {}".format(nats_cmd("pub", "${topic}", "hello"))),
      )
'';
79 }
80)