1import ./make-test-python.nix ({ pkgs, lib, ... }:
2
3 let
4 nodesIps = [
5 "192.168.1.1"
6 "192.168.1.2"
7 "192.168.1.3"
8 ];
9
10 createNode = index: { pkgs, ... }:
11 let
12 ip = builtins.elemAt nodesIps index; # since we already use IPs to identify servers
13 in
14 {
15 networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
16 { address = ip; prefixLength = 16; }
17 ];
18
19 networking.firewall.allowedTCPPorts = [ 5432 8008 5010 ];
20
21 environment.systemPackages = [ pkgs.jq ];
22
23 services.patroni = {
24
25 enable = true;
26
27 postgresqlPackage = pkgs.postgresql_14.withPackages (p: [ p.pg_safeupdate ]);
28
29 scope = "cluster1";
30 name = "node${toString(index + 1)}";
31 nodeIp = ip;
32 otherNodesIps = builtins.filter (h: h != ip) nodesIps;
33 softwareWatchdog = true;
34
35 settings = {
36 bootstrap = {
37 dcs = {
38 ttl = 30;
39 loop_wait = 10;
40 retry_timeout = 10;
41 maximum_lag_on_failover = 1048576;
42 };
43 initdb = [
44 { encoding = "UTF8"; }
45 "data-checksums"
46 ];
47 };
48
49 postgresql = {
50 use_pg_rewind = true;
51 use_slots = true;
52 authentication = {
53 replication = {
54 username = "replicator";
55 };
56 superuser = {
57 username = "postgres";
58 };
59 rewind = {
60 username = "rewind";
61 };
62 };
63 parameters = {
64 listen_addresses = "${ip}";
65 wal_level = "replica";
66 hot_standby_feedback = "on";
67 unix_socket_directories = "/tmp";
68 };
69 pg_hba = [
70 "host replication replicator 192.168.1.0/24 md5"
71 # Unsafe, do not use for anything other than tests
72 "host all all 0.0.0.0/0 trust"
73 ];
74 };
75
76 etcd3 = {
77 host = "192.168.1.4:2379";
78 };
79 };
80
81 environmentFiles = {
82 PATRONI_REPLICATION_PASSWORD = pkgs.writeText "replication-password" "postgres";
83 PATRONI_SUPERUSER_PASSWORD = pkgs.writeText "superuser-password" "postgres";
84 PATRONI_REWIND_PASSWORD = pkgs.writeText "rewind-password" "postgres";
85 };
86 };
87
88 # We always want to restart so the tests never hang
89 systemd.services.patroni.serviceConfig.StartLimitIntervalSec = 0;
90 };
91 in
{
  name = "patroni";

  nodes = {
    # Three Patroni/PostgreSQL servers at 192.168.1.1-3 (see nodesIps above).
    node1 = createNode 0;
    node2 = createNode 1;
    node3 = createNode 2;
99
100 etcd = { pkgs, ... }: {
101
102 networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
103 { address = "192.168.1.4"; prefixLength = 16; }
104 ];
105
106 services.etcd = {
107 enable = true;
108 listenClientUrls = [ "http://192.168.1.4:2379" ];
109 };
110
111 networking.firewall.allowedTCPPorts = [ 2379 ];
112 };
113
114 client = { pkgs, ... }: {
115 environment.systemPackages = [ pkgs.postgresql_14 ];
116
117 networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
118 { address = "192.168.2.1"; prefixLength = 16; }
119 ];
120
121 services.haproxy = {
122 enable = true;
123 config = ''
124 global
125 maxconn 100
126
127 defaults
128 log global
129 mode tcp
130 retries 2
131 timeout client 30m
132 timeout connect 4s
133 timeout server 30m
134 timeout check 5s
135
136 listen cluster1
137 bind 127.0.0.1:5432
138 option httpchk
139 http-check expect status 200
140 default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
141 ${builtins.concatStringsSep "\n" (map (ip: "server postgresql_${ip}_5432 ${ip}:5432 maxconn 100 check port 8008") nodesIps)}
142 '';
143 };
144 };
145 };
146
147
148
  # Python driver for the NixOS test framework. The cluster is considered
  # healthy when patronictl reports exactly one running Leader and
  # `expected_replicas` running Replicas, and the client can query the
  # leader through haproxy.
  testScript = ''
    nodes = [node1, node2, node3]

    def wait_for_all_nodes_ready(expected_replicas=2):
        # Only poll nodes that are up: a crashed node cannot answer patronictl.
        booted_nodes = filter(lambda node: node.booted, nodes)
        for node in booted_nodes:
            print(node.succeed("patronictl list cluster1"))
            # Membership count: one leader plus the expected replicas.
            node.wait_until_succeeds(f"[ $(patronictl list -f json cluster1 | jq 'length') == {expected_replicas + 1} ]")
            # Exactly one member in the running Leader role.
            node.wait_until_succeeds("[ $(patronictl list -f json cluster1 | jq 'map(select(.Role | test(\"^Leader$\"))) | map(select(.State | test(\"^running$\"))) | length') == 1 ]")
            # All remaining members are running Replicas.
            node.wait_until_succeeds(f"[ $(patronictl list -f json cluster1 | jq 'map(select(.Role | test(\"^Replica$\"))) | map(select(.State | test(\"^running$\"))) | length') == {expected_replicas} ]")
            print(node.succeed("patronictl list cluster1"))
        # haproxy on the client must route to the current leader.
        client.wait_until_succeeds("psql -h 127.0.0.1 -U postgres --command='select 1;'")

    def run_dummy_queries():
        # Round-trip write/read/delete against the `dummy` table via haproxy.
        client.succeed("psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='insert into dummy(val) values (101);'")
        client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select val from dummy where val = 101;') -eq 101")
        client.succeed("psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='delete from dummy where val = 101;'")

    start_all()

    with subtest("should bootstrap a new patroni cluster"):
        wait_for_all_nodes_ready()

    with subtest("should be able to insert and select"):
        client.succeed("psql -h 127.0.0.1 -U postgres --command='create table dummy as select * from generate_series(1, 100) as val;'")
        client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select count(distinct val) from dummy;') -eq 100")

    with subtest("should restart after all nodes are crashed"):
        for node in nodes:
            node.crash()
        for node in nodes:
            node.start()
        wait_for_all_nodes_ready()

    with subtest("should be able to run queries while any one node is crashed"):
        # Node names are "node1".."node3", so the last character maps back to
        # the index in `nodes`.
        masterNodeName = node1.succeed("patronictl list -f json cluster1 | jq '.[] | select(.Role | test(\"^Leader$\")) | .Member' -r").strip()
        masterNodeIndex = int(masterNodeName[len(masterNodeName)-1]) - 1

        # Move master node at the end of the list to avoid multiple failovers (makes the test faster and more consistent)
        nodes.append(nodes.pop(masterNodeIndex))

        for node in nodes:
            node.crash()
            wait_for_all_nodes_ready(1)

            # Execute some queries while a node is down.
            run_dummy_queries()

            # Restart crashed node.
            node.start()
            wait_for_all_nodes_ready()

            # Execute some queries with the node back up.
            run_dummy_queries()
  '';
204 })