social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import sqlite3
2from concurrent.futures import Future
3import threading
4import queue
5
6class DataBaseWorker():
7 def __init__(self, database: str) -> None:
8 super(DataBaseWorker, self).__init__()
9 self.database = database
10 self.queue = queue.Queue()
11 self.thread = threading.Thread(target=self._run, daemon=True)
12 self.shutdown_event = threading.Event()
13 self.conn = sqlite3.connect(self.database, check_same_thread=False)
14 self.lock = threading.Lock()
15 self.thread.start()
16
17 def _run(self):
18 while not self.shutdown_event.is_set():
19 try:
20 task, future = self.queue.get(timeout=1)
21 try:
22 with self.lock:
23 result = task(self.conn)
24 future.set_result(result)
25 except Exception as e:
26 future.set_exception(e)
27 finally:
28 self.queue.task_done()
29 except queue.Empty:
30 continue
31
32 def execute(self, sql: str, params = ()):
33 def task(conn: sqlite3.Connection):
34 cursor = conn.execute(sql, params)
35 conn.commit()
36 return cursor.fetchall()
37
38 future = Future()
39 self.queue.put((task, future))
40 return future.result()
41
42 def close(self):
43 self.shutdown_event.set()
44 self.thread.join()
45 with self.lock:
46 self.conn.close()
47
48def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
49 db.execute(
50 """
51 INSERT INTO posts (user_id, service, identifier)
52 VALUES (?, ?, ?);
53 """, (user_id, serivce, identifier))
54 return db.execute("SELECT last_insert_rowid();", ())[0][0]
55
56def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int:
57 db.execute(
58 """
59 INSERT INTO posts (user_id, service, identifier, parent_id, root_id)
60 VALUES (?, ?, ?, ?, ?);
61 """, (user_id, serivce, identifier, parent, root))
62 return db.execute("SELECT last_insert_rowid();", ())[0][0]
63
64def insert_mapping(db: DataBaseWorker, original: int, mapped: int):
65 db.execute("""
66 INSERT INTO mappings (original_post_id, mapped_post_id)
67 VALUES (?, ?);
68 """, (original, mapped))
69
70def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str):
71 db.execute(
72 """
73 DELETE FROM posts
74 WHERE identifier = ?
75 AND service = ?
76 AND user_id = ?
77 """, (identifier, serivce, user_id))
78
79
80def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]:
81 return db.execute(
82 """
83 SELECT p.identifier
84 FROM posts AS p
85 JOIN mappings AS m
86 ON p.id = m.mapped_post_id
87 WHERE m.original_post_id = ?
88 AND p.service = ?
89 AND p.user_id = ?
90 ORDER BY p.id;
91 """,
92 (original_post, service, user_id))
93
94def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
95 result = db.execute(
96 """
97 SELECT user_id, service, identifier, parent_id, root_id
98 FROM posts
99 WHERE id = ?
100 """, (id,))
101 if not result:
102 return None
103 user_id, service, identifier, parent_id, root_id = result[0]
104 return {
105 'user_id': user_id,
106 'service': service,
107 'identifier': identifier,
108 'parent_id': parent_id,
109 'root_id': root_id
110 }
111
112def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None:
113 result = db.execute(
114 """
115 SELECT id, parent_id, root_id
116 FROM posts
117 WHERE identifier = ?
118 AND user_id = ?
119 AND service = ?
120 """, (identifier, user_id, service))
121 if not result:
122 return None
123 id, parent_id, root_id = result[0]
124 return {
125 'id': id,
126 'parent_id': parent_id,
127 'root_id': root_id
128 }