crawl-indies.py
#!/usr/bin/env python3
import sys
import json
import urllib.parse
import urllib.request
import urllib.error  # HTTPError / URLError are caught below
import time
import queue
import threading
import concurrent.futures  # mix every python concurrency thing why not


assert sys.version_info >= (3, 13), 'need at least python 3.13 for queue shutdown'


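# GET `url` and decode the JSON body, retrying transient HTTP statuses and
# connection-level failures until the attempt budget runs out.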
def get_json(url, attempts_remaining=2):
    if attempts_remaining == 0:
        raise Exception('no more retries left')

    try:
        with urllib.request.urlopen(url, timeout=5) as f:
            return json.load(f)

    except urllib.error.HTTPError as e:
        if e.code in (
            408,  # req timeout
            429,  # chill
            500,  # oop
            502,  # pray for upstream
            503,  # come back plz
            504,  # try harder?
        ):
            print(f'http {e.code}, retrying in a moment...', file=sys.stderr)
            time.sleep(4)
            return get_json(url, attempts_remaining - 1)
        else:
            raise

    except urllib.error.URLError as e:
        print(f'failed to get {url}: {e}, retrying...', file=sys.stderr)
        time.sleep(2)
        return get_json(url, attempts_remaining - 1)


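# producer: discover (did, pds) pairs from the scraping state's did:web
# entries and from each PDS's listRepos endpoint, push them onto
# check_queue, then shut the queue down so the workers drain and exit.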
def get_repos(state, check_queue, exclude_host_suffixes):
    def should_exclude(pds):
        parsed = urllib.parse.urlparse(pds)
        if parsed.scheme != 'https':
            print('excluding non-https', pds, file=sys.stderr)
            return True
        if any(parsed.hostname.endswith(s) for s in exclude_host_suffixes):
            print('excluding ignored', pds, file=sys.stderr)
            return True
        return False

    def get_pds_repos(pds):
        if should_exclude(pds):
            return
        try:
            repos = get_json(f'{pds}xrpc/com.atproto.sync.listRepos?limit=1000')
            # NOTE: listRepos always returns a cursor even when there is no subsequent page.
            # none of the PDSs we care about have over 1k users, so just ignore it for now and save a pointless req
        except Exception as e:
            print(f'failed: {e}', file=sys.stderr)
            return
        for repo in repos['repos']:
            did = repo['did']
            check_queue.put((did, pds))

    try:
        for (did, val) in state['firehose']['didWebs'].items():
            if pds := val.get('pds'):
                if should_exclude(pds):
                    continue
                check_queue.put((did, pds))

        with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
            executor.map(get_pds_repos, state['pdses'])

    finally:
        print('get_repos done')
        check_queue.shutdown()


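# walk the DID's audit log at the PLC directory and return True only if the
# account's PDS endpoint changed at some point and its current value is
# `pds`, i.e. the account migrated there rather than being created there.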
def plc_pds_changed_to(plc, did, pds):
    log = get_json(f'{plc}/{did}/log/audit')
    current_pds = None
    pds_changed = False
    for step in log:
        op = step['operation']
        if services := op.get('services'):
            if pds_service := services.get('atproto_pds'):
                if pds_service.get('type') == 'AtprotoPersonalDataServer':
                    this_pds = pds_service['endpoint']
                    if current_pds is not None and this_pds != current_pds:
                        pds_changed = True
                    current_pds = this_pds

    if current_pds is None:
        return False

    if not pds_changed:
        return False

    # mary's scraping adds a trailing / to the pds
    if not current_pds.endswith('/'):
        current_pds = f'{current_pds}/'

    return pds == current_pds


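# ask `host` for the repo's status and return its `active` flag,
# or None if the host answers 404 for the repo.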
def actually_check(did, host):
    if not host.endswith('/'):
        host += '/'  # match relays to mary's trailing / style
    try:
        status = get_json(f'{host}xrpc/com.atproto.sync.getRepoStatus?did={did}')
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None  # ughh
        raise
    return status['active']


total_oks = 0
total_fails = 0
counter_lock = threading.Lock()  # do_checks bumps these from many worker threads

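# verify one repo: if its own PDS says it is active, query every downstream
# host and print a TSV line naming each one that still reports it deactivated.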
def do_checks(did, pds, host_checks):
    if not actually_check(did, pds):
        # pds is not active=true, so we don't care about this one
        return

    fails = []
    for host in host_checks:
        res = actually_check(did, host)
        if res is None:
            pass  # too noisy for now
            # fails.append((host, 'missing'))
        elif res is False:
            fails.append((host, 'deactivated'))

    if len(fails) > 0:
        global total_fails
        with counter_lock:
            total_fails += 1
        print('{}\tactivated@{}\t{}'.format(did, pds, '\t'.join([f'{s}@{h}' for h, s in fails])))
    else:
        global total_oks
        with counter_lock:
            total_oks += 1


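# consumer: pull (did, pds) pairs until the queue is shut down. did:web
# accounts are checked directly; did:plc accounts are only checked when the
# PLC log shows they actually migrated to that PDS.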
def check_worker(q, plc, host_checks):
    try:
        while True:
            did, pds = q.get()
            try:
                if did.startswith('did:web:'):
                    do_checks(did, pds, host_checks)
                elif did.startswith('did:plc:'):
                    if plc_pds_changed_to(plc, did, pds):
                        do_checks(did, pds, host_checks)
                else:
                    print('weird did, ignoring:', did)
            except Exception as e:
                print('failed to check', did, pds, e, file=sys.stderr)
    except queue.ShutDown:
        return


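# one producer thread fills the queue while a pool of worker threads drains it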
def main(scraping_url, plc, checks, exclude_host_suffixes):

    # print(plc_pds_changed_to(plc, 'did:plc:k23ujfuppr3hr4pxvtaz7jro', 'https://arcnode.xyz/'))
    # do_checks('did:plc:k23ujfuppr3hr4pxvtaz7jro', 'https://arcnode.xyz/', checks)
    # return

    # do_checks('did:web:char.bun.how', 'https://pds.bun.how', checks)
    # return

    state = get_json(scraping_url)
    print('got state.')

    check_queue = queue.Queue(maxsize=64)

    print('starting get_repos in a thread:')
    gr = threading.Thread(target=get_repos, args=(state, check_queue, exclude_host_suffixes))
    gr.start()

    print('starting workers for checks...')
    with concurrent.futures.ThreadPoolExecutor(max_workers=42) as executor:
        for _ in range(42):
            executor.submit(check_worker, check_queue, plc, checks)

    print('done.')
    print(f'{total_oks=} {total_fails=}')

    gr.join()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--plc', default='https://plc.directory', help='plc directory host')
    parser.add_argument(
        '--scraping',
        default='https://raw.githubusercontent.com/mary-ext/atproto-scraping/refs/heads/trunk/state.json',
        help='scraping state to discover PDSs to crawl',
    )
    parser.add_argument(
        '--check',
        default=[
            'https://relay1.us-east.bsky.network',
            'https://relay1.us-west.bsky.network',
            'https://relay.fire.hose.cam',  # @microcosm.blue
            'https://relay3.fr.hose.cam',  # @microcosm.blue
            'https://relay.hayescmd.net',  # @edavis.dev
            # https://bsky.network does not support getRepoStatus
            # https://atproto.africa does not support getRepoStatus
        ],
        action='append',
        help='which downstream network services (probably relays) to check for deactivated state with /xrpc/com.atproto.sync.getRepoStatus',
    )
    parser.add_argument(
        '--exclude-host-suffix',
        default=[
            '.host.bsky.network',
            '.brid.gy',
        ],
        action='append',
        help='host name suffixes of PDS instances to skip checking',
    )
    args = parser.parse_args()
    main(args.scraping, args.plc, args.check, args.exclude_host_suffix)