check every indie atproto account for relay active-state de-sync
crawl-indies.py
#!/usr/bin/env python3
import sys
import json
import urllib.parse
import urllib.request
import time
import queue
import threading
import concurrent.futures  # mix every python concurrency thing why not


assert sys.version_info >= (3, 13), 'need at least python 3.13 for queue shutdown'


def get_json(url, attempts_remaining=2):
    if attempts_remaining == 0:
        raise Exception('no more retries left')

    try:
        with urllib.request.urlopen(url, timeout=5) as f:
            return json.load(f)

    except urllib.error.HTTPError as e:
        if e.code in (
            408,  # req timeout
            429,  # chill
            500,  # oop
            502,  # pray for upstream
            503,  # come back plz
            504,  # try harder?
        ):
            print(f'http {e.code}, retrying in a moment...', file=sys.stderr)
            time.sleep(4)
            return get_json(url, attempts_remaining - 1)
        else:
            raise

    except urllib.error.URLError as e:
        print(f'failed to get {url}: {e}, retrying...', file=sys.stderr)
        time.sleep(2)
        return get_json(url, attempts_remaining - 1)


def get_repos(state, check_queue, exclude_host_suffixes):
    def should_exclude(pds):
        parsed = urllib.parse.urlparse(pds)
        if parsed.scheme != 'https':
            print('excluding non-https', pds, file=sys.stderr)
            return True
        if any(parsed.hostname.endswith(s) for s in exclude_host_suffixes):
            print('excluding ignored', pds, file=sys.stderr)
            return True
        return False

    def get_pds_repos(pds):
        if should_exclude(pds):
            return
        try:
            repos = get_json(f'{pds}xrpc/com.atproto.sync.listRepos?limit=1000')
            # NOTE: listRepos always returns a cursor even when there is no subsequent page
            # none of the PDSs we care about have over 1k users so just ignore for now and save a pointless req
        except Exception as e:
            print(f'failed: {e}', file=sys.stderr)
            return
        for repo in repos['repos']:
            did = repo['did']
            check_queue.put((did, pds))

    try:
        for (did, val) in state['firehose']['didWebs'].items():
            if pds := val.get('pds'):
                if should_exclude(pds):
                    continue
                check_queue.put((did, pds))

        with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
            executor.map(get_pds_repos, state['pdses'])

    finally:
        print('get_repos done')
        check_queue.shutdown()


def plc_pds_changed_to(plc, did, pds):
    # walk the plc audit log: true only if this account's pds changed at least
    # once and its latest endpoint is `pds` (i.e. the account migrated here)
    log = get_json(f'{plc}/{did}/log/audit')
    current_pds = None
    pds_changed = False
    for step in log:
        op = step['operation']
        if services := op.get('services'):
            if pds_service := services.get('atproto_pds'):
                if pds_service.get('type') == 'AtprotoPersonalDataServer':
                    this_pds = pds_service['endpoint']
                    if current_pds is not None and this_pds != current_pds:
                        pds_changed = True
                    current_pds = this_pds

    if current_pds is None:
        return False

    if not pds_changed:
        return False

    # mary's scraping adds a trailing / to the pds
    if not current_pds.endswith('/'):
        current_pds = f'{current_pds}/'

    return pds == current_pds


def actually_check(did, host):
    # returns this host's view of the account: True/False from `active`, or None if the repo is unknown (404)
    if not host.endswith('/'):
        host += '/'  # match relays to mary's trailing / style
    try:
        status = get_json(f'{host}xrpc/com.atproto.sync.getRepoStatus?did={did}')
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None  # ughh
        raise
    return status['active']


total_oks = 0
total_fails = 0

def do_checks(did, pds, host_checks):
    if not actually_check(did, pds):
        # pds is not active=true, so we don't care about this one
        return

    fails = []
    for host in host_checks:
        res = actually_check(did, host)
        if res is None:
            pass  # too noisy for now
            # fails.append((host, 'missing'))
        elif res is False:
            fails.append((host, 'deactivated'))

    if len(fails) > 0:
        global total_fails
        total_fails += 1
        print('{}\tactivated@{}\t{}'.format(did, pds, '\t'.join([f'{s}@{h}' for h, s in fails])))
    else:
        global total_oks
        total_oks += 1


def check_worker(q, plc, host_checks):
    # pull (did, pds) pairs until get_repos shuts the queue down
    try:
        while True:
            did, pds = q.get()
            try:
                if did.startswith('did:web:'):
                    do_checks(did, pds, host_checks)
                elif did.startswith('did:plc:'):
                    if plc_pds_changed_to(plc, did, pds):
                        do_checks(did, pds, host_checks)
                else:
                    print('weird did, ignoring:', did)
            except Exception as e:
                print('failed to check', did, pds, e, file=sys.stderr)
    except queue.ShutDown:
        return


def main(scraping_url, plc, checks, exclude_host_suffixes):

    # print(plc_pds_changed_to(plc, 'did:plc:k23ujfuppr3hr4pxvtaz7jro', 'https://arcnode.xyz/'))
    # do_checks('did:plc:k23ujfuppr3hr4pxvtaz7jro', 'https://arcnode.xyz/', checks)
    # return

    # do_checks('did:web:char.bun.how', 'https://pds.bun.how', checks)
    # return

    state = get_json(scraping_url)
    print('got state.')

    check_queue = queue.Queue(maxsize=64)

    print('starting get_repos in a thread:')
    gr = threading.Thread(target=get_repos, args=(state, check_queue, exclude_host_suffixes))
    gr.start()

    print('starting workers for checks...')
    with concurrent.futures.ThreadPoolExecutor(max_workers=42) as executor:
        for _ in range(42):
            executor.submit(check_worker, check_queue, plc, checks)

    print('done.')
    print(f'{total_oks=} {total_fails=}')

    gr.join()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--plc', default='https://plc.directory', help='plc directory host')
    parser.add_argument(
        '--scraping',
        default='https://raw.githubusercontent.com/mary-ext/atproto-scraping/refs/heads/trunk/state.json',
        help='scraping state to discover PDSs to crawl',
    )
    parser.add_argument(
        '--check',
        default=[
            'https://relay1.us-east.bsky.network',
            'https://relay1.us-west.bsky.network',
            'https://relay.fire.hose.cam',  # @microcosm.blue
            'https://relay3.fr.hose.cam',  # @microcosm.blue
            'https://relay.hayescmd.net',  # @edavis.dev
            # https://bsky.network does not support getRepoStatus
            # https://atproto.africa does not support getRepoStatus
        ],
        action='append',
        help='which downstream network services (probably relays) to check for deactivated state with /xrpc/com.atproto.sync.getRepoStatus',
    )
    parser.add_argument(
        '--exclude-host-suffix',
        default=[
            '.host.bsky.network',
            '.brid.gy',
        ],
        action='append',
        help='PDS instances to skip checking',
    )
    args = parser.parse_args()
    main(args.scraping, args.plc, args.check, args.exclude_host_suffix)
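if a PDS ever grows past the 1000-repo page, the single listRepos call in get_pds_repos starts silently missing accounts. a minimal paginated sketch, reusing get_json from above (get_all_pds_repos is hypothetical, and the short-page stop condition is an assumption to cope with listRepos handing back a cursor even on the last page):

def get_all_pds_repos(pds):
    # hypothetical cursor-following variant of the single fetch in get_pds_repos
    cursor = None
    while True:
        url = f'{pds}xrpc/com.atproto.sync.listRepos?limit=1000'
        if cursor:
            url += f'&cursor={urllib.parse.quote(cursor)}'
        page = get_json(url)
        repos = page.get('repos', [])
        yield from repos
        cursor = page.get('cursor')
        # assumption: an empty or short page means we're done, since the cursor
        # alone can't be trusted to disappear on the final page
        if not cursor or len(repos) < 1000:
            break

get_pds_repos would then put each yielded (did, pds) on check_queue instead of looping one page. also note that because --check and --exclude-host-suffix use action='append' with list defaults, argparse appends user-supplied values to the built-in defaults rather than replacing them: --check https://relay.example.com checks the five default relays plus that one.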