plc-crawler.py
# adapted from rsky-relay's crawl script:
# https://github.com/blacksky-algorithms/rsky/blob/6a8e7dab7e424b42e946e40831166176520837d4/rsky-relay/crawler.py
# under its Apache 2.0 open-source license: https://github.com/blacksky-algorithms/rsky/blob/6a8e7dab7e424b42e946e40831166176520837d4/LICENSE

import gzip
import json
import os
import sqlite3
import time

import requests
# Configuration
BASE_URL = "https://plc.directory/export"
COUNT_PER_REQUEST = 1000
SLEEP_SECONDS = 0.333
DB_FILE = "plc_directory.db"
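
# A note on the knobs above: 1000 appears to be the largest page size /export
# will serve per request, and SLEEP_SECONDS (~3 requests/second) is a guess at
# a polite crawl rate, not a published limit.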


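# The database stores one summary row per fetched page; the raw JSON Lines
# bodies themselves are written to gzip files under ./pages/ (see
# fetch_plc_operations below).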
def create_database():
    """Create SQLite database and table if they don't exist."""
    conn = sqlite3.connect(DB_FILE)
    # WAL mode lets readers work while the crawler writes
    conn.execute("PRAGMA journal_mode = WAL")
    cursor = conn.cursor()

    # Create table for per-page summaries (STRICT tables need SQLite 3.37+)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS plc_pages (
            id integer primary key,
            after text,
            first_at text not null,
            last_at text not null,
            ops_count integer not null,
            body_bytes integer not null
        ) strict
    """)

    # Create indexes
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_last_at
        ON plc_pages (last_at)
    """)

    conn.commit()
    return conn


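# /export returns operations as JSON Lines ordered by createdAt, and pages via
# the `after` query parameter (a timestamp cursor). One caveat, hedged: if
# several operations share the boundary timestamp, a timestamp cursor can in
# principle skip or repeat entries at page edges; this script doesn't try to
# handle that.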
def fetch_plc_operations(session, after=None, retries=2):
    """Fetch one page of PLC operations from the API using a persistent session."""
    params = {"count": COUNT_PER_REQUEST}
    if after:
        params["after"] = after

    response = session.get(BASE_URL, params=params)

    if response.status_code == 200:
        body_bytes = len(response.content)
        # The body is JSON Lines; drop any empty trailing line before counting
        ops = [line for line in response.text.split("\n") if line]
        ops_count = len(ops)
        if ops_count == 0:
            # An empty page means we're fully caught up; nothing to record
            return None
        first_at = json.loads(ops[0])["createdAt"]
        # For a single-op page, first_at == last_at
        last_at = json.loads(ops[-1])["createdAt"]

        # Archive the raw page body; the first page has no cursor yet
        with gzip.open(f'./pages/{after or "start"}.jsonl.gz', 'wt') as g:
            g.write(response.text)

        return (after, first_at, last_at, ops_count, body_bytes)
    elif response.status_code == 429 and retries > 0:
        print('429. chillin for a sec.')
        time.sleep(3)
        return fetch_plc_operations(session, after=after, retries=retries - 1)
    else:
        print(f"Error fetching data: {response.status_code} - {response.text}")
        # Signal failure so the caller stops cleanly
        return None


def insert_operations(conn, operations):
    """Insert one page's summary row, committed immediately so progress
    survives interruption."""
    after, first_at, last_at, ops_count, body_bytes = operations

    # print(f'insert {after=} {first_at=} {last_at=} {ops_count=} {body_bytes=}')

    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO plc_pages (after, first_at, last_at, ops_count, body_bytes)
        VALUES (?, ?, ?, ?, ?)
        """,
        (after, first_at, last_at, ops_count, body_bytes),
    )
    conn.commit()
    return ops_count


def get_count(conn):
    """Return the total number of operations fetched across all pages."""
    cursor = conn.cursor()
    cursor.execute("SELECT sum(ops_count) FROM plc_pages")
    result = cursor.fetchone()
    return (result[0] or 0) if result else 0


def get_pages_count(conn):
    """Return the number of pages (API requests) recorded so far."""
    cursor = conn.cursor()
    cursor.execute("SELECT count(*) FROM plc_pages")
    result = cursor.fetchone()
    return (result[0] or 0) if result else 0


def get_latest_timestamp(conn):
    """Return the last_at of the most recently inserted page, or None."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT last_at FROM plc_pages ORDER BY id DESC LIMIT 1"
    )
    result = cursor.fetchone()
    return result[0] if result else None


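# The crawl is resumable: on startup we read the last stored last_at and
# continue from there, so an interrupted run picks up where it left off.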
def main():
    conn = create_database()
    os.makedirs("pages", exist_ok=True)

    # Create a persistent session
    session = requests.Session()
    session.headers.update({
        "User-Agent": "bad-example-plc-crawl",
        "Accept": "application/json",
    })

    # Resume from the latest stored timestamp if we have existing data
    last_at = get_latest_timestamp(conn)

    previous_processed = get_count(conn)
    previous_requests_count = get_pages_count(conn)

    ops_processed = 0
    request_count = 0

    try:
        print("Starting PLC Directory API crawl...")

        while True:
            operations = fetch_plc_operations(session, last_at)
            request_count += 1

            if not operations:
                print("No more operations to fetch or API error occurred.")
                break

            ops_processed += insert_operations(conn, operations)

            # Advance the cursor to the page's last timestamp for the next request
            _, _, last_at, ops_count, _ = operations

            # Progress reporting
            print(
                f"Request #{request_count}: Fetched {ops_count}, "
                f"Total {ops_processed + previous_processed}, Last timestamp: {last_at}"
            )

            # Check if we got fewer records than requested (end of data)
            if ops_count < COUNT_PER_REQUEST:
                print("Reached the end of available data.")
                break

            # Sleep to avoid overloading the server
            time.sleep(SLEEP_SECONDS)

    except KeyboardInterrupt:
        print("\nCrawl interrupted by user. Progress saved.")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        # Final stats
        print("\nCrawl complete or interrupted.")
        print(f"Total records processed: {previous_processed + ops_processed}")
        print(f"Total API requests made: {previous_requests_count + request_count}")

        # Close connections
        conn.close()
        session.close()


if __name__ == "__main__":
    main()
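

# Example of reading a saved page back later (a sketch; 'start.jsonl.gz' is
# the name this script gives the first page, before any `after` cursor exists):
#
#   import gzip, json
#   with gzip.open("pages/start.jsonl.gz", "rt") as f:
#       ops = [json.loads(line) for line in f if line.strip()]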