at master 12 kB view raw
# NixOS VM test for the Temporal service.
#
# A single node runs `services.temporal` backed by in-memory SQLite stores.
# The test script inspects the cluster and the `default` namespace through
# `temporal-cli`, then runs a hello-world workflow with the Python SDK and
# checks that it is listed as completed.
(
  { lib, pkgs, ... }:

  {
    name = "temporal";
    meta.maintainers = [ pkgs.lib.maintainers.jpds ];

    nodes = {
      temporal =
        { config, pkgs, ... }:
        let
          # The default and visibility stores use identical in-memory SQLite
          # settings, so the definition is shared instead of repeated.
          sqliteMemoryStore = {
            sql = {
              user = "";
              password = "";
              pluginName = "sqlite";
              databaseName = "default";
              connectAddr = "localhost";
              connectProtocol = "tcp";
              connectAttributes = {
                mode = "memory";
                cache = "private";
              };
              maxConns = 1;
              maxIdleConns = 1;
              maxConnLifetime = "1h";
              tls = {
                enabled = false;
                caFile = "";
                certFile = "";
                keyFile = "";
                enableHostVerification = false;
                serverName = "";
              };
            };
          };
        in
        {
          # Same port as services.frontend.rpc.grpcPort below.
          networking.firewall.allowedTCPPorts = [ 7233 ];

          environment.systemPackages = [
            # Worker + client in one process; prints "Result: Hello, World!"
            # which the test script asserts on.
            (pkgs.writers.writePython3Bin "temporal-hello-workflow.py"
              {
                libraries = [ pkgs.python3Packages.temporalio ];
              }
              # Graciously taken from https://github.com/temporalio/samples-python/blob/main/hello/hello_activity.py
              ''
                import asyncio
                from concurrent.futures import ThreadPoolExecutor
                from dataclasses import dataclass
                from datetime import timedelta

                from temporalio import activity, workflow
                from temporalio.client import Client
                from temporalio.worker import Worker


                # While we could use multiple parameters in the activity, Temporal strongly
                # encourages using a single dataclass instead which can have fields added to it
                # in a backwards-compatible way.
                @dataclass
                class ComposeGreetingInput:
                    greeting: str
                    name: str


                # Basic activity that logs and does string concatenation
                @activity.defn
                def compose_greeting(input: ComposeGreetingInput) -> str:
                    activity.logger.info("Running activity with parameter %s" % input)
                    return f"{input.greeting}, {input.name}!"


                # Basic workflow that logs and invokes an activity
                @workflow.defn
                class GreetingWorkflow:
                    @workflow.run
                    async def run(self, name: str) -> str:
                        workflow.logger.info("Running workflow with parameter %s" % name)
                        return await workflow.execute_activity(
                            compose_greeting,
                            ComposeGreetingInput("Hello", name),
                            start_to_close_timeout=timedelta(seconds=10),
                        )


                async def main():
                    # Uncomment the lines below to see logging output
                    # import logging
                    # logging.basicConfig(level=logging.INFO)

                    # Start client
                    client = await Client.connect("localhost:7233")

                    # Run a worker for the workflow
                    async with Worker(
                        client,
                        task_queue="hello-activity-task-queue",
                        workflows=[GreetingWorkflow],
                        activities=[compose_greeting],
                        # Non-async activities require an executor;
                        # a thread pool executor is recommended.
                        # This same thread pool could be passed to multiple workers if desired.
                        activity_executor=ThreadPoolExecutor(5),
                    ):

                        # While the worker is running, use the client to run the workflow and
                        # print out its result. Note, in many production setups, the client
                        # would be in a completely separate process from the worker.
                        result = await client.execute_workflow(
                            GreetingWorkflow.run,
                            "World",
                            id="hello-activity-workflow-id",
                            task_queue="hello-activity-task-queue",
                        )
                        print(f"Result: {result}")


                if __name__ == "__main__":
                    asyncio.run(main())
              ''
            )
            pkgs.temporal-cli
          ];

          services.temporal = {
            enable = true;
            settings = {
              # Based on https://github.com/temporalio/temporal/blob/main/config/development-sqlite.yaml
              log = {
                stdout = true;
                level = "info";
              };
              services = {
                frontend = {
                  rpc = {
                    grpcPort = 7233;
                    membershipPort = 6933;
                    bindOnLocalHost = true;
                    httpPort = 7243;
                  };
                };
                matching = {
                  rpc = {
                    grpcPort = 7235;
                    membershipPort = 6935;
                    bindOnLocalHost = true;
                  };
                };
                history = {
                  rpc = {
                    grpcPort = 7234;
                    membershipPort = 6934;
                    bindOnLocalHost = true;
                  };
                };
                worker = {
                  rpc = {
                    grpcPort = 7239;
                    membershipPort = 6939;
                    bindOnLocalHost = true;
                  };
                };
              };

              persistence = {
                defaultStore = "sqlite-default";
                visibilityStore = "sqlite-visibility";
                numHistoryShards = 1;
                datastores = {
                  sqlite-default = sqliteMemoryStore;
                  sqlite-visibility = sqliteMemoryStore;
                };
              };
              clusterMetadata = {
                enableGlobalNamespace = false;
                failoverVersionIncrement = 10;
                masterClusterName = "active";
                currentClusterName = "active";
                clusterInformation = {
                  active = {
                    enabled = true;
                    initialFailoverVersion = 1;
                    rpcName = "frontend";
                    rpcAddress = "localhost:7233";
                    httpAddress = "localhost:7243";
                  };
                };
              };

              dcRedirectionPolicy = {
                policy = "noop";
              };

              archival = {
                history = {
                  state = "enabled";
                  enableRead = true;
                  provider = {
                    filestore = {
                      fileMode = "0666";
                      dirMode = "0766";
                    };
                    gstorage = {
                      credentialsPath = "/tmp/gcloud/keyfile.json";
                    };
                  };
                };
                visibility = {
                  state = "enabled";
                  enableRead = true;
                  provider = {
                    filestore = {
                      fileMode = "0666";
                      dirMode = "0766";
                    };
                  };
                };
              };

              namespaceDefaults = {
                archival = {
                  history = {
                    state = "disabled";
                    URI = "file:///tmp/temporal_archival/development";
                  };
                  visibility = {
                    state = "disabled";
                    URI = "file:///tmp/temporal_vis_archival/development";
                  };
                };
              };
            };
          };
        };
    };

    testScript = ''
      import json

      # Wait for the unit and for the membership/gRPC ports of the frontend,
      # history and matching services configured above.
      temporal.wait_for_unit("temporal")
      temporal.wait_for_open_port(6933)
      temporal.wait_for_open_port(6934)
      temporal.wait_for_open_port(6935)
      temporal.wait_for_open_port(7233)
      temporal.wait_for_open_port(7234)
      temporal.wait_for_open_port(7235)

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'server-version' | grep '${pkgs.temporal.version}'"
      )

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'Frontend is now healthy'"
      )

      # The cluster name must match clusterMetadata.currentClusterName.
      cluster_list_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster list --output json"))
      assert cluster_list_json[0]['clusterName'] == "active"

      # NOTE(review): this is a substring check, not equality; presumably it
      # tolerates a suffix on the packaged version. Confirm before tightening.
      cluster_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator cluster describe --output json"))
      assert cluster_describe_json['serverVersion'] in "${pkgs.temporal.version}"

      temporal.log(temporal.wait_until_succeeds("temporal operator namespace create --namespace default"))

      temporal.wait_until_succeeds(
        "journalctl -o cat -u temporal.service | grep 'Register namespace succeeded'"
      )

      # NOTE(review): two entries are expected; presumably a built-in system
      # namespace plus the "default" one created above. Confirm.
      namespace_list_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace list --output json"))
      assert len(namespace_list_json) == 2

      namespace_describe_json = json.loads(temporal.wait_until_succeeds("temporal operator namespace describe --output json --namespace default"))
      assert namespace_describe_json['namespaceInfo']['name'] == "default"
      assert namespace_describe_json['namespaceInfo']['state'] == "NAMESPACE_STATE_REGISTERED"

      # No workflows exist before the sample script has run.
      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
      assert len(workflow_json) == 0

      out = temporal.wait_until_succeeds("temporal-hello-workflow.py")
      assert "Result: Hello, World!" in out

      workflow_json = json.loads(temporal.wait_until_succeeds("temporal workflow list --output json"))
      assert workflow_json[0]['execution']['workflowId'] == "hello-activity-workflow-id"
      assert workflow_json[0]['status'] == "WORKFLOW_EXECUTION_STATUS_COMPLETED"

      # Log only the hardening items that are not marked as passing. The
      # pattern must be non-empty: an empty pattern with -v selects no lines,
      # so grep exits 1 and succeed() raises.
      temporal.log(temporal.succeed(
        "systemd-analyze security temporal.service | grep -v '✓'"
      ))
    '';
  }
)