# NixOS integration test for Patroni (nixos/tests/patroni.nix, nixpkgs 23.11-pre)
# Three-node Patroni/PostgreSQL HA cluster test: an etcd node provides the DCS,
# an HAProxy client routes to whichever node Patroni reports healthy on port 8008,
# and the test script exercises bootstrap, full-cluster crash, and single-node failover.
import ./make-test-python.nix ({ pkgs, lib, ... }:

  let
    # Static addresses for the three Patroni/PostgreSQL nodes on eth1.
    nodesIps = [
      "192.168.1.1"
      "192.168.1.2"
      "192.168.1.3"
    ];

    # Builds the NixOS config for node `index` (0-based into nodesIps).
    createNode = index: { pkgs, ... }:
      let
        ip = builtins.elemAt nodesIps index; # since we already use IPs to identify servers
      in
      {
        networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
          { address = ip; prefixLength = 16; }
        ];

        # 5432 = PostgreSQL, 8008 = Patroni REST API (used by HAProxy health checks),
        # 5010 = Patroni raft/watchdog auxiliary port.
        networking.firewall.allowedTCPPorts = [ 5432 8008 5010 ];

        # jq is used by the test script to parse `patronictl list -f json`.
        environment.systemPackages = [ pkgs.jq ];

        services.patroni = {

          enable = true;

          postgresqlPackage = pkgs.postgresql_14.withPackages (p: [ p.pg_safeupdate ]);

          scope = "cluster1";
          name = "node${toString(index + 1)}";
          nodeIp = ip;
          # Peers are every configured IP except our own.
          otherNodesIps = builtins.filter (h: h != ip) nodesIps;
          softwareWatchdog = true;

          settings = {
            bootstrap = {
              dcs = {
                ttl = 30;
                loop_wait = 10;
                retry_timeout = 10;
                maximum_lag_on_failover = 1048576;
              };
              initdb = [
                { encoding = "UTF8"; }
                "data-checksums"
              ];
            };

            postgresql = {
              use_pg_rewind = true;
              use_slots = true;
              authentication = {
                replication = {
                  username = "replicator";
                };
                superuser = {
                  username = "postgres";
                };
                rewind = {
                  username = "rewind";
                };
              };
              parameters = {
                listen_addresses = "${ip}";
                wal_level = "replica";
                hot_standby_feedback = "on";
                unix_socket_directories = "/tmp";
              };
              pg_hba = [
                "host replication replicator 192.168.1.0/24 md5"
                # Unsafe, do not use for anything other than tests
                "host all all 0.0.0.0/0 trust"
              ];
            };

            # etcd v3 API endpoint of the dedicated DCS node below.
            etcd3 = {
              host = "192.168.1.4:2379";
            };
          };

          # Passwords are supplied out-of-band via environment files; "postgres"
          # is fine here because this only ever runs inside the test VMs.
          environmentFiles = {
            PATRONI_REPLICATION_PASSWORD = pkgs.writeText "replication-password" "postgres";
            PATRONI_SUPERUSER_PASSWORD = pkgs.writeText "superuser-password" "postgres";
            PATRONI_REWIND_PASSWORD = pkgs.writeText "rewind-password" "postgres";
          };
        };

        # We always want to restart so the tests never hang
        systemd.services.patroni.serviceConfig.StartLimitIntervalSec = 0;
      };
  in
  {
    name = "patroni";

    nodes = {
      node1 = createNode 0;
      node2 = createNode 1;
      node3 = createNode 2;

      # Single-member etcd acting as Patroni's distributed configuration store.
      etcd = { pkgs, ... }: {

        networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
          { address = "192.168.1.4"; prefixLength = 16; }
        ];

        services.etcd = {
          enable = true;
          listenClientUrls = [ "http://192.168.1.4:2379" ];
        };

        networking.firewall.allowedTCPPorts = [ 2379 ];
      };

      # Client machine: HAProxy on 127.0.0.1:5432 forwards to whichever cluster
      # member passes the Patroni REST health check (HTTP 200 on port 8008,
      # which Patroni returns only on the leader).
      client = { pkgs, ... }: {
        environment.systemPackages = [ pkgs.postgresql_14 ];

        networking.interfaces.eth1.ipv4.addresses = pkgs.lib.mkOverride 0 [
          { address = "192.168.2.1"; prefixLength = 16; }
        ];

        services.haproxy = {
          enable = true;
          config = ''
            global
                maxconn 100

            defaults
                log global
                mode tcp
                retries 2
                timeout client 30m
                timeout connect 4s
                timeout server 30m
                timeout check 5s

            listen cluster1
                bind 127.0.0.1:5432
                option httpchk
                http-check expect status 200
                default-server inter 3s fall 3 rise 2 on-marked-down shutdown-sessions
                ${builtins.concatStringsSep "\n" (map (ip: "server postgresql_${ip}_5432 ${ip}:5432 maxconn 100 check port 8008") nodesIps)}
          '';
        };
      };
    };

    testScript = ''
      nodes = [node1, node2, node3]

      def wait_for_all_nodes_ready(expected_replicas=2):
          booted_nodes = filter(lambda node: node.booted, nodes)
          for node in booted_nodes:
              print(node.succeed("patronictl list cluster1"))
              node.wait_until_succeeds(f"[ $(patronictl list -f json cluster1 | jq 'length') == {expected_replicas + 1} ]")
              node.wait_until_succeeds("[ $(patronictl list -f json cluster1 | jq 'map(select(.Role | test(\"^Leader$\"))) | map(select(.State | test(\"^running$\"))) | length') == 1 ]")
              node.wait_until_succeeds(f"[ $(patronictl list -f json cluster1 | jq 'map(select(.Role | test(\"^Replica$\"))) | map(select(.State | test(\"^running$\"))) | length') == {expected_replicas} ]")
              print(node.succeed("patronictl list cluster1"))
          client.wait_until_succeeds("psql -h 127.0.0.1 -U postgres --command='select 1;'")

      def run_dummy_queries():
          client.succeed("psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='insert into dummy(val) values (101);'")
          client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select val from dummy where val = 101;') -eq 101")
          client.succeed("psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='delete from dummy where val = 101;'")

      start_all()

      etcd.wait_for_unit("etcd.service")

      with subtest("should bootstrap a new patroni cluster"):
          wait_for_all_nodes_ready()

      with subtest("should be able to insert and select"):
          client.succeed("psql -h 127.0.0.1 -U postgres --command='create table dummy as select * from generate_series(1, 100) as val;'")
          client.succeed("test $(psql -h 127.0.0.1 -U postgres --pset='pager=off' --tuples-only --command='select count(distinct val) from dummy;') -eq 100")

      with subtest("should restart after all nodes are crashed"):
          for node in nodes:
              node.crash()
          for node in nodes:
              node.start()
          wait_for_all_nodes_ready()

      with subtest("should be able to run queries while any one node is crashed"):
          masterNodeName = node1.succeed("patronictl list -f json cluster1 | jq '.[] | select(.Role | test(\"^Leader$\")) | .Member' -r").strip()
          masterNodeIndex = int(masterNodeName[len(masterNodeName)-1]) - 1

          # Move master node at the end of the list to avoid multiple failovers (makes the test faster and more consistent)
          nodes.append(nodes.pop(masterNodeIndex))

          for node in nodes:
              node.crash()
          wait_for_all_nodes_ready(1)

          # Execute some queries while a node is down.
          run_dummy_queries()

          # Restart crashed node.
          node.start()
          wait_for_all_nodes_ready()

          # Execute some queries with the node back up.
          run_dummy_queries()
    '';
  })