# Snapshot of this file as viewed at revision 24.11-pre (3.6 kB).
# NixOS VM test for the PostgreSQL `anon` extension and `pg_dump_anon`.
#
# The test creates a `demo` database with a two-row `player` table, attaches
# masking rules to the `name` and `points` columns and then verifies that
#   * a plain `pg_dump` still contains the original data,
#   * `pg_dump_anon` emits masked data,
#   * `anon.anonymize_database()` masks the data in place.
import ./make-test-python.nix ({ pkgs, lib, ... }: {
  name = "pg_anonymizer";
  meta.maintainers = lib.teams.flyingcircus.members;

  nodes.machine = { pkgs, ... }: {
    environment.systemPackages = [ pkgs.pg-dump-anon ];
    services.postgresql = {
      enable = true;
      extraPlugins = ps: [ ps.anonymizer ];
      # The extension is preloaded into the server under the name "anon".
      settings.shared_preload_libraries = [ "anon" ];
    };
  };

  testScript = ''
    start_all()
    machine.wait_for_unit("multi-user.target")
    machine.wait_for_unit("postgresql.service")

    with subtest("Setup"):
        machine.succeed("sudo -u postgres psql --command 'create database demo'")
        machine.succeed(
            "sudo -u postgres psql -d demo -f ${pkgs.writeText "init.sql" ''
              create extension anon cascade;
              select anon.init();
              create table player(id serial, name text, points int);
              insert into player(id,name,points) values (1,'Foo', 23);
              insert into player(id,name,points) values (2,'Bar',42);
              security label for anon on column player.name is 'MASKED WITH FUNCTION anon.fake_last_name();';
              security label for anon on column player.points is 'MASKED WITH VALUE NULL';
            ''}"
        )

    def get_player_table_contents():
        """
        Query the `player` table via `psql --csv` and return its rows as
        lists of strings. The first output line (CSV header) is skipped.
        """
        return [
            x.split(',') for x in machine.succeed("sudo -u postgres psql -d demo --csv --command 'select * from player'").splitlines()[1:]
        ]

    def check_anonymized_row(row, id, original_name):
        """
        Assert that `row` (a list of strings) still carries the ID `id`,
        no longer carries `original_name` and has an empty points column.
        """
        assert row[0] == id, f"Expected row to have ID {id}, but got {row[0]}"
        assert row[1] != original_name, f"Expected row {id} to have a name other than {original_name}"
        # A NULL value shows up as an empty string in the CSV output.
        assert not bool(row[2]), f"Expected points to be NULL in row {id}"

    def find_xsv_in_dump(dump, sep=','):
        r"""
        Expecting to find a CSV (for pg_dump_anon) or TSV (for pg_dump) structure, looking like

            COPY public.player ...
            1,Shields,
            2,Salazar,
            \.

        in the given dump (the commas are tabs in case of pg_dump).
        Extract the CSV lines and split by `sep`.

        Returns a list of rows, each row being a list of strings.
        Fails with an AssertionError that includes the dump if no data rows
        for `public.player` are found.
        """
        from itertools import dropwhile, takewhile

        copy_section = takewhile(
            lambda x: x != "\\.",
            dropwhile(
                lambda x: not x.startswith("COPY public.player"),
                dump.splitlines()
            )
        )
        # The first element is the `COPY public.player ...` header itself.
        rows = [x.split(sep) for x in list(copy_section)[1:]]
        # Fail here, with the dump at hand, instead of with a bare IndexError
        # in the caller when the COPY section is missing or empty.
        assert rows, f"No rows for public.player found in dump: {dump}"
        return rows

    def check_original_data(output):
        """Assert that the first two rows of `output` are the unmasked rows inserted during setup."""
        assert output[0] == ['1','Foo','23'], f"Expected first row from player table to be 1,Foo,23; got {output[0]}"
        assert output[1] == ['2','Bar','42'], f"Expected second row from player table to be 2,Bar,42; got {output[1]}"

    def check_anonymized_rows(output):
        """Assert that the first two rows of `output` are masked versions of the rows inserted during setup."""
        check_anonymized_row(output[0], '1', 'Foo')
        check_anonymized_row(output[1], '2', 'Bar')

    with subtest("Check initial state"):
        check_original_data(get_player_table_contents())

    with subtest("Anonymous dumps"):
        # A regular pg_dump must not be affected by the masking rules.
        check_original_data(find_xsv_in_dump(
            machine.succeed("sudo -u postgres pg_dump demo"),
            sep='\t'
        ))
        check_anonymized_rows(find_xsv_in_dump(
            machine.succeed("sudo -u postgres pg_dump_anon -U postgres -h /run/postgresql -d demo"),
            sep=','
        ))

    with subtest("Anonymize"):
        machine.succeed("sudo -u postgres psql -d demo --command 'select anon.anonymize_database();'")
        check_anonymized_rows(get_player_table_contents())
  '';
})