let
  # TCP port of the per-machine Redis server; also used to build the
  # "host:port" shard addresses handed to Centrifugo.
  redisPort = 6379;
  # Port Centrifugo is configured to listen on and that the test script polls.
  centrifugoPort = 8080;
  # Host names of the test machines: centrifugo1, centrifugo2, centrifugo3.
  nodes = builtins.genList (i: "centrifugo${toString (i + 1)}") 3;
in
10{ lib, ... }: {
11 name = "centrifugo";
12 meta.maintainers = [ lib.maintainers.tie ];
13
14 nodes = lib.listToAttrs (lib.imap0
15 (index: name: {
16 inherit name;
17 value = { config, ... }: {
18 services.centrifugo = {
19 enable = true;
20 settings = {
21 inherit name;
22 port = centrifugoPort;
23 # See https://centrifugal.dev/docs/server/engines#redis-sharding
24 engine = "redis";
25 # Connect to local Redis shard via Unix socket.
26 redis_address =
27 let
28 otherNodes = lib.take index nodes ++ lib.drop (index + 1) nodes;
29 in
30 map (name: "${name}:${toString redisPort}") otherNodes ++ [
31 "unix://${config.services.redis.servers.centrifugo.unixSocket}"
32 ];
33 usage_stats_disable = true;
34 api_insecure = true;
35 };
36 extraGroups = [
37 config.services.redis.servers.centrifugo.user
38 ];
39 };
40 services.redis.servers.centrifugo = {
41 enable = true;
42 bind = null; # all interfaces
43 port = redisPort;
44 openFirewall = true;
45 settings.protected-mode = false;
46 };
47 };
48 })
49 nodes);
50
51 testScript = ''
52 import json
53
54 redisPort = ${toString redisPort}
55 centrifugoPort = ${toString centrifugoPort}
56
57 start_all()
58
59 for machine in machines:
60 machine.wait_for_unit("redis-centrifugo.service")
61 machine.wait_for_open_port(redisPort)
62
63 for machine in machines:
64 machine.wait_for_unit("centrifugo.service")
65 machine.wait_for_open_port(centrifugoPort)
66
67 # See https://centrifugal.dev/docs/server/server_api#info
68 def list_nodes(machine):
69 curl = "curl --fail-with-body --silent"
70 body = "{}"
71 resp = json.loads(machine.succeed(f"{curl} -d '{body}' http://localhost:{centrifugoPort}/api/info"))
72 return resp["result"]["nodes"]
73 machineNames = {m.name for m in machines}
74 for machine in machines:
75 nodes = list_nodes(machine)
76 assert len(nodes) == len(machines)
77 nodeNames = {n['name'] for n in nodes}
78 assert machineNames == nodeNames
79 '';
80}