social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import sqlite3 2from concurrent.futures import Future 3import threading 4import queue 5 6class DataBaseWorker(): 7 def __init__(self, database: str) -> None: 8 super(DataBaseWorker, self).__init__() 9 self.database = database 10 self.queue = queue.Queue() 11 self.thread = threading.Thread(target=self._run, daemon=True) 12 self.shutdown_event = threading.Event() 13 self.conn = sqlite3.connect(self.database, check_same_thread=False) 14 self.lock = threading.Lock() 15 self.thread.start() 16 17 def _run(self): 18 while not self.shutdown_event.is_set(): 19 try: 20 task, future = self.queue.get(timeout=1) 21 try: 22 with self.lock: 23 result = task(self.conn) 24 future.set_result(result) 25 except Exception as e: 26 future.set_exception(e) 27 finally: 28 self.queue.task_done() 29 except queue.Empty: 30 continue 31 32 def execute(self, sql: str, params = ()): 33 def task(conn: sqlite3.Connection): 34 cursor = conn.execute(sql, params) 35 conn.commit() 36 return cursor.fetchall() 37 38 future = Future() 39 self.queue.put((task, future)) 40 return future.result() 41 42 def close(self): 43 self.shutdown_event.set() 44 self.thread.join() 45 with self.lock: 46 self.conn.close() 47 48def find_mapped_thread( 49 db: DataBaseWorker, 50 parent_id: str, 51 input_user: str, 52 input_service: str, 53 output_user: str, 54 output_service: str): 55 56 reply_data: dict | None = find_post(db, parent_id, input_user, input_service) 57 if not reply_data: 58 return None 59 60 reply_mappings: list[str] | None = find_mappings(db, reply_data['id'], output_service, output_user) 61 if not reply_mappings: 62 return None 63 64 reply_identifier: str = reply_mappings[-1] 65 root_identifier: str = reply_mappings[0] 66 if reply_data['root_id']: 67 root_data = find_post_by_id(db, reply_data['root_id']) 68 if not root_data: 69 return None 70 71 root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user) 72 if not root_mappings: 73 return None 74 root_identifier = root_mappings[0] 75 76 return ( 77 root_identifier[0], # real ids 78 reply_identifier[0], 79 reply_data['root_id'], # db ids 80 reply_data['id'] 81 ) 82 83 84def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int: 85 db.execute( 86 """ 87 INSERT INTO posts (user_id, service, identifier) 88 VALUES (?, ?, ?); 89 """, (user_id, serivce, identifier)) 90 return db.execute("SELECT last_insert_rowid();", ())[0][0] 91 92def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int: 93 db.execute( 94 """ 95 INSERT INTO posts (user_id, service, identifier, parent_id, root_id) 96 VALUES (?, ?, ?, ?, ?); 97 """, (user_id, serivce, identifier, parent, root)) 98 return db.execute("SELECT last_insert_rowid();", ())[0][0] 99 100def insert_mapping(db: DataBaseWorker, original: int, mapped: int): 101 db.execute(""" 102 INSERT INTO mappings (original_post_id, mapped_post_id) 103 VALUES (?, ?); 104 """, (original, mapped)) 105 106def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str): 107 db.execute( 108 """ 109 DELETE FROM posts 110 WHERE identifier = ? 111 AND service = ? 112 AND user_id = ? 113 """, (identifier, serivce, user_id)) 114 115 116def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]: 117 return db.execute( 118 """ 119 SELECT p.identifier 120 FROM posts AS p 121 JOIN mappings AS m 122 ON p.id = m.mapped_post_id 123 WHERE m.original_post_id = ? 124 AND p.service = ? 125 AND p.user_id = ? 126 ORDER BY p.id; 127 """, 128 (original_post, service, user_id)) 129 130def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None: 131 result = db.execute( 132 """ 133 SELECT user_id, service, identifier, parent_id, root_id 134 FROM posts 135 WHERE id = ? 136 """, (id,)) 137 if not result: 138 return None 139 user_id, service, identifier, parent_id, root_id = result[0] 140 return { 141 'user_id': user_id, 142 'service': service, 143 'identifier': identifier, 144 'parent_id': parent_id, 145 'root_id': root_id 146 } 147 148def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None: 149 result = db.execute( 150 """ 151 SELECT id, parent_id, root_id 152 FROM posts 153 WHERE identifier = ? 154 AND user_id = ? 155 AND service = ? 156 """, (identifier, user_id, service)) 157 if not result: 158 return None 159 id, parent_id, root_id = result[0] 160 return { 161 'id': id, 162 'parent_id': parent_id, 163 'root_id': root_id 164 }