social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import sqlite3 2from concurrent.futures import Future 3import threading 4import queue 5import json 6 7class DataBaseWorker(): 8 def __init__(self, database: str) -> None: 9 super(DataBaseWorker, self).__init__() 10 self.database = database 11 self.queue = queue.Queue() 12 self.thread = threading.Thread(target=self._run, daemon=True) 13 self.shutdown_event = threading.Event() 14 self.conn = sqlite3.connect(self.database, check_same_thread=False) 15 self.lock = threading.Lock() 16 self.thread.start() 17 18 def _run(self): 19 while not self.shutdown_event.is_set(): 20 try: 21 task, future = self.queue.get(timeout=1) 22 try: 23 with self.lock: 24 result = task(self.conn) 25 future.set_result(result) 26 except Exception as e: 27 future.set_exception(e) 28 finally: 29 self.queue.task_done() 30 except queue.Empty: 31 continue 32 33 def execute(self, sql: str, params = ()): 34 def task(conn: sqlite3.Connection): 35 cursor = conn.execute(sql, params) 36 conn.commit() 37 return cursor.fetchall() 38 39 future = Future() 40 self.queue.put((task, future)) 41 return future.result() 42 43 def close(self): 44 self.shutdown_event.set() 45 self.thread.join() 46 with self.lock: 47 self.conn.close() 48 49def try_insert_repost( 50 db: DataBaseWorker, 51 post_id: str, 52 reposted_id: str, 53 input_user: str, 54 input_service: str) -> bool: 55 56 reposted = find_post(db, reposted_id, input_user, input_service) 57 if not reposted: 58 return False 59 60 insert_repost(db, post_id, reposted['id'], input_user, input_service) 61 return True 62 63 64def try_insert_post( 65 db: DataBaseWorker, 66 post_id: str, 67 in_reply: str | None, 68 input_user: str, 69 input_service: str) -> bool: 70 root_id = None 71 parent_id = None 72 73 if in_reply: 74 parent_post = find_post(db, in_reply, input_user, input_service) 75 if not parent_post: 76 return False 77 78 root_id = parent_post['id'] 79 parent_id = root_id 80 if parent_post['root_id']: 81 root_id = parent_post['root_id'] 82 83 if root_id and parent_id: 84 insert_reply(db,post_id, input_user, input_service, parent_id, root_id) 85 else: 86 insert_post(db, post_id, input_user, input_service) 87 88 return True 89 90def insert_repost(db: DataBaseWorker, identifier: str, reposted_id: int, user_id: str, serivce: str) -> int: 91 db.execute( 92 """ 93 INSERT INTO posts (user_id, service, identifier, reposted_id) 94 VALUES (?, ?, ?, ?); 95 """, (user_id, serivce, identifier, reposted_id)) 96 return db.execute("SELECT last_insert_rowid();", ())[0][0] 97 98def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int: 99 db.execute( 100 """ 101 INSERT INTO posts (user_id, service, identifier) 102 VALUES (?, ?, ?); 103 """, (user_id, serivce, identifier)) 104 return db.execute("SELECT last_insert_rowid();", ())[0][0] 105 106def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int: 107 db.execute( 108 """ 109 INSERT INTO posts (user_id, service, identifier, parent_id, root_id) 110 VALUES (?, ?, ?, ?, ?); 111 """, (user_id, serivce, identifier, parent, root)) 112 return db.execute("SELECT last_insert_rowid();", ())[0][0] 113 114def insert_mapping(db: DataBaseWorker, original: int, mapped: int): 115 db.execute(""" 116 INSERT INTO mappings (original_post_id, mapped_post_id) 117 VALUES (?, ?); 118 """, (original, mapped)) 119 120def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str): 121 db.execute( 122 """ 123 DELETE FROM posts 124 WHERE identifier = ? 125 AND service = ? 126 AND user_id = ? 127 """, (identifier, serivce, user_id)) 128 129def fetch_data(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict: 130 result = db.execute( 131 """ 132 SELECT extra_data 133 FROM posts 134 WHERE identifier = ? 135 AND user_id = ? 136 AND service = ? 137 """, (identifier, user_id, service)) 138 if not result or not result[0]: 139 return {} 140 return json.loads(result[0][0]) 141 142def store_data(db: DataBaseWorker, identifier: str, user_id: str, service: str, extra_data: dict) -> None: 143 db.execute( 144 """ 145 UPDATE posts 146 SET extra_data = ? 147 WHERE identifier = ? 148 AND user_id = ? 149 AND service = ? 150 """, 151 (json.dumps(extra_data), identifier, user_id, service) 152 ) 153 154def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]: 155 return db.execute( 156 """ 157 SELECT p.identifier 158 FROM posts AS p 159 JOIN mappings AS m 160 ON p.id = m.mapped_post_id 161 WHERE m.original_post_id = ? 162 AND p.service = ? 163 AND p.user_id = ? 164 ORDER BY p.id; 165 """, 166 (original_post, service, user_id)) 167 168def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None: 169 result = db.execute( 170 """ 171 SELECT user_id, service, identifier, parent_id, root_id, reposted_id 172 FROM posts 173 WHERE id = ? 174 """, (id,)) 175 if not result: 176 return None 177 user_id, service, identifier, parent_id, root_id, reposted_id = result[0] 178 return { 179 'user_id': user_id, 180 'service': service, 181 'identifier': identifier, 182 'parent_id': parent_id, 183 'root_id': root_id, 184 'reposted_id': reposted_id 185 } 186 187def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None: 188 result = db.execute( 189 """ 190 SELECT id, parent_id, root_id, reposted_id 191 FROM posts 192 WHERE identifier = ? 193 AND user_id = ? 194 AND service = ? 195 """, (identifier, user_id, service)) 196 if not result: 197 return None 198 id, parent_id, root_id, reposted_id = result[0] 199 return { 200 'id': id, 201 'parent_id': parent_id, 202 'root_id': root_id, 203 'reposted_id': reposted_id 204 } 205 206def find_mapped_thread( 207 db: DataBaseWorker, 208 parent_id: str, 209 input_user: str, 210 input_service: str, 211 output_user: str, 212 output_service: str): 213 214 reply_data: dict | None = find_post(db, parent_id, input_user, input_service) 215 if not reply_data: 216 return None 217 218 reply_mappings: list[str] | None = find_mappings(db, reply_data['id'], output_service, output_user) 219 if not reply_mappings: 220 return None 221 222 reply_identifier: str = reply_mappings[-1] 223 root_identifier: str = reply_mappings[0] 224 if reply_data['root_id']: 225 root_data = find_post_by_id(db, reply_data['root_id']) 226 if not root_data: 227 return None 228 229 root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user) 230 if not root_mappings: 231 return None 232 root_identifier = root_mappings[0] 233 234 return ( 235 root_identifier[0], # real ids 236 reply_identifier[0], 237 reply_data['root_id'], # db ids 238 reply_data['id'] 239 )