#!/usr/bin/env python3
# look for accounts that are active on their PDS but reported deactivated by downstream relays
# (did:plc accounts are only checked if their PLC audit log shows a PDS migration)
import sys
import json
import urllib.error
import urllib.parse
import urllib.request
import time
import queue
import threading
import concurrent.futures  # mix every python concurrency thing why not

assert sys.version_info >= (3, 13), 'need at least python 3.13 for queue shutdown'


def get_json(url, attempts_remaining=2):
    if attempts_remaining == 0:
        raise Exception("no more retries left")
    try:
        with urllib.request.urlopen(url, timeout=5) as f:
            return json.load(f)
    except urllib.error.HTTPError as e:
        if e.code in (
            408,  # req timeout
            429,  # chill
            500,  # oop
            502,  # pray for upstream
            503,  # come back plz
            504,  # try harder?
        ):
            print(f'http {e.code}, retrying in a moment...', file=sys.stderr)
            time.sleep(4)
            return get_json(url, attempts_remaining - 1)
        else:
            raise
    except urllib.error.URLError as e:
        print(f'failed to get {url}: {e}, retrying...', file=sys.stderr)
        time.sleep(2)
        return get_json(url, attempts_remaining - 1)


def get_repos(state, check_queue, exclude_host_suffixes):
    def should_exclude(pds):
        parsed = urllib.parse.urlparse(pds)
        if parsed.scheme != 'https':
            print('excluding non-https', pds, file=sys.stderr)
            return True
        if any(parsed.hostname.endswith(s) for s in exclude_host_suffixes):
            print('excluding ignored', pds, file=sys.stderr)
            return True
        return False

    def get_pds_repos(pds):
        if should_exclude(pds):
            return
        try:
            repos = get_json(f'{pds}xrpc/com.atproto.sync.listRepos?limit=1000')
            # NOTE: listRepos always returns a cursor even when there is no subsequent page.
            # none of the PDSs we care about have over 1k users so just ignore for now and save a pointless req
        except Exception as e:
            print(f'failed: {e}', file=sys.stderr)
            return
        for repo in repos['repos']:
            did = repo['did']
            check_queue.put((did, pds))

    try:
        # did:webs discovered via the scraped firehose state
        for did, val in state['firehose']['didWebs'].items():
            if pds := val.get('pds'):
                if should_exclude(pds):
                    continue
                check_queue.put((did, pds))
        # plus every repo listed on every known PDS
        with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
            executor.map(get_pds_repos, state['pdses'])
    finally:
        print('get_repos done')
        check_queue.shutdown()  # wakes the check workers up with queue.ShutDown once drained


def plc_pds_changed_to(plc, did, pds):
    # true only if the PLC audit log shows the account changed PDS at some point
    # and its latest PDS is the one we were given
    log = get_json(f'{plc}/{did}/log/audit')
    current_pds = None
    pds_changed = False
    for step in log:
        op = step['operation']
        if services := op.get('services'):
            if pds_service := services.get('atproto_pds'):
                if pds_service.get('type') == 'AtprotoPersonalDataServer':
                    this_pds = pds_service['endpoint']
                    if current_pds is not None and this_pds != current_pds:
                        pds_changed = True
                    current_pds = this_pds
    if current_pds is None:
        return False
    if not pds_changed:
        return False
    # mary's scraping adds a trailing / to the pds
    if not current_pds.endswith('/'):
        current_pds = f'{current_pds}/'
    return pds == current_pds


def actually_check(did, host):
    if not host.endswith('/'):
        host += '/'  # match relays to mary's trailing / style
    try:
        status = get_json(f'{host}xrpc/com.atproto.sync.getRepoStatus?did={did}')
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None  # ughh
        raise
    return status['active']


total_oks = 0
total_fails = 0


def do_checks(did, pds, host_checks):
    if not actually_check(did, pds):
        # pds is not active=true, so we don't care about this one
        return
    fails = []
    for host in host_checks:
        res = actually_check(did, host)
        if res is None:
            pass  # too noisy for now
            # fails.append((host, 'missing'))
        elif res is False:
            fails.append((host, 'deactivated'))
    if len(fails) > 0:
        global total_fails
        total_fails += 1
        print('{}\tactivated@{}\t{}'.format(did, pds, '\t'.join([f'{s}@{h}' for h, s in fails])))
    else:
        global total_oks
        total_oks += 1


def check_worker(q, plc, host_checks):
    try:
        while True:
            did, pds = q.get()
            try:
                if did.startswith('did:web:'):
                    do_checks(did, pds, host_checks)
                elif did.startswith('did:plc:'):
                    if plc_pds_changed_to(plc, did, pds):
                        do_checks(did, pds, host_checks)
                else:
                    print('weird did, ignoring:', did)
            except Exception as e:
                print('failed to check', did, pds, e, file=sys.stderr)
    except queue.ShutDown:
        # queue was shut down and drained, nothing left to check
        return


def main(scraping_url, plc, checks, exclude_host_suffixes):
    # print(plc_pds_changed_to(plc, 'did:plc:k23ujfuppr3hr4pxvtaz7jro', 'https://arcnode.xyz/'))
    # do_checks('did:plc:k23ujfuppr3hr4pxvtaz7jro', 'https://arcnode.xyz/', checks)
    # return
    # do_checks('did:web:char.bun.how', 'https://pds.bun.how', checks)
    # return
    state = get_json(scraping_url)
    print('got state.')
    check_queue = queue.Queue(maxsize=64)
    print('starting get_repos in a thread:')
    gr = threading.Thread(target=get_repos, args=(state, check_queue, exclude_host_suffixes))
    gr.start()
    print('starting workers for checks...')
    with concurrent.futures.ThreadPoolExecutor(max_workers=42) as executor:
        for _ in range(42):
            executor.submit(check_worker, check_queue, plc, checks)
    print('done.')
    print(f'{total_oks=} {total_fails=}')
    gr.join()


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--plc', default='https://plc.directory', help='plc directory host')
    parser.add_argument(
        '--scraping',
        default='https://raw.githubusercontent.com/mary-ext/atproto-scraping/refs/heads/trunk/state.json',
        help='scraping state to discover PDSs to crawl',
    )
    parser.add_argument(
        '--check',
        default=[
            'https://relay1.us-east.bsky.network',
            'https://relay1.us-west.bsky.network',
            'https://relay.fire.hose.cam',  # @microcosm.blue
            'https://relay3.fr.hose.cam',  # @microcosm.blue
            'https://relay.hayescmd.net',  # @edavis.dev
            # https://bsky.network does not support getRepoStatus
            # https://atproto.africa does not support getRepoStatus
        ],
        action='append',
        help='which downstream network services (probably relays) to check for deactivated state with /xrpc/com.atproto.sync.getRepoStatus',
    )
    parser.add_argument(
        '--exclude-host-suffix',
        default=[
            '.host.bsky.network',
            '.brid.gy',
        ],
        action='append',
        help='PDS instances to skip checking',
    )
    args = parser.parse_args()
    main(args.scraping, args.plc, args.check, args.exclude_host_suffix)