# adapted from rsky-relay's crawl script:
# https://github.com/blacksky-algorithms/rsky/blob/6a8e7dab7e424b42e946e40831166176520837d4/rsky-relay/crawler.py
# under its Apache2.0 open-source license: https://github.com/blacksky-algorithms/rsky/blob/6a8e7dab7e424b42e946e40831166176520837d4/LICENSE

import gzip
import json
import os
import sqlite3
import time

import requests

# Configuration
BASE_URL = "https://plc.directory/export"
COUNT_PER_REQUEST = 1000
SLEEP_SECONDS = 0.333
DB_FILE = "plc_directory.db"


def create_database():
    """Create SQLite database and table if they don't exist."""
    conn = sqlite3.connect(DB_FILE)
    conn.execute("""PRAGMA journal_mode = WAL""")
    cursor = conn.cursor()

    # Create table for PLC operations
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS plc_pages (
            id integer primary key,
            after text,
            first_at text not null,
            last_at text not null,
            ops_count integer not null,
            body_bytes integer not null
        ) strict
    """)

    # Create indexes
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_last_at ON plc_pages (last_at)
    """)

    conn.commit()
    return conn


def fetch_plc_operations(session, after=None, retries=2):
    """Fetch one page of PLC operations from the API using a persistent session."""
    params = {"count": COUNT_PER_REQUEST}
    if after:
        params["after"] = after

    response = session.get(BASE_URL, params=params)

    if response.status_code == 200:
        first_at = None
        last_at = None
        body_bytes = len(response.text)
        # The export endpoint returns JSON Lines; drop blank lines so a
        # trailing newline doesn't produce an empty final entry.
        ops = [line for line in response.text.split("\n") if line]
        ops_count = len(ops)
        if ops_count >= 1:
            first = json.loads(ops[0])
            first_at = first["createdAt"]
            # A single-op page starts and ends at the same timestamp.
            last_at = first_at
        if ops_count >= 2:
            last = json.loads(ops[-1])
            last_at = last["createdAt"]
        # Keep a gzipped copy of the raw page, keyed by the cursor used to
        # fetch it (the very first page is written as None.jsonl.gz).
        with gzip.open(f'./pages/{after}.jsonl.gz', 'wt') as g:
            g.write(response.text)
        return (after, first_at, last_at, ops_count, body_bytes)
    elif response.status_code == 429 and retries > 0:
        print('429. chillin for a sec.')
        time.sleep(3)
        return fetch_plc_operations(session, after=after, retries=retries - 1)
    else:
        print(f"Error fetching data: {response.status_code} - {response.text}")
        # Returning None lets the caller tell an API error apart from a real page.
        return None


def insert_operations(conn, operations):
    """Insert one page summary into the database."""
    after, first_at, last_at, ops_count, body_bytes = operations
    # print(f'insert {after=} {first_at=} {last_at=} {ops_count=} {body_bytes=}')
    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO plc_pages (after, first_at, last_at, ops_count, body_bytes)
        VALUES (?, ?, ?, ?, ?)
        """,
        (after, first_at, last_at, ops_count, body_bytes),
    )
    conn.commit()
    return ops_count


def get_count(conn):
    """Get the total number of operations recorded so far."""
    cursor = conn.cursor()
    cursor.execute("SELECT sum(ops_count) FROM plc_pages")
    result = cursor.fetchone()
    return (result[0] or 0) if result else 0


def get_pages_count(conn):
    """Get the number of pages (API requests) recorded so far."""
    cursor = conn.cursor()
    cursor.execute("SELECT count(*) FROM plc_pages")
    result = cursor.fetchone()
    return (result[0] or 0) if result else 0


def get_latest_timestamp(conn):
    """Get the latest timestamp from the database."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT last_at FROM plc_pages ORDER BY id DESC LIMIT 1"
    )
    result = cursor.fetchone()
    return result[0] if result else None


def main():
    conn = create_database()

    # Raw response bodies are gzipped into ./pages/, so make sure it exists
    os.makedirs("./pages", exist_ok=True)

    # Create a persistent session
    session = requests.Session()
    session.headers.update({
        "User-Agent": "bad-example-plc-crawl",
        "Accept": "application/json",
    })

    # Check if we have existing data and get the latest timestamp
    latest_timestamp = get_latest_timestamp(conn)
    last_at = latest_timestamp
    previous_processed = get_count(conn)
    previous_requests_count = get_pages_count(conn)
    ops_processed = 0
    request_count = 0

    try:
        print("Starting PLC Directory API crawl...")

        while True:
            operations = fetch_plc_operations(session, last_at)
            request_count += 1

            if not operations:
                print("No more operations to fetch or API error occurred.")
                break

            ops_processed += insert_operations(conn, operations)

            # Get the last timestamp for the next request
            last_at = operations[2]

            # Progress reporting
            print(
                f"Request #{request_count}: Fetched {operations[3]}, "
                f"Total {ops_processed + previous_processed}, Last timestamp: {last_at}"
            )

            # Check if we got fewer records than requested (end of data)
            if operations[3] < COUNT_PER_REQUEST:
                print("Reached the end of available data.")
                break

            # Sleep to avoid overloading the server
            time.sleep(SLEEP_SECONDS)

    except KeyboardInterrupt:
        print("\nCrawl interrupted by user. Progress saved.")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        # Final stats
        print("\nCrawl complete or interrupted.")
        print(f"Total records processed: {previous_processed + ops_processed}")
        print(f"Total API requests made: {previous_requests_count + request_count}")

        # Close connections
        conn.close()
        session.close()  # Close the session when done


if __name__ == "__main__":
    main()