# Social media crossposting tool. 3rd time's the charm.
# Tags: mastodon, misskey, crossposting, bluesky
# Source view: at master, 7.5 kB (view raw)
1import json 2import queue 3import sqlite3 4import threading 5from concurrent.futures import Future 6 7 8class DataBaseWorker: 9 def __init__(self, database: str) -> None: 10 super(DataBaseWorker, self).__init__() 11 self.database = database 12 self.queue = queue.Queue() 13 self.thread = threading.Thread(target=self._run, daemon=True) 14 self.shutdown_event = threading.Event() 15 self.conn = sqlite3.connect(self.database, check_same_thread=False) 16 self.lock = threading.Lock() 17 self.thread.start() 18 19 def _run(self): 20 while not self.shutdown_event.is_set(): 21 try: 22 task, future = self.queue.get(timeout=1) 23 try: 24 with self.lock: 25 result = task(self.conn) 26 future.set_result(result) 27 except Exception as e: 28 future.set_exception(e) 29 finally: 30 self.queue.task_done() 31 except queue.Empty: 32 continue 33 34 def execute(self, sql: str, params=()): 35 def task(conn: sqlite3.Connection): 36 cursor = conn.execute(sql, params) 37 conn.commit() 38 return cursor.fetchall() 39 40 future = Future() 41 self.queue.put((task, future)) 42 return future.result() 43 44 def close(self): 45 self.shutdown_event.set() 46 self.thread.join() 47 with self.lock: 48 self.conn.close() 49 50 51def try_insert_repost( 52 db: DataBaseWorker, 53 post_id: str, 54 reposted_id: str, 55 input_user: str, 56 input_service: str, 57) -> bool: 58 reposted = find_post(db, reposted_id, input_user, input_service) 59 if not reposted: 60 return False 61 62 insert_repost(db, post_id, reposted["id"], input_user, input_service) 63 return True 64 65 66def try_insert_post( 67 db: DataBaseWorker, 68 post_id: str, 69 in_reply: str | None, 70 input_user: str, 71 input_service: str, 72) -> bool: 73 root_id = None 74 parent_id = None 75 76 if in_reply: 77 parent_post = find_post(db, in_reply, input_user, input_service) 78 if not parent_post: 79 return False 80 81 root_id = parent_post["id"] 82 parent_id = root_id 83 if parent_post["root_id"]: 84 root_id = parent_post["root_id"] 85 86 if root_id and 
parent_id: 87 insert_reply(db, post_id, input_user, input_service, parent_id, root_id) 88 else: 89 insert_post(db, post_id, input_user, input_service) 90 91 return True 92 93 94def insert_repost( 95 db: DataBaseWorker, identifier: str, reposted_id: int, user_id: str, serivce: str 96) -> int: 97 db.execute( 98 """ 99 INSERT INTO posts (user_id, service, identifier, reposted_id) 100 VALUES (?, ?, ?, ?); 101 """, 102 (user_id, serivce, identifier, reposted_id), 103 ) 104 return db.execute("SELECT last_insert_rowid();", ())[0][0] 105 106 107def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int: 108 db.execute( 109 """ 110 INSERT INTO posts (user_id, service, identifier) 111 VALUES (?, ?, ?); 112 """, 113 (user_id, serivce, identifier), 114 ) 115 return db.execute("SELECT last_insert_rowid();", ())[0][0] 116 117 118def insert_reply( 119 db: DataBaseWorker, 120 identifier: str, 121 user_id: str, 122 serivce: str, 123 parent: int, 124 root: int, 125) -> int: 126 db.execute( 127 """ 128 INSERT INTO posts (user_id, service, identifier, parent_id, root_id) 129 VALUES (?, ?, ?, ?, ?); 130 """, 131 (user_id, serivce, identifier, parent, root), 132 ) 133 return db.execute("SELECT last_insert_rowid();", ())[0][0] 134 135 136def insert_mapping(db: DataBaseWorker, original: int, mapped: int): 137 db.execute( 138 """ 139 INSERT INTO mappings (original_post_id, mapped_post_id) 140 VALUES (?, ?); 141 """, 142 (original, mapped), 143 ) 144 145 146def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str): 147 db.execute( 148 """ 149 DELETE FROM posts 150 WHERE identifier = ? 151 AND service = ? 152 AND user_id = ? 153 """, 154 (identifier, serivce, user_id), 155 ) 156 157 158def fetch_data(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict: 159 result = db.execute( 160 """ 161 SELECT extra_data 162 FROM posts 163 WHERE identifier = ? 164 AND user_id = ? 165 AND service = ? 
166 """, 167 (identifier, user_id, service), 168 ) 169 if not result or not result[0]: 170 return {} 171 return json.loads(result[0][0]) 172 173 174def store_data( 175 db: DataBaseWorker, identifier: str, user_id: str, service: str, extra_data: dict 176) -> None: 177 db.execute( 178 """ 179 UPDATE posts 180 SET extra_data = ? 181 WHERE identifier = ? 182 AND user_id = ? 183 AND service = ? 184 """, 185 (json.dumps(extra_data), identifier, user_id, service), 186 ) 187 188 189def find_mappings( 190 db: DataBaseWorker, original_post: int, service: str, user_id: str 191) -> list[str]: 192 return db.execute( 193 """ 194 SELECT p.identifier 195 FROM posts AS p 196 JOIN mappings AS m 197 ON p.id = m.mapped_post_id 198 WHERE m.original_post_id = ? 199 AND p.service = ? 200 AND p.user_id = ? 201 ORDER BY p.id; 202 """, 203 (original_post, service, user_id), 204 ) 205 206 207def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None: 208 result = db.execute( 209 """ 210 SELECT user_id, service, identifier, parent_id, root_id, reposted_id 211 FROM posts 212 WHERE id = ? 213 """, 214 (id,), 215 ) 216 if not result: 217 return None 218 user_id, service, identifier, parent_id, root_id, reposted_id = result[0] 219 return { 220 "user_id": user_id, 221 "service": service, 222 "identifier": identifier, 223 "parent_id": parent_id, 224 "root_id": root_id, 225 "reposted_id": reposted_id, 226 } 227 228 229def find_post( 230 db: DataBaseWorker, identifier: str, user_id: str, service: str 231) -> dict | None: 232 result = db.execute( 233 """ 234 SELECT id, parent_id, root_id, reposted_id 235 FROM posts 236 WHERE identifier = ? 237 AND user_id = ? 238 AND service = ? 
239 """, 240 (identifier, user_id, service), 241 ) 242 if not result: 243 return None 244 id, parent_id, root_id, reposted_id = result[0] 245 return { 246 "id": id, 247 "parent_id": parent_id, 248 "root_id": root_id, 249 "reposted_id": reposted_id, 250 } 251 252 253def find_mapped_thread( 254 db: DataBaseWorker, 255 parent_id: str, 256 input_user: str, 257 input_service: str, 258 output_user: str, 259 output_service: str, 260): 261 reply_data: dict | None = find_post(db, parent_id, input_user, input_service) 262 if not reply_data: 263 return None 264 265 reply_mappings: list[str] | None = find_mappings( 266 db, reply_data["id"], output_service, output_user 267 ) 268 if not reply_mappings: 269 return None 270 271 reply_identifier: str = reply_mappings[-1] 272 root_identifier: str = reply_mappings[0] 273 if reply_data["root_id"]: 274 root_data = find_post_by_id(db, reply_data["root_id"]) 275 if not root_data: 276 return None 277 278 root_mappings = find_mappings( 279 db, reply_data["root_id"], output_service, output_user 280 ) 281 if not root_mappings: 282 return None 283 root_identifier = root_mappings[0] 284 285 return ( 286 root_identifier[0], # real ids 287 reply_identifier[0], 288 reply_data["root_id"], # db ids 289 reply_data["id"], 290 )