social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import sqlite3 2from concurrent.futures import Future 3import threading 4import queue 5 6class DataBaseWorker(): 7 def __init__(self, database: str) -> None: 8 super(DataBaseWorker, self).__init__() 9 self.database = database 10 self.queue = queue.Queue() 11 self.thread = threading.Thread(target=self._run, daemon=True) 12 self.shutdown_event = threading.Event() 13 self.conn = sqlite3.connect(self.database, check_same_thread=False) 14 self.lock = threading.Lock() 15 self.thread.start() 16 17 def _run(self): 18 while not self.shutdown_event.is_set(): 19 try: 20 task, future = self.queue.get(timeout=1) 21 try: 22 with self.lock: 23 result = task(self.conn) 24 future.set_result(result) 25 except Exception as e: 26 future.set_exception(e) 27 finally: 28 self.queue.task_done() 29 except queue.Empty: 30 continue 31 32 def execute(self, sql: str, params = ()): 33 def task(conn: sqlite3.Connection): 34 cursor = conn.execute(sql, params) 35 conn.commit() 36 return cursor.fetchall() 37 38 future = Future() 39 self.queue.put((task, future)) 40 return future.result() 41 42 def close(self): 43 self.shutdown_event.set() 44 self.thread.join() 45 with self.lock: 46 self.conn.close() 47 48def try_insert_post( 49 db: DataBaseWorker, 50 post_id: str, 51 in_reply: str | None, 52 input_user: str, 53 input_service: str) -> bool: 54 root_id = None 55 parent_id = None 56 57 if in_reply: 58 parent_post = find_post(db, in_reply, input_user, input_service) 59 if not parent_post: 60 return False 61 62 root_id = parent_post['id'] 63 parent_id = root_id 64 if parent_post['root_id']: 65 root_id = parent_post['root_id'] 66 67 if root_id and parent_id: 68 insert_reply(db,post_id, input_user, input_service, parent_id, root_id) 69 else: 70 insert_post(db, post_id, input_user, input_service) 71 72 return True 73 74 75def find_mapped_thread( 76 db: DataBaseWorker, 77 parent_id: str, 78 input_user: str, 79 input_service: str, 80 output_user: str, 81 output_service: str): 82 83 reply_data: dict | None = find_post(db, parent_id, input_user, input_service) 84 if not reply_data: 85 return None 86 87 reply_mappings: list[str] | None = find_mappings(db, reply_data['id'], output_service, output_user) 88 if not reply_mappings: 89 return None 90 91 reply_identifier: str = reply_mappings[-1] 92 root_identifier: str = reply_mappings[0] 93 if reply_data['root_id']: 94 root_data = find_post_by_id(db, reply_data['root_id']) 95 if not root_data: 96 return None 97 98 root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user) 99 if not root_mappings: 100 return None 101 root_identifier = root_mappings[0] 102 103 return ( 104 root_identifier[0], # real ids 105 reply_identifier[0], 106 reply_data['root_id'], # db ids 107 reply_data['id'] 108 ) 109 110 111def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int: 112 db.execute( 113 """ 114 INSERT INTO posts (user_id, service, identifier) 115 VALUES (?, ?, ?); 116 """, (user_id, serivce, identifier)) 117 return db.execute("SELECT last_insert_rowid();", ())[0][0] 118 119def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int: 120 db.execute( 121 """ 122 INSERT INTO posts (user_id, service, identifier, parent_id, root_id) 123 VALUES (?, ?, ?, ?, ?); 124 """, (user_id, serivce, identifier, parent, root)) 125 return db.execute("SELECT last_insert_rowid();", ())[0][0] 126 127def insert_mapping(db: DataBaseWorker, original: int, mapped: int): 128 db.execute(""" 129 INSERT INTO mappings (original_post_id, mapped_post_id) 130 VALUES (?, ?); 131 """, (original, mapped)) 132 133def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str): 134 db.execute( 135 """ 136 DELETE FROM posts 137 WHERE identifier = ? 138 AND service = ? 139 AND user_id = ? 140 """, (identifier, serivce, user_id)) 141 142 143def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]: 144 return db.execute( 145 """ 146 SELECT p.identifier 147 FROM posts AS p 148 JOIN mappings AS m 149 ON p.id = m.mapped_post_id 150 WHERE m.original_post_id = ? 151 AND p.service = ? 152 AND p.user_id = ? 153 ORDER BY p.id; 154 """, 155 (original_post, service, user_id)) 156 157def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None: 158 result = db.execute( 159 """ 160 SELECT user_id, service, identifier, parent_id, root_id 161 FROM posts 162 WHERE id = ? 163 """, (id,)) 164 if not result: 165 return None 166 user_id, service, identifier, parent_id, root_id = result[0] 167 return { 168 'user_id': user_id, 169 'service': service, 170 'identifier': identifier, 171 'parent_id': parent_id, 172 'root_id': root_id 173 } 174 175def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None: 176 result = db.execute( 177 """ 178 SELECT id, parent_id, root_id 179 FROM posts 180 WHERE identifier = ? 181 AND user_id = ? 182 AND service = ? 183 """, (identifier, user_id, service)) 184 if not result: 185 return None 186 id, parent_id, root_id = result[0] 187 return { 188 'id': id, 189 'parent_id': parent_id, 190 'root_id': root_id 191 }