at 23.05-pre 1.1 kB view raw
1import ./make-test-python.nix ({ pkgs, lib, ... }: 2 3let 4 pythonEnv = pkgs.python3.withPackages (p: [p.beanstalkc]); 5 6 produce = pkgs.writeScript "produce.py" '' 7 #!${pythonEnv.interpreter} 8 import beanstalkc 9 10 queue = beanstalkc.Connection(host='localhost', port=11300, parse_yaml=False); 11 queue.put(b'this is a job') 12 queue.put(b'this is another job') 13 ''; 14 15 consume = pkgs.writeScript "consume.py" '' 16 #!${pythonEnv.interpreter} 17 import beanstalkc 18 19 queue = beanstalkc.Connection(host='localhost', port=11300, parse_yaml=False); 20 21 job = queue.reserve(timeout=0) 22 print(job.body.decode('utf-8')) 23 job.delete() 24 ''; 25 26in 27{ 28 name = "beanstalkd"; 29 meta.maintainers = [ lib.maintainers.aanderse ]; 30 31 nodes.machine = 32 { ... }: 33 { services.beanstalkd.enable = true; 34 }; 35 36 testScript = '' 37 start_all() 38 39 machine.wait_for_unit("beanstalkd.service") 40 41 machine.succeed("${produce}") 42 assert "this is a job\n" == machine.succeed( 43 "${consume}" 44 ) 45 assert "this is another job\n" == machine.succeed( 46 "${consume}" 47 ) 48 ''; 49})