social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import sqlite3 2from concurrent.futures import Future 3import threading 4import queue 5 6class DataBaseWorker(): 7 def __init__(self, database: str) -> None: 8 super(DataBaseWorker, self).__init__() 9 self.database = database 10 self.queue = queue.Queue() 11 self.thread = threading.Thread(target=self._run, daemon=True) 12 self.shutdown_event = threading.Event() 13 self.conn = sqlite3.connect(self.database, check_same_thread=False) 14 self.lock = threading.Lock() 15 self.thread.start() 16 17 def _run(self): 18 while not self.shutdown_event.is_set(): 19 try: 20 task, future = self.queue.get(timeout=1) 21 try: 22 with self.lock: 23 result = task(self.conn) 24 future.set_result(result) 25 except Exception as e: 26 future.set_exception(e) 27 finally: 28 self.queue.task_done() 29 except queue.Empty: 30 continue 31 32 def execute(self, sql: str, params = ()): 33 def task(conn: sqlite3.Connection): 34 cursor = conn.execute(sql, params) 35 conn.commit() 36 return cursor.fetchall() 37 38 future = Future() 39 self.queue.put((task, future)) 40 return future.result() 41 42 def close(self): 43 self.shutdown_event.set() 44 self.thread.join() 45 with self.lock: 46 self.conn.close() 47 48def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int: 49 db.execute( 50 """ 51 INSERT INTO posts (user_id, service, identifier) 52 VALUES (?, ?, ?); 53 """, (user_id, serivce, identifier)) 54 return db.execute("SELECT last_insert_rowid();", ())[0][0] 55 56def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int: 57 db.execute( 58 """ 59 INSERT INTO posts (user_id, service, identifier, parent_id, root_id) 60 VALUES (?, ?, ?, ?, ?); 61 """, (user_id, serivce, identifier, parent, root)) 62 return db.execute("SELECT last_insert_rowid();", ())[0][0] 63 64def insert_mapping(db: DataBaseWorker, original: int, mapped: int): 65 db.execute(""" 66 INSERT INTO mappings (original_post_id, mapped_post_id) 67 VALUES (?, ?); 68 """, (original, mapped)) 69 70def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str): 71 db.execute( 72 """ 73 DELETE FROM posts 74 WHERE identifier = ? 75 AND service = ? 76 AND user_id = ? 77 """, (identifier, serivce, user_id)) 78 79 80def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]: 81 return db.execute( 82 """ 83 SELECT p.identifier 84 FROM posts AS p 85 JOIN mappings AS m 86 ON p.id = m.mapped_post_id 87 WHERE m.original_post_id = ? 88 AND p.service = ? 89 AND p.user_id = ? 90 ORDER BY p.id; 91 """, 92 (original_post, service, user_id)) 93 94def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None: 95 result = db.execute( 96 """ 97 SELECT user_id, service, identifier, parent_id, root_id 98 FROM posts 99 WHERE id = ? 100 """, (id,)) 101 if not result: 102 return None 103 user_id, service, identifier, parent_id, root_id = result[0] 104 return { 105 'user_id': user_id, 106 'service': service, 107 'identifier': identifier, 108 'parent_id': parent_id, 109 'root_id': root_id 110 } 111 112def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None: 113 result = db.execute( 114 """ 115 SELECT id, parent_id, root_id 116 FROM posts 117 WHERE identifier = ? 118 AND user_id = ? 119 AND service = ? 120 """, (identifier, user_id, service)) 121 if not result: 122 return None 123 id, parent_id, root_id = result[0] 124 return { 125 'id': id, 126 'parent_id': parent_id, 127 'root_id': root_id 128 }