download the entire plc directory into gzipped json lines
plc-crawler.py
# adapted from rsky-relay's crawl script:
# https://github.com/blacksky-algorithms/rsky/blob/6a8e7dab7e424b42e946e40831166176520837d4/rsky-relay/crawler.py
# under its Apache-2.0 open-source license: https://github.com/blacksky-algorithms/rsky/blob/6a8e7dab7e424b42e946e40831166176520837d4/LICENSE

import gzip
import json
import os
import sqlite3
import time

import requests

# Configuration
BASE_URL = "https://plc.directory/export"
COUNT_PER_REQUEST = 1000
SLEEP_SECONDS = 0.333
DB_FILE = "plc_directory.db"


def create_database():
    """Create SQLite database and table if they don't exist."""
    conn = sqlite3.connect(DB_FILE)
    conn.execute("""PRAGMA journal_mode = WAL""")
    cursor = conn.cursor()

    # Create table of per-page metadata (STRICT tables need SQLite 3.37+)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS plc_pages (
            id integer primary key,
            after text,
            first_at text not null,
            last_at text not null,
            ops_count integer not null,
            body_bytes integer not null
        ) strict
    """)

    # Create indexes
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_last_at
        ON plc_pages (last_at)
    """)

    conn.commit()
    return conn


def fetch_plc_operations(session, after=None, retries=2):
    """Fetch one page of PLC operations using a persistent session.

    Returns (after, first_at, last_at, ops_count, body_bytes), or None on
    an empty page or an unrecoverable error.
    """
    params = {"count": COUNT_PER_REQUEST}
    if after:
        params["after"] = after

    response = session.get(BASE_URL, params=params, timeout=30)

    if response.status_code == 200:
        body_bytes = len(response.text)
        # the export endpoint returns JSON Lines; drop any trailing blank line
        ops = [line for line in response.text.splitlines() if line]
        ops_count = len(ops)
        if ops_count == 0:
            return None

        # first and last createdAt on this page (same line if there's only one op)
        first_at = json.loads(ops[0])["createdAt"]
        last_at = json.loads(ops[-1])["createdAt"]

        # the very first page is saved as None.jsonl.gz, since `after` is unset
        with gzip.open(f'./pages/{after}.jsonl.gz', 'wt') as g:
            g.write(response.text)

        return (after, first_at, last_at, ops_count, body_bytes)
    elif response.status_code == 429 and retries > 0:
        print('429. chillin for a sec.')
        time.sleep(3)
        return fetch_plc_operations(session, after=after, retries=retries - 1)
    else:
        print(f"Error fetching data: {response.status_code} - {response.text}")
        return None


def insert_operations(conn, operations):
    """Insert one page's metadata row into the database."""
    after, first_at, last_at, ops_count, body_bytes = operations

    # print(f'insert {after=} {first_at=} {last_at=} {ops_count=} {body_bytes=}')

    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO plc_pages (after, first_at, last_at, ops_count, body_bytes)
        VALUES (?, ?, ?, ?, ?)
        """,
        (after, first_at, last_at, ops_count, body_bytes),
    )
    conn.commit()
    return ops_count


def get_count(conn):
    """Get the total operations count from the database."""
    cursor = conn.cursor()
    cursor.execute("SELECT sum(ops_count) FROM plc_pages")
    result = cursor.fetchone()
    return (result[0] or 0) if result else 0


def get_pages_count(conn):
    cursor = conn.cursor()
    cursor.execute("SELECT count(*) FROM plc_pages")
    result = cursor.fetchone()
    return (result[0] or 0) if result else 0


def get_latest_timestamp(conn):
    """Get the latest timestamp from the database."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT last_at FROM plc_pages ORDER BY id DESC LIMIT 1"
    )
    result = cursor.fetchone()
    return result[0] if result else None


def main():
    conn = create_database()
    os.makedirs("pages", exist_ok=True)

    # Create a persistent session
    session = requests.Session()
    session.headers.update({
        "User-Agent": "bad-example-plc-crawl",
        "Accept": "application/json",
    })

    # Check for existing data and resume from the latest timestamp
    last_at = get_latest_timestamp(conn)

    previous_processed = get_count(conn)
    previous_requests_count = get_pages_count(conn)

    ops_processed = 0
    request_count = 0

    try:
        print("Starting PLC Directory API crawl...")

        while True:
            operations = fetch_plc_operations(session, last_at)
            request_count += 1

            if not operations:
                print("No more operations to fetch or API error occurred.")
                break

            ops_processed += insert_operations(conn, operations)

            # Get the last timestamp for the next request
            last_at = operations[2]

            # Progress reporting
            print(
                f"Request #{request_count}: Fetched {operations[3]}, "
                f"Total {ops_processed + previous_processed}, Last timestamp: {last_at}"
            )

            # Check if we got fewer records than requested (end of data)
            if operations[3] < COUNT_PER_REQUEST:
                print("Reached the end of available data.")
                break

            # Sleep to avoid overloading the server
            time.sleep(SLEEP_SECONDS)

    except KeyboardInterrupt:
        print("\nCrawl interrupted by user. Progress saved.")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        # Final stats
        print("\nCrawl complete or interrupted.")
        print(f"Total records processed: {previous_processed + ops_processed}")
        print(f"Total API requests made: {previous_requests_count + request_count}")

        # Close connections
        conn.close()
        session.close()  # Close the session when done


if __name__ == "__main__":
    main()