# NixOS VM test for snapcast: boots a machine running snapserver plus a bare
# client machine, then checks the listening ports, the pipe streams, both
# JSON-RPC endpoints and snapclient connections over IPv6 loopback and over
# the test network.
import ./make-test-python.nix ({ pkgs, ... }:

let
  # Port snapclient connects to (passed to `snapclient -p` below).
  port = 10004;
  # Port of the raw TCP JSON-RPC endpoint.
  tcpPort = 10005;
  # Port of the HTTP JSON-RPC endpoint.
  httpPort = 10080;
  # Port used as the location of the "tcp" stream source.
  tcpStreamPort = 10006;
  # Buffer value configured on the server; the clients are expected to log
  # it back as "buffer: <value>" once connected.
  bufferSize = 742;
in {
  name = "snapcast";
  meta = with pkgs.lib.maintainers; {
    maintainers = [ hexa ];
  };

  nodes = {
    server = {
      services.snapserver = {
        enable = true;
        inherit port;
        tcp.port = tcpPort;
        http.port = httpPort;
        openFirewall = true;
        buffer = bufferSize;
        streams = {
          mpd = {
            type = "pipe";
            location = "/run/snapserver/mpd";
            query.mode = "create";
          };
          bluetooth = {
            type = "pipe";
            location = "/run/snapserver/bluetooth";
          };
          tcp = {
            type = "tcp";
            location = "127.0.0.1:${toString tcpStreamPort}";
          };
          # The location names the three streams defined above.
          meta = {
            type = "meta";
            location = "/mpd/bluetooth/tcp";
          };
        };
      };
      # snapclient is needed on the server too, for the loopback IPv6 subtest.
      environment.systemPackages = [ pkgs.snapcast ];
    };
    client = {
      environment.systemPackages = [ pkgs.snapcast ];
    };
  };

  testScript = ''
    import json

    # Minimal JSON-RPC request sent to both control endpoints.
    get_rpc_version = {"id": "1", "jsonrpc": "2.0", "method": "Server.GetRPCVersion"}

    start_all()

    # Wait for the service and for every configured TCP listener to come up.
    server.wait_for_unit("snapserver.service")
    server.wait_until_succeeds("ss -ntl | grep -q ${toString port}")
    server.wait_until_succeeds("ss -ntl | grep -q ${toString tcpPort}")
    server.wait_until_succeeds("ss -ntl | grep -q ${toString httpPort}")
    server.wait_until_succeeds("ss -ntl | grep -q ${toString tcpStreamPort}")

    with subtest("check that pipes are created"):
        server.succeed("test -p /run/snapserver/mpd")
        server.succeed("test -p /run/snapserver/bluetooth")

    with subtest("test tcp json-rpc"):
        server.succeed(f"echo '{json.dumps(get_rpc_version)}' | nc -w 1 localhost ${toString tcpPort}")

    with subtest("test http json-rpc"):
        # This must be an f-string: without the prefix the literal text
        # "{json.dumps(get_rpc_version)}" is posted instead of the request.
        server.succeed(
            f"curl --fail http://localhost:${toString httpPort}/jsonrpc -d '{json.dumps(get_rpc_version)}'"
        )

    with subtest("test a ipv6 connection"):
        server.execute("systemd-run --unit=snapcast-local-client snapclient -h ::1 -p ${toString port}")
        server.wait_until_succeeds(
            "journalctl -o cat -u snapserver.service | grep -q 'Hello from'"
        )
        server.wait_until_succeeds("journalctl -o cat -u snapcast-local-client | grep -q 'buffer: ${toString bufferSize}'")

    with subtest("test a connection"):
        client.execute("systemd-run --unit=snapcast-client snapclient -h server -p ${toString port}")
        # NOTE(review): the server journal already contains a "Hello from"
        # line from the previous subtest, so this check passes immediately;
        # the buffer check on the client below is what actually proves that
        # the remote client connected.
        server.wait_until_succeeds(
            "journalctl -o cat -u snapserver.service | grep -q 'Hello from'"
        )
        client.wait_until_succeeds("journalctl -o cat -u snapcast-client | grep -q 'buffer: ${toString bufferSize}'")
  '';
})