(
  { lib, pkgs, ... }:

  {
    name = "temporal";
    meta.maintainers = [ pkgs.lib.maintainers.jpds ];

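    # Single-node Temporal server test: the server runs all of its services in
    # one process with in-memory SQLite persistence, and the test script
    # exercises it with the temporal CLI and a hello-world workflow written
    # against the Python SDK.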
    nodes = {
      temporal =
        { config, pkgs, ... }:
        {
          networking.firewall.allowedTCPPorts = [ 7233 ];

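          # The sample below acts as both worker and client: it registers
          # GreetingWorkflow and its activity on a task queue, executes the
          # workflow against the local server and prints the result.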
          environment.systemPackages = [
            (pkgs.writers.writePython3Bin "temporal-hello-workflow.py"
              {
                libraries = [ pkgs.python3Packages.temporalio ];
              }
              # Graciously taken from https://github.com/temporalio/samples-python/blob/main/hello/hello_activity.py
              ''
                import asyncio
                from concurrent.futures import ThreadPoolExecutor
                from dataclasses import dataclass
                from datetime import timedelta

                from temporalio import activity, workflow
                from temporalio.client import Client
                from temporalio.worker import Worker


                # While we could use multiple parameters in the activity, Temporal strongly
                # encourages using a single dataclass instead which can have fields added to it
                # in a backwards-compatible way.
                @dataclass
                class ComposeGreetingInput:
                    greeting: str
                    name: str


                # Basic activity that logs and does string concatenation
                @activity.defn
                def compose_greeting(input: ComposeGreetingInput) -> str:
                    activity.logger.info("Running activity with parameter %s" % input)
                    return f"{input.greeting}, {input.name}!"


                # Basic workflow that logs and invokes an activity
                @workflow.defn
                class GreetingWorkflow:
                    @workflow.run
                    async def run(self, name: str) -> str:
                        workflow.logger.info("Running workflow with parameter %s" % name)
                        return await workflow.execute_activity(
                            compose_greeting,
                            ComposeGreetingInput("Hello", name),
                            start_to_close_timeout=timedelta(seconds=10),
                        )


                async def main():
                    # Uncomment the lines below to see logging output
                    # import logging
                    # logging.basicConfig(level=logging.INFO)

                    # Start client
                    client = await Client.connect("localhost:7233")

                    # Run a worker for the workflow
                    async with Worker(
                        client,
                        task_queue="hello-activity-task-queue",
                        workflows=[GreetingWorkflow],
                        activities=[compose_greeting],
                        # Non-async activities require an executor;
                        # a thread pool executor is recommended.
                        # This same thread pool could be passed to multiple workers if desired.
                        activity_executor=ThreadPoolExecutor(5),
                    ):

                        # While the worker is running, use the client to run the workflow and
                        # print out its result. Note, in many production setups, the client
                        # would be in a completely separate process from the worker.
                        result = await client.execute_workflow(
                            GreetingWorkflow.run,
                            "World",
                            id="hello-activity-workflow-id",
                            task_queue="hello-activity-task-queue",
                        )
                        print(f"Result: {result}")


                if __name__ == "__main__":
                    asyncio.run(main())
              ''
            )
            pkgs.temporal-cli
          ];

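          # `settings` follows the layout of Temporal's own server
          # configuration file; the values below are a single-node variant of
          # the upstream development-sqlite.yaml referenced below.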
          services.temporal = {
            enable = true;
            settings = {
              # Based on https://github.com/temporalio/temporal/blob/main/config/development-sqlite.yaml
              log = {
                stdout = true;
                level = "info";
              };
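              # Even as a single process, Temporal runs its four internal
              # services (frontend, matching, history, worker), each with its
              # own gRPC and membership port. SDK clients and the CLI talk to
              # the frontend on 7233.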
              services = {
                frontend = {
                  rpc = {
                    grpcPort = 7233;
                    membershipPort = 6933;
                    bindOnLocalHost = true;
                    httpPort = 7243;
                  };
                };
                matching = {
                  rpc = {
                    grpcPort = 7235;
                    membershipPort = 6935;
                    bindOnLocalHost = true;
                  };
                };
                history = {
                  rpc = {
                    grpcPort = 7234;
                    membershipPort = 6934;
                    bindOnLocalHost = true;
                  };
                };
                worker = {
                  rpc = {
                    grpcPort = 7239;
                    membershipPort = 6939;
                    bindOnLocalHost = true;
                  };
                };
              };

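              # Both stores are SQLite in in-memory mode, so all state is lost
              # when the service restarts. numHistoryShards is fixed for the
              # lifetime of a cluster and cannot be changed later.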
              persistence = {
                defaultStore = "sqlite-default";
                visibilityStore = "sqlite-visibility";
                numHistoryShards = 1;
                datastores = {
                  sqlite-default = {
                    sql = {
                      user = "";
                      password = "";
                      pluginName = "sqlite";
                      databaseName = "default";
                      connectAddr = "localhost";
                      connectProtocol = "tcp";
                      connectAttributes = {
                        mode = "memory";
                        cache = "private";
                      };
                      maxConns = 1;
                      maxIdleConns = 1;
                      maxConnLifetime = "1h";
                      tls = {
                        enabled = false;
                        caFile = "";
                        certFile = "";
                        keyFile = "";
                        enableHostVerification = false;
                        serverName = "";
                      };
                    };
                  };
                  sqlite-visibility = {
                    sql = {
                      user = "";
                      password = "";
                      pluginName = "sqlite";
                      databaseName = "default";
                      connectAddr = "localhost";
                      connectProtocol = "tcp";
                      connectAttributes = {
                        mode = "memory";
                        cache = "private";
                      };
                      maxConns = 1;
                      maxIdleConns = 1;
                      maxConnLifetime = "1h";
                      tls = {
                        enabled = false;
                        caFile = "";
                        certFile = "";
                        keyFile = "";
                        enableHostVerification = false;
                        serverName = "";
                      };
                    };
                  };
                };
              };
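              # Single-cluster setup: "active" is both the master and the
              # current cluster, with the frontend reachable on localhost.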
              clusterMetadata = {
                enableGlobalNamespace = false;
                failoverVersionIncrement = 10;
                masterClusterName = "active";
                currentClusterName = "active";
                clusterInformation = {
                  active = {
                    enabled = true;
                    initialFailoverVersion = 1;
                    rpcName = "frontend";
                    rpcAddress = "localhost:7233";
                    httpAddress = "localhost:7243";
                  };
                };
              };

              dcRedirectionPolicy = {
                policy = "noop";
              };

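              # Archival to the local filesystem is enabled at the cluster
              # level, but namespaceDefaults below leaves it disabled for
              # newly created namespaces.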
              archival = {
                history = {
                  state = "enabled";
                  enableRead = true;
                  provider = {
                    filestore = {
                      fileMode = "0666";
                      dirMode = "0766";
                    };
                    gstorage = {
                      credentialsPath = "/tmp/gcloud/keyfile.json";
                    };
                  };
                };
                visibility = {
                  state = "enabled";
                  enableRead = true;
                  provider = {
                    filestore = {
                      fileMode = "0666";
                      dirMode = "0766";
                    };
                  };
                };
              };

              namespaceDefaults = {
                archival = {
                  history = {
                    state = "disabled";
                    URI = "file:///tmp/temporal_archival/development";
                  };
                  visibility = {
                    state = "disabled";
                    URI = "file:///tmp/temporal_vis_archival/development";
                  };
                };
              };
            };
          };
        };
    };

    testScript = ''
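      # Wait for the unit and for the membership (69xx) and gRPC (72xx) ports
      # of the frontend, history and matching services to be listening.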
      temporal.wait_for_unit("temporal")
      temporal.wait_for_open_port(6933)
      temporal.wait_for_open_port(6934)
      temporal.wait_for_open_port(6935)
      temporal.wait_for_open_port(7233)
      temporal.wait_for_open_port(7234)
      temporal.wait_for_open_port(7235)

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'server-version' | grep '${pkgs.temporal.version}'"
      )

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'Frontend is now healthy'"
      )

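      # The temporal CLI defaults to the frontend at localhost:7233; check that
      # the single "active" cluster reports the expected server version.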
      import json
      cluster_list_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster list --output json"))
      assert cluster_list_json[0]['clusterName'] == "active"

      cluster_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster describe --output json"))
      assert cluster_describe_json['serverVersion'] in "${pkgs.temporal.version}"

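      # Register a "default" namespace for the workflow to run in.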
      temporal.log(temporal.wait_until_succeeds("temporal operator namespace create --namespace default"))

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'Register namespace succeeded'"
      )

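      # Two namespaces are expected: the newly created "default" and the
      # built-in "temporal-system" namespace.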
      namespace_list_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace list --output json"))
      assert len(namespace_list_json) == 2

      namespace_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace describe --output json --namespace default"))
      assert namespace_describe_json['namespaceInfo']['name'] == "default"
      assert namespace_describe_json['namespaceInfo']['state'] == "NAMESPACE_STATE_REGISTERED"

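      # No workflow executions exist yet.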
      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
      assert len(workflow_json) == 0

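      # Run the packaged sample: it starts a worker, executes GreetingWorkflow
      # and prints the activity's greeting.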
      out = temporal.wait_until_succeeds("temporal-hello-workflow.py")
      assert "Result: Hello, World!" in out

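      # The finished execution should now be listed and marked as completed.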
      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
      assert workflow_json[0]['execution']['workflowId'] == "hello-activity-workflow-id"
      assert workflow_json[0]['status'] == "WORKFLOW_EXECUTION_STATUS_COMPLETED"

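      # Log the systemd-analyze security report, filtering out the settings
      # that already pass, for informational purposes.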
      temporal.log(temporal.succeed(
        "systemd-analyze security temporal.service | grep -v '✓'"
      ))
    '';
  }
)