let
  # Subject used for the publish/subscribe round trip in the test script.
  topic = "foo.bar";

  # Credentials of the single user configured in the server's authorization
  # settings; the test script passes the same values to the `nats` CLI.
  username = "client";
  password = "password";

  # TCP port the server is configured with, opened in its firewall, and
  # targeted by the clients.
  port = 4222;
in
9{ pkgs, lib, ... }:
10{
name = "nats";

# Maintainers responsible for this test.
meta.maintainers = [ pkgs.lib.maintainers.c0deaddict ];
15
nodes =
  let
    # Shared machine definition for both clients: they only install the
    # `natscli` package so the test script can call `nats`.
    natsClient =
      { pkgs, ... }:
      {
        environment.systemPackages = [ pkgs.natscli ];
      };
  in
  {
    # Machine running the NATS service with one password-authenticated user.
    server =
      { ... }:
      {
        # Open the NATS port in the firewall so the client machines can connect.
        networking.firewall.allowedTCPPorts = [ port ];

        services.nats = {
          enable = true;
          port = port;
          settings.authorization.users = [
            {
              user = username;
              password = password;
            }
          ];
        };
      };

    client1 = natsClient;
    client2 = natsClient;
  };
48
testScript = ''
  def nats_cmd(*args):
      """Return a `nats` CLI command line aimed at the test server.

      The server URL and credentials are filled in from the Nix-level
      constants; `args` are the subcommand and its arguments, joined with
      single spaces and appended verbatim (no shell quoting is applied).
      """
      return (
          "nats "
          "--server=nats://server:${toString port} "
          "--user=${username} "
          "--password=${password} "
          "{}"
      ).format(" ".join(args))

  def parallel(*fns):
      """Run every callable concurrently and wait for all of them to finish.

      Each callable gets its own worker thread. An exception raised inside a
      worker is re-raised here, in the calling thread, so that a failing
      command actually fails the test. With plain `threading.Thread` the
      exception would only be printed and `join()` would return normally,
      letting the subtest pass by mistake.
      """
      from concurrent.futures import ThreadPoolExecutor

      # ThreadPoolExecutor rejects max_workers=0, so handle the empty call.
      if not fns:
          return

      # One worker per callable so that all of them really run at once.
      with ThreadPoolExecutor(max_workers=len(fns)) as pool:
          futures = [pool.submit(fn) for fn in fns]

      # Leaving the `with` block has waited for every worker; result()
      # re-raises the first stored exception, if any.
      for future in futures:
          future.result()

  start_all()
  server.wait_for_unit("nats.service")
  # The unit being active does not guarantee the socket is already
  # listening, so wait for the port before any client tries to connect.
  server.wait_for_open_port(${toString port})

  with subtest("pub sub"):
      parallel(
          # The subscriber exits after receiving exactly one message.
          lambda: client1.succeed(nats_cmd("sub", "--count", "1", "${topic}")),
          # The publisher is delayed to give the subscriber time to connect
          # and subscribe before the message is sent.
          lambda: client2.succeed("sleep 2 && {}".format(nats_cmd("pub", "${topic}", "hello"))),
      )
'';
78}